Skip to content
Snippets Groups Projects

proto: Make DataWriter::close actually do something.

Merged Nick Mathewson requested to merge nickm/arti:stream_close_v2 into main
4 unresolved threads

Previously we had a bug where <DataWriter as AsyncWrite>::close (or shutdown in tokio-land) would not actually have any effect. It would drop the StreamTarget held by the DataWriter, but since the DataReader also held a StreamTarget, the MPSC channel would not get closed, and the circuit reactor would not realize that the stream wanted to shut down.

Now we use mpsc::Sender::close_channel to make our closes effectual.

Closes #1368 (closed).

Merge request reports

Loading
Loading

Activity

Filter activity
  • Approvals
  • Assignees & reviewers
  • Comments (from bots)
  • Comments (from users)
  • Commits & branches
  • Edits
  • Labels
  • Lock status
  • Mentions
  • Merge request status
  • Tracking
564 564 self.state = Some(DataWriterState::Closed);
565 565 Poll::Ready(Err(e.into()))
566 566 }
567 Poll::Ready((imp, Ok(()))) => {
567 Poll::Ready((mut imp, Ok(()))) => {
568 568 if should_close {
569 // Tell the StreamTarget to close, so that the reactor
570 // realizes that we are done sending. (The Reader has its own
571 // StreamTarget, so just dropping ours would not be sufficient for
  • It's a little surprising that the reader has its own StreamTarget. Digging a little, I see that it's so that it can signal that the application is reading (by sending sendmes), and so that it can signal protocol errors.

    The former might eventually go away once we no longer support pre-prop324 flow control, but that's a long ways off. I suppose it'd still need it for signaling errors though, barring some further refactor to move that up a level or something. In any case, I agree that doing an explicit close seems more robust than just dropping the object.

    Maybe tweak the comment here along the lines of "it's difficult to be certain that no other clones of the sender, and in particular StreamReader currently has its own copy"? It'd also be helpful to comment the target field in StreamReader to explain a bit why it has it.

  • Nick Mathewson changed this line in version 3 of the diff

    changed this line in version 3 of the diff

  • Please register or sign in to reply
  • Jim Newsome approved this merge request

    approved this merge request

  • Nick Mathewson added 3 commits

    added 3 commits

    • f2bb7092 - proto: Add a test for closing streams.
    • 1f036c5e - squash! proto: Make DataWriter::close actually do something.
    • d86cc7bc - proto: Try to clarify why StreamReader has a StreamTarget.

    Compare with previous version

  • Nick Mathewson marked this merge request as draft from nickm/arti@1f036c5e

    marked this merge request as draft from nickm/arti@1f036c5e

  • Pushed a few more commits to address your comments. In particular, it seems that there was a second bug that had been keeping close() from working. With both bugs fixed, the new tests pass.

  • Nick Mathewson requested review from @jnewsome and removed approval

    requested review from @jnewsome and removed approval

  • Jim Newsome
    Jim Newsome @jnewsome started a thread on commit f2bb7092
  • 2037 let (r, mut w) = stream.split();
    2038 if by_drop {
    2039 // Drop the writer and the reader, which should close the stream.
    2040 drop(r);
    2041 drop(w);
    2042 (None, circ) // make sure to keep the circuit alive
    2043 } else {
    2044 // Call close on the writer, while keeping the reader alive.
    2045 w.close().await.unwrap();
    2046 (Some(r), circ)
    2047 }
    2048 };
    2049 let handler_fut = async {
    2050 // Read the BEGIN message.
    2051 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
    2052 let rmsg = match msg {
    • Optional nit: can unnest with refutable let.

      let AnyChanMsg::Relay(r) = msg else {
        panic!()
      };
      let rmsg = AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body()).unwrap();
    • Please register or sign in to reply
  • Jim Newsome
    Jim Newsome @jnewsome started a thread on commit f2bb7092
  • 2054 AnyRelayMsgOuter::decode_singleton(RelayCellFormat::V0, r.into_relay_body())
    2055 .unwrap()
    2056 }
    2057 _ => panic!(),
    2058 };
    2059 let (streamid, rmsg) = rmsg.into_streamid_and_msg();
    2060 assert_eq!(rmsg.cmd(), RelayCmd::BEGIN);
    2061
    2062 // Reply with a CONNECTED.
    2063 let connected =
    2064 relaymsg::Connected::new_with_addr("10.0.0.1".parse().unwrap(), 1234).into();
    2065 sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
    2066
    2067 // Expect an END.
    2068 let (_, msg) = rx.next().await.unwrap().into_circid_and_msg();
    2069 let rmsg = match msg {
  • Jim Newsome approved this merge request

    approved this merge request

  • Nick Mathewson added 4 commits

    added 4 commits

    • 6739f337 - proto: Make DataWriter::close actually do something.
    • 99dcd037 - proto: Improve documentation about DataStream lifetimes and closing
    • e358e5f8 - proto: Add a test for closing streams.
    • b077b68e - proto: Try to clarify why StreamReader has a StreamTarget.

    Compare with previous version

  • Squashed!

  • Nick Mathewson marked this merge request as ready

    marked this merge request as ready

  • @nickm invited me to take a look at this.

    I have reviewed the new API docs and commentary. This all looks good to me. I think the behaviour described is the desired behaviour.

    I haven't reviewed the implementation in detail, nor double checked that the new test case expects the same behaviour as is documented. I'm trusting @jnewsome to do that as part of his review :-).

  • Nick Mathewson mentioned in commit ee96fa6f

    mentioned in commit ee96fa6f

  • Please register or sign in to reply
    Loading