Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
The Tor Project
Network Health
Metrics
Collector
Commits
38079c96
Commit
38079c96
authored
Jan 11, 2022
by
Hiro
🏄
Browse files
Modify downloader algorithm
Edit downloader tests to account for changes
parent
806b47ca
Pipeline
#23392
failed with stage
in 2 minutes and 55 seconds
Changes
4
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
.gitignore
View file @
38079c96
...
...
@@ -14,5 +14,5 @@ cobertura.ser
*~
build.properties
/index/
/indexed/
/.idea/
src/main/java/org/torproject/metrics/collector/downloader/Downloader.java
View file @
38079c96
...
...
@@ -3,6 +3,9 @@
package
org.torproject.metrics.collector.downloader
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.BufferedInputStream
;
import
java.io.ByteArrayOutputStream
;
import
java.io.IOException
;
...
...
@@ -15,6 +18,12 @@ import java.util.zip.InflaterInputStream;
*/
public
class
Downloader
{
/**
* Logger for this class.
*/
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
Downloader
.
class
);
/**
* Download the given URL from an HTTP server and return downloaded bytes.
*
...
...
@@ -43,20 +52,25 @@ public class Downloader {
huc
.
setReadTimeout
(
5000
);
huc
.
connect
();
int
response
=
huc
.
getResponseCode
();
if
(
response
!=
200
)
{
return
null
;
String
message
=
huc
.
getResponseMessage
();
if
(
message
.
contains
(
"Servers unavailable"
))
{
throw
new
IOException
(
"404 Servers unavailable"
);
}
try
(
BufferedInputStream
in
=
isDeflated
?
new
BufferedInputStream
(
new
InflaterInputStream
(
huc
.
getInputStream
()))
:
new
BufferedInputStream
(
huc
.
getInputStream
()))
{
int
len
;
byte
[]
data
=
new
byte
[
1024
];
while
((
len
=
in
.
read
(
data
,
0
,
1024
))
>=
0
)
{
downloadedBytes
.
write
(
data
,
0
,
len
);
else
if
(
response
==
200
)
{
try
(
BufferedInputStream
in
=
isDeflated
?
new
BufferedInputStream
(
new
InflaterInputStream
(
huc
.
getInputStream
()))
:
new
BufferedInputStream
(
huc
.
getInputStream
()))
{
int
len
;
byte
[]
data
=
new
byte
[
1024
];
while
((
len
=
in
.
read
(
data
,
0
,
1024
))
>=
0
)
{
downloadedBytes
.
write
(
data
,
0
,
len
);
}
}
return
downloadedBytes
.
toByteArray
();
}
else
{
return
null
;
}
return
downloadedBytes
.
toByteArray
();
}
}
src/main/java/org/torproject/metrics/collector/relaydescs/RelayDescriptorDownloader.java
View file @
38079c96
...
...
@@ -454,10 +454,11 @@ public class RelayDescriptorDownloader {
this
.
downloadAllDescriptorsFromAuthorities
.
add
(
authority
);
}
/* I am not sure considering more than 2 authrity per run is improving the
* algorigthm but is certainly increasing badnwidth */
if
(
this
.
downloadAllDescriptorsFromAuthorities
.
size
()
>=
4
)
{
break
;
}
* algorigthm but is certainly increasing badnwidth
* if (this.downloadAllDescriptorsFromAuthorities.size() >= 2) {
* break;
* }
*/
}
/* Prepare statistics on this execution. */
...
...
@@ -681,59 +682,70 @@ public class RelayDescriptorDownloader {
* that we distribute the load somewhat fairly over time. */
for
(
String
authority
:
authorities
)
{
/* Make all requests to an authority in a single try block. If
* something goes wrong with this authority, we give up on all
* downloads and continue with the next authority. */
/* TODO Some authorities provide very little bandwidth and could
* slow down the entire download process. Ponder adding a timeout of
* 3 or 5 minutes per authority to avoid getting in the way of the
* next execution. */
try
{
/* Start with downloading the current consensus, unless we already
* have it. */
if
(
downloadCurrentConsensus
)
{
if
(
this
.
missingDescriptors
.
containsKey
(
consensusKey
)
&&
this
.
missingDescriptors
.
get
(
consensusKey
).
equals
(
"NA"
))
{
this
.
requestedConsensuses
++;
/* Start with downloading the current consensus, unless we already
* have it. */
if
(
downloadCurrentConsensus
)
{
if
(
this
.
missingDescriptors
.
containsKey
(
consensusKey
)
&&
this
.
missingDescriptors
.
get
(
consensusKey
).
equals
(
"NA"
))
{
this
.
requestedConsensuses
++;
try
{
this
.
downloadedConsensuses
+=
this
.
downloadResourceFromAuthority
(
authority
,
"/tor/status-vote/current/consensus"
);
}
catch
(
IOException
e
)
{
logger
.
warn
(
"Failed downloading consensus from {}!"
,
authority
,
e
);
}
}
}
/* Then try to download the microdesc consensus. */
if
(
downloadCurrentMicrodescConsensus
)
{
if
(
this
.
missingDescriptors
.
containsKey
(
microdescConsensusKey
)
&&
this
.
missingDescriptors
.
get
(
microdescConsensusKey
)
.
equals
(
"NA"
))
{
this
.
requestedMicrodescConsensuses
++;
/* Then try to download the microdesc consensus. */
if
(
downloadCurrentMicrodescConsensus
)
{
if
(
this
.
missingDescriptors
.
containsKey
(
microdescConsensusKey
)
&&
this
.
missingDescriptors
.
get
(
microdescConsensusKey
)
.
equals
(
"NA"
))
{
this
.
requestedMicrodescConsensuses
++;
try
{
this
.
downloadedMicrodescConsensuses
+=
this
.
downloadResourceFromAuthority
(
authority
,
"/tor/status-vote/current/consensus-microdesc"
);
}
catch
(
IOException
e
)
{
logger
.
warn
(
"Failed downloading consensus microdesc"
+
" from {}!"
,
authority
,
e
);
}
}
}
/* Next, try to download current votes that we're missing. */
if
(
downloadCurrentVotes
)
{
String
voteKeyPrefix
=
"vote,"
+
this
.
currentValidAfter
;
SortedSet
<
String
>
fingerprints
=
new
TreeSet
<>();
for
(
Map
.
Entry
<
String
,
String
>
e
:
this
.
missingDescriptors
.
entrySet
())
{
if
(
e
.
getValue
().
equals
(
"NA"
)
&&
e
.
getKey
().
startsWith
(
voteKeyPrefix
))
{
String
fingerprint
=
e
.
getKey
().
split
(
","
)[
2
];
fingerprints
.
add
(
fingerprint
);
}
/* Next, try to download current votes that we're missing. */
if
(
downloadCurrentVotes
)
{
String
voteKeyPrefix
=
"vote,"
+
this
.
currentValidAfter
;
SortedSet
<
String
>
fingerprints
=
new
TreeSet
<>();
for
(
Map
.
Entry
<
String
,
String
>
e
:
this
.
missingDescriptors
.
entrySet
())
{
if
(
e
.
getValue
().
equals
(
"NA"
)
&&
e
.
getKey
().
startsWith
(
voteKeyPrefix
))
{
String
fingerprint
=
e
.
getKey
().
split
(
","
)[
2
];
fingerprints
.
add
(
fingerprint
);
}
for
(
String
fingerprint
:
fingerprints
)
{
this
.
requestedVotes
++;
}
for
(
String
fingerprint
:
fingerprints
)
{
this
.
requestedVotes
++;
try
{
this
.
downloadedVotes
+=
this
.
downloadResourceFromAuthority
(
authority
,
"/tor/status-vote/current/"
+
fingerprint
);
}
catch
(
IOException
e
)
{
logger
.
warn
(
"Failed downloading current vote"
+
" from {}!"
,
authority
,
e
);
}
}
}
try
{
/* Now try to download the bandwidth file, regardless of whether this
* authority might provide one or when we last downloaded a bandwidth
* file from it. */
...
...
@@ -742,19 +754,23 @@ public class RelayDescriptorDownloader {
this
.
downloadResourceFromAuthority
(
authority
,
"/tor/status-vote/next/bandwidth"
);
/* Download either all server and extra-info descriptors or only
* those that we're missing. Start with server descriptors, then
* request extra-info descriptors. Finally, request missing
* microdescriptors. */
for
(
String
type
:
new
String
[]
{
"server"
,
"extra"
,
"micro"
})
{
/* Download all server or extra-info descriptors from this
* authority if we haven't done so for 24 hours and if we're
* configured to do so. */
if
(
this
.
downloadAllDescriptorsFromAuthorities
.
contains
(
authority
)
&&
((
type
.
equals
(
"server"
)
&&
this
.
downloadAllServerDescriptors
)
||
(
type
.
equals
(
"extra"
)
&&
this
.
downloadAllExtraInfos
)))
{
}
catch
(
IOException
e
)
{
logger
.
warn
(
"Failed downloading bandwidth file from {}!"
,
authority
,
e
);
}
/* Download either all server and extra-info descriptors or only
* those that we're missing. Start with server descriptors, then
* request extra-info descriptors. Finally, request missing
* microdescriptors. */
for
(
String
type
:
new
String
[]
{
"server"
,
"extra"
,
"micro"
})
{
/* Download all server or extra-info descriptors from this
* authority if we haven't done so for 24 hours and if we're
* configured to do so. */
if
(
this
.
downloadAllDescriptorsFromAuthorities
.
contains
(
authority
)
&&
((
type
.
equals
(
"server"
)
&&
this
.
downloadAllServerDescriptors
)
||
(
type
.
equals
(
"extra"
)
&&
this
.
downloadAllExtraInfos
)))
{
try
{
int
downloadedAllDescriptors
=
this
.
downloadResourceFromAuthority
(
authority
,
"/tor/"
+
type
+
"/all"
);
...
...
@@ -767,15 +783,19 @@ public class RelayDescriptorDownloader {
this
.
downloadedAllExtraInfoDescriptors
+=
downloadedAllDescriptors
;
}
/* Download missing server descriptors, extra-info descriptors,
* and microdescriptors if we're configured to do so. */
}
else
if
((
type
.
equals
(
"server"
)
}
catch
(
IOException
e
)
{
logger
.
warn
(
"Failed downloading all server or extra-info "
+
"descriptors files from {}!"
,
authority
,
e
);
}
/* Download missing server descriptors, extra-info descriptors,
* and microdescriptors if we're configured to do so. */
}
else
if
((
type
.
equals
(
"server"
)
&&
this
.
downloadMissingServerDescriptors
)
||
(
type
.
equals
(
"extra"
)
&&
this
.
downloadMissingExtraInfos
)
||
(
type
.
equals
(
"micro"
)
&&
this
.
downloadMissingMicrodescriptors
))
{
try
{
/* Go through the list of missing descriptors of this type
* and combine the descriptor identifiers to a URL of up to
* 96 server or extra-info descriptors or 92 microdescriptors
...
...
@@ -845,13 +865,11 @@ public class RelayDescriptorDownloader {
/* Unknown type, nothing to do. */
break
;
}
}
catch
(
IOException
e
)
{
logger
.
warn
(
"Failed downloading "
+
"descriptors files from {}!"
,
authority
,
e
);
}
}
/* If a download failed, stop requesting descriptors from this
* authority and move on to the next. */
}
catch
(
IOException
e
)
{
logger
.
warn
(
"Failed downloading from {}!"
,
authority
,
e
);
}
}
}
...
...
@@ -887,9 +905,9 @@ public class RelayDescriptorDownloader {
authority
);
}
else
{
try
{
logger
.
warn
(
"Downloaded 0 bytes from {}"
+
"will wait
1 minute
to avoid DOS protections."
,
authority
);
TimeUnit
.
MINUTE
S
.
sleep
(
1
);
logger
.
warn
(
"Downloaded 0 bytes from {}
"
+
"will wait
45 SECONDS
to avoid DOS protections."
,
authority
);
TimeUnit
.
SECOND
S
.
sleep
(
45
);
allData
=
Downloader
.
downloadFromHttpServer
(
url
,
isCompressed
);
logger
.
warn
(
"Downloaded {} -> ({} bytes)"
,
fullUrl
,
allData
==
null
?
0
:
allData
.
length
);
...
...
src/test/java/org/torproject/metrics/collector/downloader/DownloaderTest.java
View file @
38079c96
...
...
@@ -15,6 +15,7 @@ import org.junit.BeforeClass;
import
org.junit.Test
;
import
java.io.ByteArrayInputStream
;
import
java.io.IOException
;
import
java.net.HttpURLConnection
;
import
java.net.SocketTimeoutException
;
import
java.net.URL
;
...
...
@@ -87,20 +88,22 @@ public class DownloaderTest {
HttpURLConnection
urlConnection
=
mock
(
HttpURLConnection
.
class
);
httpUrlStreamHandler
.
addConnection
(
requestedUrl
,
urlConnection
);
given
(
urlConnection
.
getResponseCode
()).
willReturn
(
200
);
given
(
urlConnection
.
getResponseMessage
()).
willReturn
(
"OK"
);
given
(
urlConnection
.
getInputStream
()).
willReturn
(
new
ByteArrayInputStream
(
expectedDownloadedBytes
));
byte
[]
downloadedBytes
=
Downloader
.
downloadFromHttpServer
(
requestedUrl
);
assertArrayEquals
(
expectedDownloadedBytes
,
downloadedBytes
);
}
@Test
@Test
(
expected
=
IOException
.
class
)
public
void
testNonExistingResource
()
throws
Exception
{
URL
requestedUrl
=
new
URL
(
"http://localhost/notfound"
);
IOException
expectedException
=
new
IOException
(
"404 Servers unavailable"
);
HttpURLConnection
urlConnection
=
mock
(
HttpURLConnection
.
class
);
httpUrlStreamHandler
.
addConnection
(
requestedUrl
,
urlConnection
);
given
(
urlConnection
.
getResponseMessage
()).
willReturn
(
"Servers unavailable"
);
given
(
urlConnection
.
getResponseCode
()).
willReturn
(
404
);
byte
[]
downloadedBytes
=
Downloader
.
downloadFromHttpServer
(
requestedUrl
);
assertNull
(
downloadedBytes
);
}
@Test
...
...
@@ -110,6 +113,7 @@ public class DownloaderTest {
HttpURLConnection
urlConnection
=
mock
(
HttpURLConnection
.
class
);
httpUrlStreamHandler
.
addConnection
(
requestedUrl
,
urlConnection
);
given
(
urlConnection
.
getResponseCode
()).
willReturn
(
200
);
given
(
urlConnection
.
getResponseMessage
()).
willReturn
(
"OK"
);
given
(
urlConnection
.
getInputStream
()).
willReturn
(
new
ByteArrayInputStream
(
expectedDownloadedBytes
));
byte
[]
downloadedBytes
=
Downloader
.
downloadFromHttpServer
(
requestedUrl
);
...
...
@@ -123,9 +127,9 @@ public class DownloaderTest {
HttpURLConnection
urlConnection
=
mock
(
HttpURLConnection
.
class
);
httpUrlStreamHandler
.
addConnection
(
requestedUrl
,
urlConnection
);
given
(
urlConnection
.
getResponseCode
()).
willReturn
(
200
);
given
(
urlConnection
.
getResponseMessage
()).
willReturn
(
"OK"
);
given
(
urlConnection
.
getInputStream
()).
willThrow
(
expectedException
);
Downloader
.
downloadFromHttpServer
(
requestedUrl
);
fail
(
"Should have thrown a SocketTimeoutException."
);
}
}
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment