Skip to content

Draft: Stream responses in tor-dirmgr

Robin Leander Schröder requested to merge rls/arti:stream-downloads-2 into main

This resolves the TODO:

    // TODO: instead of waiting for all the queries to finish, we
    // could stream the responses back or something.

The changes to the download_attempt function are actually a lot smaller than they seem, many lines just have a different indentation.

Diff of this function before `cargo fmt`
@@ -143,11 +138,11 @@ async fn download_attempt<R: Runtime>(
     state: &mut Box<dyn DirState>,
     parallelism: usize,
 ) -> Result<bool> {
-    let mut changed = false;
     let missing = state.missing_docs();
     let fetched = fetch_multiple(Arc::clone(dirmgr), missing, parallelism).await?;
-    for (client_req, dir_response) in fetched {
+    let (_state, changed) = fetched.fold((state, false), |(state, changed), (client_req, dir_response)| async move {
         let text = dir_response.into_output();
+        let changed = changed | 
         match dirmgr.expand_response_text(&client_req, text).await {
             Ok(text) => {
                 let outcome = state
@@ -155,18 +150,19 @@ async fn download_attempt<R: Runtime>(
                     .await;
                 dirmgr.notify().await;
                 match outcome {
-                    Ok(b) => changed |= b,
+                    Ok(b) => b,
                     // TODO: in this case we might want to stop using this source.
-                    Err(e) => warn!("error while adding directory info: {}", e),
+                    Err(e) => { warn!("error while adding directory info: {}", e); false }
                 }
             }
             Err(e) => {
                 // TODO: in this case we might want to stop using this source.
                 warn!("Error when expanding directory text: {}", e);
+                false
             }
-        }
-    }
-
+        };
+        (state, changed)
+    }).await;
     Ok(changed)
 }

I couldn't manage without the .fold() because of the state variable which is a &mut Box<dyn DirState>. I got this workaround from here. Maybe there's also different ways to express this? (e.g. while if let Some(x) = stream.next().await or something)

This merge request conflicts with !47 (closed) so I'm leaving it as a Draft until we know whether something akin to !47 (closed) will be accepted.

Edited by Robin Leander Schröder

Merge request reports