Draft: Stream responses in tor-dirmgr
This resolves the TODO:
// TODO: instead of waiting for all the queries to finish, we
// could stream the responses back or something.
The changes to the download_attempt
function are actually a lot smaller than they seem, many lines just have a different indentation.
Diff of this function before `cargo fmt`
@@ -143,11 +138,11 @@ async fn download_attempt<R: Runtime>(
state: &mut Box<dyn DirState>,
parallelism: usize,
) -> Result<bool> {
- let mut changed = false;
let missing = state.missing_docs();
let fetched = fetch_multiple(Arc::clone(dirmgr), missing, parallelism).await?;
- for (client_req, dir_response) in fetched {
+ let (_state, changed) = fetched.fold((state, false), |(state, changed), (client_req, dir_response)| async move {
let text = dir_response.into_output();
+ let changed = changed |
match dirmgr.expand_response_text(&client_req, text).await {
Ok(text) => {
let outcome = state
@@ -155,18 +150,19 @@ async fn download_attempt<R: Runtime>(
.await;
dirmgr.notify().await;
match outcome {
- Ok(b) => changed |= b,
+ Ok(b) => b,
// TODO: in this case we might want to stop using this source.
- Err(e) => warn!("error while adding directory info: {}", e),
+ Err(e) => { warn!("error while adding directory info: {}", e); false }
}
}
Err(e) => {
// TODO: in this case we might want to stop using this source.
warn!("Error when expanding directory text: {}", e);
+ false
}
- }
- }
-
+ };
+ (state, changed)
+ }).await;
Ok(changed)
}
I couldn't manage without the .fold()
because of the state
variable which is a &mut Box<dyn DirState>
.
I got this workaround from here.
Maybe there's also different ways to express this? (e.g. while if let Some(x) = stream.next().await
or something)
This merge request conflicts with !47 (closed) so I'm leaving it as a Draft until we know whether something akin to !47 (closed) will be accepted.
Edited by Robin Leander Schröder