diff --git a/.cargo/config.in b/.cargo/config.in index eca625d3e95ce85d7e382e220233a07f2999c6e3..f616234138a6c219a35081e2ac418a7ed52f28cd 100644 --- a/.cargo/config.in +++ b/.cargo/config.in @@ -65,7 +65,7 @@ rev = "21c26326f5f45f415c49eac4ba5bc41a2f961321" [source."https://github.com/kinetiknz/audioipc-2"] git = "https://github.com/kinetiknz/audioipc-2" replace-with = "vendored-sources" -rev = "8b5c7db0f0af54f25a2ee7f7aa55c2a0689c97fc" +rev = "8e3c4c1b1edbd19cfafc833c3254f12731af31c4" [source."https://github.com/jfkthame/mapped_hyph.git"] git = "https://github.com/jfkthame/mapped_hyph.git" diff --git a/Cargo.lock b/Cargo.lock index 8bd2a4f9770b4bfb177071325bfb64bb5e9a2fcf..253c44db29bc2529e444ac38775ba4c23b83a4be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -266,8 +266,9 @@ dependencies = [ [[package]] name = "audioipc2" version = "0.5.0" -source = "git+https://github.com/kinetiknz/audioipc-2?rev=8b5c7db0f0af54f25a2ee7f7aa55c2a0689c97fc#8b5c7db0f0af54f25a2ee7f7aa55c2a0689c97fc" +source = "git+https://github.com/kinetiknz/audioipc-2?rev=8e3c4c1b1edbd19cfafc833c3254f12731af31c4#8e3c4c1b1edbd19cfafc833c3254f12731af31c4" dependencies = [ + "arrayvec 0.7.2", "ashmem", "audio_thread_priority", "bincode", @@ -291,7 +292,7 @@ dependencies = [ [[package]] name = "audioipc2-client" version = "0.5.0" -source = "git+https://github.com/kinetiknz/audioipc-2?rev=8b5c7db0f0af54f25a2ee7f7aa55c2a0689c97fc#8b5c7db0f0af54f25a2ee7f7aa55c2a0689c97fc" +source = "git+https://github.com/kinetiknz/audioipc-2?rev=8e3c4c1b1edbd19cfafc833c3254f12731af31c4#8e3c4c1b1edbd19cfafc833c3254f12731af31c4" dependencies = [ "audio_thread_priority", "audioipc2", @@ -302,7 +303,7 @@ dependencies = [ [[package]] name = "audioipc2-server" version = "0.5.0" -source = "git+https://github.com/kinetiknz/audioipc-2?rev=8b5c7db0f0af54f25a2ee7f7aa55c2a0689c97fc#8b5c7db0f0af54f25a2ee7f7aa55c2a0689c97fc" +source = "git+https://github.com/kinetiknz/audioipc-2?rev=8e3c4c1b1edbd19cfafc833c3254f12731af31c4#8e3c4c1b1edbd19cfafc833c3254f12731af31c4" dependencies = [ "audio_thread_priority", "audioipc2", diff --git a/third_party/rust/audioipc2-client/.cargo-checksum.json b/third_party/rust/audioipc2-client/.cargo-checksum.json index 85789ad095507204218b9b896e19ed1fe08739f9..8a67a294e99949ecb094501c2c47386cc87a2eb1 100644 --- a/third_party/rust/audioipc2-client/.cargo-checksum.json +++ b/third_party/rust/audioipc2-client/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"4304d6695b5ae83a67ec50eeb7c8b05f0292b31bf6d93bfec629492116cb05fd","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/context.rs":"060680f87d406cef1f6f94cfae991ae59b5f3a13253e15481716a1a4224682ee","src/lib.rs":"c4a6797734489280f6b97dd72c9e51a7bd7be4104592eece3929e29d45cbca4a","src/send_recv.rs":"450bdb1d8a346634c0237f2081b424d11e2c19ad81670009303f8a03b3bfb196","src/stream.rs":"8f2f33b75b78fb1ef8ebb7b14bbb81dbb0f046f9e91b6110a3c49e424690bb8b"},"package":null} \ No newline at end of file +{"files":{"Cargo.toml":"4304d6695b5ae83a67ec50eeb7c8b05f0292b31bf6d93bfec629492116cb05fd","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/context.rs":"060680f87d406cef1f6f94cfae991ae59b5f3a13253e15481716a1a4224682ee","src/lib.rs":"c4a6797734489280f6b97dd72c9e51a7bd7be4104592eece3929e29d45cbca4a","src/send_recv.rs":"450bdb1d8a346634c0237f2081b424d11e2c19ad81670009303f8a03b3bfb196","src/stream.rs":"a6c07796e6fe704cfa6baf8b904e7ffe874d3c884d44d4ed307e668dec25452b"},"package":null} \ No newline at end of file diff --git a/third_party/rust/audioipc2-client/src/stream.rs b/third_party/rust/audioipc2-client/src/stream.rs index 735c1327c1c8ba3ecd2a4d7162eac46f9308af94..a904f83f06a3b0faeb2f932d19fb6a3dadadcd37 100644 --- a/third_party/rust/audioipc2-client/src/stream.rs +++ b/third_party/rust/audioipc2-client/src/stream.rs @@ -46,16 +46,8 @@ pub struct ClientStream<'ctx> { shutdown_rx: mpsc::Receiver<()>, } -#[derive(Copy, Clone, Debug, PartialEq)] -enum StreamDirection { - Input, - Output, - Duplex, -} - struct CallbackServer { - dir: StreamDirection, - shm: Option<SharedMem>, + shm: SharedMem, duplex_input: Option<Vec<u8>>, data_cb: ffi::cubeb_data_callback, state_cb: ffi::cubeb_state_callback, @@ -86,67 +78,30 @@ impl rpccore::Server for CallbackServer { let input_nbytes = nframes as usize * input_frame_size; let output_nbytes = nframes as usize * output_frame_size; - // Clone values that need to be moved into the cpu pool thread. - let mut shm = unsafe { self.shm.as_ref().unwrap().unsafe_view() }; - - let duplex_copy_ptr = match &mut self.duplex_input { - Some(buf) => { - assert_eq!(self.dir, StreamDirection::Duplex); - assert!(input_frame_size > 0); - assert!(buf.capacity() >= input_nbytes); - buf.as_mut_ptr() - } - None => ptr::null_mut(), - } as usize; - let user_ptr = self.user_ptr; - let cb = self.data_cb.unwrap(); - let dir = self.dir; - // Input and output reuse the same shmem backing. Unfortunately, cubeb's data_callback isn't // specified in such a way that would require the callee to consume all of the input before // writing to the output (i.e., it is passed as two pointers that aren't expected to alias). // That means we need to copy the input here. - let (input_ptr, output_ptr) = match dir { - StreamDirection::Duplex => unsafe { - assert!(input_frame_size > 0); - assert!(output_frame_size > 0); - assert_ne!(duplex_copy_ptr, 0); - let input = shm.get_slice(input_nbytes).unwrap(); - ptr::copy_nonoverlapping( - input.as_ptr(), - duplex_copy_ptr as *mut _, - input.len(), - ); - ( - duplex_copy_ptr as _, - shm.get_mut_slice(output_nbytes).unwrap().as_mut_ptr(), - ) - }, - StreamDirection::Input => unsafe { - assert!(input_frame_size > 0); - assert_eq!(output_frame_size, 0); - ( - shm.get_slice(input_nbytes).unwrap().as_ptr(), - ptr::null_mut(), - ) - }, - StreamDirection::Output => unsafe { - assert!(output_frame_size > 0); - assert_eq!(input_frame_size, 0); - ( - ptr::null(), - shm.get_mut_slice(output_nbytes).unwrap().as_mut_ptr(), - ) - }, - }; + if let Some(buf) = &mut self.duplex_input { + assert!(input_nbytes > 0); + assert!(buf.capacity() >= input_nbytes); + unsafe { + let input = self.shm.get_slice(input_nbytes).unwrap(); + ptr::copy_nonoverlapping(input.as_ptr(), buf.as_mut_ptr(), input.len()); + } + } run_in_callback(|| { let nframes = unsafe { - cb( + self.data_cb.unwrap()( ptr::null_mut(), // https://github.com/kinetiknz/cubeb/issues/518 - user_ptr as *mut c_void, - input_ptr as *const _, - output_ptr as *mut _, + self.user_ptr as *mut c_void, + if let Some(buf) = &mut self.duplex_input { + buf.as_mut_ptr() + } else { + self.shm.get_slice(input_nbytes).unwrap().as_ptr() + } as *const _, + self.shm.get_mut_slice(output_nbytes).unwrap().as_mut_ptr() as *mut _, nframes as _, ) }; @@ -156,22 +111,18 @@ impl rpccore::Server for CallbackServer { } CallbackReq::State(state) => { trace!("stream_thread: State Callback: {:?}", state); - let user_ptr = self.user_ptr; - let cb = self.state_cb.unwrap(); run_in_callback(|| unsafe { - cb(ptr::null_mut(), user_ptr as *mut _, state); + self.state_cb.unwrap()(ptr::null_mut(), self.user_ptr as *mut _, state); }); CallbackResp::State } CallbackReq::DeviceChange => { - let cb = self.device_change_cb.clone(); - let user_ptr = self.user_ptr; run_in_callback(|| { - let cb = cb.lock().unwrap(); - if let Some(cb) = *cb { + let cb = *self.device_change_cb.lock().unwrap(); + if let Some(cb) = cb { unsafe { - cb(user_ptr as *mut _); + cb(self.user_ptr as *mut _); } } else { warn!("DeviceChange received with null callback"); @@ -180,35 +131,6 @@ impl rpccore::Server for CallbackServer { CallbackResp::DeviceChange } - CallbackReq::SharedMem(mut handle, shm_area_size) => { - self.shm = match unsafe { SharedMem::from(handle.take_handle(), shm_area_size) } { - Ok(shm) => Some(shm), - Err(e) => { - warn!( - "sharedmem client mapping failed (size={}, err={:?})", - shm_area_size, e - ); - return CallbackResp::Error(ffi::CUBEB_ERROR); - } - }; - - self.duplex_input = if let StreamDirection::Duplex = self.dir { - let mut duplex_input = Vec::new(); - match duplex_input.try_reserve_exact(shm_area_size) { - Ok(()) => Some(duplex_input), - Err(e) => { - warn!( - "duplex_input allocation failed (size={}, err={:?})", - shm_area_size, e - ); - return CallbackResp::Error(ffi::CUBEB_ERROR); - } - } - } else { - None - }; - CallbackResp::SharedMem - } } } } @@ -231,11 +153,44 @@ impl<'ctx> ClientStream<'ctx> { let mut data = send_recv!(rpc, StreamCreate(create_params) => StreamCreated())?; debug!( - "token = {}, handle = {:?}", - data.token, data.platform_handle + "token = {}, handle = {:?} area_size = {:?}", + data.token, data.shm_handle, data.shm_area_size ); - let stream = unsafe { sys::Pipe::from_raw_handle(data.platform_handle.take_handle()) }; + let shm = + match unsafe { SharedMem::from(data.shm_handle.take_handle(), data.shm_area_size) } { + Ok(shm) => shm, + Err(e) => { + warn!( + "SharedMem client mapping failed (size={}, err={:?})", + data.shm_area_size, e + ); + return Err(Error::default()); + } + }; + + let duplex_input = if let (Some(_), Some(_)) = ( + init_params.input_stream_params, + init_params.output_stream_params, + ) { + let mut duplex_input = Vec::new(); + match duplex_input.try_reserve_exact(data.shm_area_size) { + Ok(()) => Some(duplex_input), + Err(e) => { + warn!( + "duplex_input allocation failed (size={}, err={:?})", + data.shm_area_size, e + ); + return Err(Error::default()); + } + } + } else { + None + }; + + let mut stream = + send_recv!(rpc, StreamInit(data.token, init_params) => StreamInitialized())?; + let stream = unsafe { sys::Pipe::from_raw_handle(stream.take_handle()) }; let user_data = user_ptr as usize; @@ -244,20 +199,9 @@ impl<'ctx> ClientStream<'ctx> { let (_shutdown_tx, shutdown_rx) = mpsc::channel(); - let dir = match ( - init_params.input_stream_params, - init_params.output_stream_params, - ) { - (Some(_), Some(_)) => StreamDirection::Duplex, - (Some(_), None) => StreamDirection::Input, - (None, Some(_)) => StreamDirection::Output, - (None, None) => unreachable!(), - }; - let server = CallbackServer { - dir, - shm: None, - duplex_input: None, + shm, + duplex_input, data_cb: data_callback, state_cb: state_callback, user_ptr: user_data, @@ -269,8 +213,6 @@ impl<'ctx> ClientStream<'ctx> { .bind_server(server, stream) .map_err(|_| Error::default())?; - send_recv!(rpc, StreamInit(data.token, init_params) => StreamInitialized)?; - let stream = Box::into_raw(Box::new(ClientStream { context: ctx, user_ptr, diff --git a/third_party/rust/audioipc2-server/.cargo-checksum.json b/third_party/rust/audioipc2-server/.cargo-checksum.json index 69f9859903eaf4edeef56740ed4a05c1a07c40a7..b1373cc095bb9032f3030086241b7037d572434c 100644 --- a/third_party/rust/audioipc2-server/.cargo-checksum.json +++ b/third_party/rust/audioipc2-server/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"d06fa544ba9c0f2ff3ab418b63764b3e395bf326ffef129d6952ae1ae92213f1","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/lib.rs":"06aff4fd1326aeabb16b01f81a6f3c59c1717ebe96285a063724830cdf30303a","src/server.rs":"c745bc7bbc1207c1cc642b8da95ff72e6e56d6ed0a3f56e755f27e9b79871e38"},"package":null} \ No newline at end of file +{"files":{"Cargo.toml":"d06fa544ba9c0f2ff3ab418b63764b3e395bf326ffef129d6952ae1ae92213f1","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/lib.rs":"06aff4fd1326aeabb16b01f81a6f3c59c1717ebe96285a063724830cdf30303a","src/server.rs":"0e33d5f4b37c07f627f2998616fc1c740267fd711daf66ec2a460b489a8c2b8a"},"package":null} \ No newline at end of file diff --git a/third_party/rust/audioipc2-server/src/server.rs b/third_party/rust/audioipc2-server/src/server.rs index 06e83e53b46f7a42bca09b725dc6fd4ab8b90c08..aa7af7e82561b9f8cb5c2d8db91a9eccd5a65860 100644 --- a/third_party/rust/audioipc2-server/src/server.rs +++ b/third_party/rust/audioipc2-server/src/server.rs @@ -12,7 +12,7 @@ use audioipc::messages::{ StreamInitParams, StreamParams, }; use audioipc::shm::SharedMem; -use audioipc::{ipccore, rpccore, sys}; +use audioipc::{ipccore, rpccore, sys, PlatformHandle}; use cubeb_core as cubeb; use cubeb_core::ffi; use std::convert::From; @@ -47,14 +47,15 @@ impl CubebDeviceCollectionManager { server: &Rc<RefCell<DeviceCollectionChangeCallback>>, devtype: cubeb::DeviceType, ) -> cubeb::Result<()> { - let mut servers = self.servers.lock().unwrap(); - if servers.is_empty() { - self.internal_register(context, true)?; - } server.borrow_mut().devtype.insert(devtype); + let mut servers = self.servers.lock().unwrap(); + let do_register = servers.is_empty(); if !servers.iter().any(|s| Rc::ptr_eq(s, server)) { servers.push(server.clone()); } + if do_register { + self.internal_register(context, true)?; + } Ok(()) } @@ -64,9 +65,12 @@ impl CubebDeviceCollectionManager { server: &Rc<RefCell<DeviceCollectionChangeCallback>>, devtype: cubeb::DeviceType, ) -> cubeb::Result<()> { + let do_remove = { + server.borrow_mut().devtype.remove(devtype); + server.borrow().devtype.is_empty() + }; let mut servers = self.servers.lock().unwrap(); - server.borrow_mut().devtype.remove(devtype); - if server.borrow().devtype.is_empty() { + if do_remove { servers.retain(|s| !Rc::ptr_eq(s, server)); } if servers.is_empty() { @@ -75,6 +79,20 @@ impl CubebDeviceCollectionManager { Ok(()) } + fn unregister_server( + &mut self, + context: &cubeb::Context, + server: &Rc<RefCell<DeviceCollectionChangeCallback>>, + ) -> cubeb::Result<()> { + server.borrow_mut().devtype = cubeb::DeviceType::UNKNOWN; + let mut servers = self.servers.lock().unwrap(); + servers.retain(|s| !Rc::ptr_eq(s, server)); + if servers.is_empty() { + self.internal_register(context, false)?; + } + Ok(()) + } + fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> { let user_ptr = if enable { self as *const CubebDeviceCollectionManager as *mut c_void @@ -119,6 +137,12 @@ impl CubebDeviceCollectionManager { } } +impl Drop for CubebDeviceCollectionManager { + fn drop(&mut self) { + assert!(self.servers.lock().unwrap().is_empty()); + } +} + struct DevIdMap { devices: Vec<usize>, } @@ -157,8 +181,9 @@ impl DevIdMap { } struct CubebContextState { - context: cubeb::Result<cubeb::Context>, + // `manager` must be dropped before the `context` is destroyed. manager: CubebDeviceCollectionManager, + context: cubeb::Result<cubeb::Context>, } thread_local!(static CONTEXT_KEY: RefCell<Option<CubebContextState>> = RefCell::new(None)); @@ -182,11 +207,11 @@ where let mut state = k.borrow_mut(); if state.is_none() { *state = Some(CubebContextState { - context: cubeb_init_from_context_params(), manager: CubebDeviceCollectionManager::new(), + context: cubeb_init_from_context_params(), }); } - let CubebContextState { context, manager } = state.as_mut().unwrap(); + let CubebContextState { manager, context } = state.as_mut().unwrap(); // Always reattempt to initialize cubeb, OS config may have changed. if context.is_err() { *context = cubeb_init_from_context_params(); @@ -216,8 +241,12 @@ struct ServerStreamCallbacks { output_frame_size: u16, /// Shared memory buffer for transporting audio data to/from client shm: SharedMem, - /// RPC interface to callback server running in client - rpc: rpccore::Proxy<CallbackReq, CallbackResp>, + /// RPC interface for data_callback (on OS audio thread) to server callback thread + data_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>, + /// RPC interface for state_callback (on any thread) to server callback thread + state_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>, + /// RPC interface for device_change_callback (on any thread) to server callback thread + device_change_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>, } impl ServerStreamCallbacks { @@ -239,7 +268,7 @@ impl ServerStreamCallbacks { } let r = self - .rpc + .data_callback_rpc .call(CallbackReq::Data { nframes, input_frame_size: self.input_frame_size as usize, @@ -272,7 +301,10 @@ impl ServerStreamCallbacks { fn state_callback(&mut self, state: cubeb::State) { trace!("Stream state callback: {:?}", state); - let r = self.rpc.call(CallbackReq::State(state.into())).wait(); + let r = self + .state_callback_rpc + .call(CallbackReq::State(state.into())) + .wait(); match r { Ok(CallbackResp::State) => {} _ => { @@ -283,7 +315,10 @@ impl ServerStreamCallbacks { fn device_change_callback(&mut self) { trace!("Stream device change callback"); - let r = self.rpc.call(CallbackReq::DeviceChange).wait(); + let r = self + .device_change_callback_rpc + .call(CallbackReq::DeviceChange) + .wait(); match r { Ok(CallbackResp::DeviceChange) => {} _ => { @@ -310,7 +345,7 @@ fn get_shm_id() -> String { struct ServerStream { stream: Option<cubeb::Stream>, cbs: Box<ServerStreamCallbacks>, - shm_setup: Option<rpccore::ProxyResponse<CallbackResp>>, + client_pipe: Option<PlatformHandle>, } impl Drop for ServerStream { @@ -349,6 +384,27 @@ pub struct CubebServer { shm_area_size: usize, } +impl Drop for CubebServer { + fn drop(&mut self) { + if let Some(device_collection_change_callbacks) = &self.device_collection_change_callbacks { + debug!("CubebServer: dropped with device_collection_change_callbacks registered"); + CONTEXT_KEY.with(|k| { + let mut state = k.borrow_mut(); + if let Some(CubebContextState { + manager, + context: Ok(context), + }) = state.as_mut() + { + let r = manager.unregister_server(context, device_collection_change_callbacks); + if r.is_err() { + debug!("CubebServer: unregister_server failed: {:?}", r); + } + } + }) + } + } +} + #[allow(unknown_lints)] // non_send_fields_in_send_ty is Nightly-only as of 2021-11-29. #[allow(clippy::non_send_fields_in_send_ty)] // XXX: required for server setup, verify this is safe. @@ -688,17 +744,13 @@ impl CubebServer { .callback_thread .bind_client::<CallbackClient>(server_pipe)?; - // Send shm configuration to client but don't wait for result; that'll be checked in process_stream_init. - let shm_setup = Some(rpc.call(CallbackReq::SharedMem( - SerializableHandle::new(shm_handle, self.remote_pid.unwrap()), - shm_area_size, - ))); - let cbs = Box::new(ServerStreamCallbacks { input_frame_size, output_frame_size, shm, - rpc, + state_callback_rpc: rpc.clone(), + device_change_callback_rpc: rpc.clone(), + data_callback_rpc: rpc, }); let entry = self.streams.vacant_entry(); @@ -707,13 +759,14 @@ impl CubebServer { entry.insert(ServerStream { stream: None, - shm_setup, cbs, + client_pipe: Some(client_pipe), }); Ok(ClientMessage::StreamCreated(StreamCreate { token: key, - platform_handle: SerializableHandle::new(client_pipe, self.remote_pid.unwrap()), + shm_handle: SerializableHandle::new(shm_handle, self.remote_pid.unwrap()), + shm_area_size, })) } @@ -748,38 +801,6 @@ impl CubebServer { assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>()); let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void; - // SharedMem setup message should've been processed by client by now. - let shm_setup = server_stream - .shm_setup - .take() - .expect("invalid shm_setup state"); - match shm_setup.wait() { - Ok(CallbackResp::SharedMem) => {} - Ok(CallbackResp::Error(e)) => { - // If the client replied with an error (e.g. client OOM), log error and fail stream init. - debug!( - "Shmem setup for stream {:?} failed (raw error {:?})", - stm_tok, e - ); - return Ok(ClientMessage::Error(e)); - } - Ok(r) => { - debug!( - "Shmem setup for stream {:?} failed (unexpected response {:?})", - stm_tok, r - ); - return Ok(error(cubeb::Error::error())); - } - Err(e) => { - // If the client errored before responding, log error and fail stream init. - debug!( - "Shmem setup for stream {:?} failed (error {:?})", - stm_tok, e - ); - return Err(e.into()); - } - } - let stream = unsafe { let stream = context.stream_init( stream_name, @@ -804,7 +825,14 @@ impl CubebServer { server_stream.stream = Some(stream); - Ok(ClientMessage::StreamInitialized) + let client_pipe = server_stream + .client_pipe + .take() + .expect("invalid state after StreamCreated"); + Ok(ClientMessage::StreamInitialized(SerializableHandle::new( + client_pipe, + self.remote_pid.unwrap(), + ))) } } diff --git a/third_party/rust/audioipc2/.cargo-checksum.json b/third_party/rust/audioipc2/.cargo-checksum.json index ea527eaf9c4d31a87d71257ba4c4f983a55a81ec..56ccc7cbd7de16335dd083d227fcc688af7fa28f 100644 --- a/third_party/rust/audioipc2/.cargo-checksum.json +++ b/third_party/rust/audioipc2/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"d12aaf7e88bb704890b9401de55775c9e746be92448c6f280c1cb8e37e2e773f","benches/serialization.rs":"d56855d868dab6aa22c8b03a61084535351b76c94b68d8b1d20764e352fe473f","build.rs":"3f061cf9a989f63a71c693a543d26f7003e8b643c39c23ea555110252a2c39d2","src/cmsghdr.c":"d7344b3dc15cdce410c68669b848bb81f7fe36362cd3699668cb613fa05180f8","src/codec.rs":"58351c01b0414ec15f29d7dab4693508acfd4d7ca9df575e0eafffe4fe621e8e","src/errors.rs":"67a4a994d0724397657581cde153bdfc05ce86e7efc467f23fafc8f64df80fa4","src/ipccore.rs":"11030d7a163c35ceb1b798552805a8982c33da24862a95a98bfc2633bbbd8c17","src/lib.rs":"84d4b3db37309ca6dd735a59270a787049028d048458af514ef9b3aaf6a2dd58","src/messages.rs":"d78247a1c2d5de1d799df100baaeebf6e1b6daba95ef6058fd90387f82526f80","src/rpccore.rs":"e6800250049690cd900b66a010541affad4b30a6f4dea168ebe807043490f916","src/shm.rs":"748c4a261e298900ee0266acf42e4e9f72594d9b90f530c71d8d4f1d659a6de0","src/sys/mod.rs":"da4412ee630e53a0d3a79d9e18953280818bd58ed3fb3c6abedeeb8a092d3dfc","src/sys/unix/cmsg.rs":"71a51a5cd6fd54054f500af82a76b19a690258767ac7b68c6c35cc9d962d5629","src/sys/unix/mod.rs":"246e83874d9765f6db88ea8bc3511b8d1ce4557744f4415ad41de135f6b9baff","src/sys/unix/msg.rs":"d29d3974c145df8b1b931222f62aa64be0ec165b578f31b8f98555fa4d052b01","src/sys/windows/mod.rs":"50af90f17d9b61045ac009e0f53077f9a645c72c214c400b116c4eca2adce0d7"},"package":null} \ No newline at end of file +{"files":{"Cargo.toml":"569e9cc28a92fa1e2080b9433eb4cd77ae4296cc8eb93a390278419c45c367fb","benches/serialization.rs":"d56855d868dab6aa22c8b03a61084535351b76c94b68d8b1d20764e352fe473f","build.rs":"3f061cf9a989f63a71c693a543d26f7003e8b643c39c23ea555110252a2c39d2","src/cmsghdr.c":"d7344b3dc15cdce410c68669b848bb81f7fe36362cd3699668cb613fa05180f8","src/codec.rs":"58351c01b0414ec15f29d7dab4693508acfd4d7ca9df575e0eafffe4fe621e8e","src/errors.rs":"67a4a994d0724397657581cde153bdfc05ce86e7efc467f23fafc8f64df80fa4","src/ipccore.rs":"74e91855ebfae112ecf93fd591987d7a9a7a36a7e19b364960e34455016e892d","src/lib.rs":"84d4b3db37309ca6dd735a59270a787049028d048458af514ef9b3aaf6a2dd58","src/messages.rs":"d5db81981851fec20c6b9ff86a97b360b6e8c4fba2106f5afa286cbada303a72","src/rpccore.rs":"9fa24cb6d487b436382e35f82d0809ad2b315ce049ebaa767b4f88d3d5637f2e","src/shm.rs":"0036b686b4b53171a30e46b7c44335f89db13d37fa2ef24af1ee5a8c920e79e6","src/sys/mod.rs":"da4412ee630e53a0d3a79d9e18953280818bd58ed3fb3c6abedeeb8a092d3dfc","src/sys/unix/cmsg.rs":"e10e26cdfa92035ccb300dc4f2aef05eb1935833045cffb6b1107acc55889c8e","src/sys/unix/mod.rs":"3a2807c7b87ab5230d73fafd2f6417f6647e6d8ffdad7965d1d71bf511da0bcc","src/sys/unix/msg.rs":"d29d3974c145df8b1b931222f62aa64be0ec165b578f31b8f98555fa4d052b01","src/sys/windows/mod.rs":"50af90f17d9b61045ac009e0f53077f9a645c72c214c400b116c4eca2adce0d7"},"package":null} \ No newline at end of file diff --git a/third_party/rust/audioipc2/Cargo.toml b/third_party/rust/audioipc2/Cargo.toml index 1f666a2660752cfc623c02024a9fe47d04636c50..dbbdc67feb18f140464a439fe9a2738e1b7676f8 100644 --- a/third_party/rust/audioipc2/Cargo.toml +++ b/third_party/rust/audioipc2/Cargo.toml @@ -25,6 +25,7 @@ scopeguard = "1.1.0" iovec = "0.1" libc = "0.2" memmap2 = "0.2" +arrayvec = "0.7" [target.'cfg(target_os = "linux")'.dependencies] audio_thread_priority = "0.23.4" diff --git a/third_party/rust/audioipc2/src/ipccore.rs b/third_party/rust/audioipc2/src/ipccore.rs index 8813101294ffa0897b84ad92299ffb97a3966bbb..4d9699392f5be2b89bd03bd41a7cb4d33ee916f2 100644 --- a/third_party/rust/audioipc2/src/ipccore.rs +++ b/third_party/rust/audioipc2/src/ipccore.rs @@ -25,6 +25,8 @@ use std::fmt::Debug; use crate::duplicate_platform_handle; #[cfg(unix)] use crate::sys::cmsg; +#[cfg(unix)] +use crate::PlatformHandle; const WAKE_TOKEN: Token = Token(!0); @@ -201,7 +203,14 @@ impl EventLoop { // Each step may call `handle_event` on any registered connection that // has received readiness events from the poll wakeup. fn poll(&mut self) -> Result<bool> { - self.poll.poll(&mut self.events, None)?; + loop { + let r = self.poll.poll(&mut self.events, None); + match r { + Ok(()) => break, + Err(ref e) if interrupted(e) => continue, + Err(e) => return Err(e), + } + } for event in self.events.iter() { match event.token() { @@ -600,7 +609,15 @@ where #[allow(unused_mut)] while let Some(mut item) = self.codec.decode(&mut inbound.buf)? { #[cfg(unix)] - item.receive_owned_message_handle(|| cmsg::decode_handle(&mut inbound.cmsg)); + item.receive_owned_message_handle(|| { + if let Some(handle) = self.extra_handle.take() { + unsafe { handle.into_raw() } + } else { + let handles = cmsg::decode_handles(&mut inbound.cmsg); + self.extra_handle = handles.get(1).map(|h| PlatformHandle::new(*h)); + handles[0] + } + }); self.handler.consume(item)?; } @@ -636,6 +653,8 @@ where struct FramedDriver<T: Handler> { codec: LengthDelimitedCodec<T::Out, T::In>, handler: T, + #[cfg(unix)] + extra_handle: Option<PlatformHandle>, } impl<T: Handler> FramedDriver<T> { @@ -643,6 +662,8 @@ impl<T: Handler> FramedDriver<T> { FramedDriver { codec: Default::default(), handler, + #[cfg(unix)] + extra_handle: None, } } } @@ -673,15 +694,27 @@ impl EventLoopThread { .stack_size(stack_size.unwrap_or(64 * 4096)); let thread = builder.spawn(move || { + trace!("{}: event loop thread enter", event_loop.name); after_start(); let _thread_exit_guard = scopeguard::guard((), |_| before_stop()); - while event_loop.poll()? { - trace!("{}: event loop poll", event_loop.name); - } + let r = loop { + let start = std::time::Instant::now(); + let r = event_loop.poll(); + trace!( + "{}: event loop poll r={:?}, took={}μs", + event_loop.name, + r, + start.elapsed().as_micros() + ); + match r { + Ok(true) => continue, + _ => break r, + } + }; - trace!("{}: event loop shutdown", event_loop.name); - Ok(()) + trace!("{}: event loop thread exit", event_loop.name); + r.map(|_| ()) })?; Ok(EventLoopThread { diff --git a/third_party/rust/audioipc2/src/messages.rs b/third_party/rust/audioipc2/src/messages.rs index 1918a74c2efa01d0f4a2d5b835e5288967cc277b..a725a482a9a0cd035a2c156e0ae42e3f77623705 100644 --- a/third_party/rust/audioipc2/src/messages.rs +++ b/third_party/rust/audioipc2/src/messages.rs @@ -188,7 +188,8 @@ fn opt_str(v: Option<Vec<u8>>) -> *mut c_char { #[derive(Debug, Serialize, Deserialize)] pub struct StreamCreate { pub token: usize, - pub platform_handle: SerializableHandle, + pub shm_handle: SerializableHandle, + pub shm_area_size: usize, } #[derive(Debug, Serialize, Deserialize)] @@ -246,7 +247,7 @@ pub enum ClientMessage { ContextRegisteredDeviceCollectionChanged, StreamCreated(StreamCreate), - StreamInitialized, + StreamInitialized(SerializableHandle), StreamDestroyed, StreamStarted, @@ -274,7 +275,6 @@ pub enum CallbackReq { }, State(ffi::cubeb_state), DeviceChange, - SharedMem(SerializableHandle, usize), } #[derive(Debug, Deserialize, Serialize)] @@ -282,7 +282,6 @@ pub enum CallbackResp { Data(isize), State, DeviceChange, - SharedMem, Error(c_int), } @@ -425,6 +424,7 @@ pub trait AssociateHandleForMessage { // Update the item's handle with the received value, making it a valid owned handle. // Called on the receiving side after deserialization. + // Implementations must only call `F` for message types expecting a handle. #[cfg(unix)] fn receive_owned_message_handle<F>(&mut self, _: F) where @@ -443,13 +443,22 @@ impl AssociateHandleForMessage for ClientMessage { unsafe { match *self { ClientMessage::StreamCreated(ref mut data) => { - let handle = data.platform_handle.take_handle_for_send(); - data.platform_handle = + let handle = data.shm_handle.take_handle_for_send(); + data.shm_handle = SerializableHandle::new_serializable_value(f(handle.0, handle.1)?); trace!( "StreamCreated handle: {:?} remote_handle: {:?}", handle, - data.platform_handle + data.shm_handle + ); + } + ClientMessage::StreamInitialized(ref mut data) => { + let handle = data.take_handle_for_send(); + *data = SerializableHandle::new_serializable_value(f(handle.0, handle.1)?); + trace!( + "StreamInitialized handle: {:?} remote_handle: {:?}", + handle, + data ); } ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => { @@ -475,7 +484,10 @@ impl AssociateHandleForMessage for ClientMessage { { match *self { ClientMessage::StreamCreated(ref mut data) => { - data.platform_handle = SerializableHandle::new_owned(f()); + data.shm_handle = SerializableHandle::new_owned(f()); + } + ClientMessage::StreamInitialized(ref mut data) => { + *data = SerializableHandle::new_owned(f()); } ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => { data.platform_handle = SerializableHandle::new_owned(f()); @@ -488,32 +500,7 @@ impl AssociateHandleForMessage for ClientMessage { impl AssociateHandleForMessage for DeviceCollectionReq {} impl AssociateHandleForMessage for DeviceCollectionResp {} -impl AssociateHandleForMessage for CallbackReq { - fn prepare_send_message_handle<F>(&mut self, f: F) -> io::Result<()> - where - F: FnOnce(PlatformHandleType, u32) -> io::Result<PlatformHandleType>, - { - unsafe { - if let CallbackReq::SharedMem(ref mut data, _) = *self { - let handle = data.take_handle_for_send(); - *data = SerializableHandle::new_serializable_value(f(handle.0, handle.1)?); - trace!("SharedMem handle: {:?} remote_handle: {:?}", handle, data); - } - } - Ok(()) - } - - #[cfg(unix)] - fn receive_owned_message_handle<F>(&mut self, f: F) - where - F: FnOnce() -> PlatformHandleType, - { - if let CallbackReq::SharedMem(ref mut data, _) = *self { - *data = SerializableHandle::new_owned(f()); - } - } -} - +impl AssociateHandleForMessage for CallbackReq {} impl AssociateHandleForMessage for CallbackResp {} #[cfg(test)] diff --git a/third_party/rust/audioipc2/src/rpccore.rs b/third_party/rust/audioipc2/src/rpccore.rs index b87453351d9c92a44e83a787ecd563faa70900eb..f208ba3b1f7d696262c3c97ad56d4ce81c22da87 100644 --- a/third_party/rust/audioipc2/src/rpccore.rs +++ b/third_party/rust/audioipc2/src/rpccore.rs @@ -113,7 +113,9 @@ impl<Request, Response> Drop for Proxy<Request, Response> { // Must drop Sender before waking the connection, otherwise // the wake may be processed before Sender is closed. unsafe { ManuallyDrop::drop(&mut self.tx) } - self.wake_connection(); + if self.handle.is_some() { + self.wake_connection() + } } } @@ -178,12 +180,6 @@ pub(crate) fn make_client<C: Client>( in_flight: VecDeque::with_capacity(32), }; - // Sender is Send, but !Sync, so it's not safe to move between threads - // without cloning it first. Force a clone here, since we use Proxy in - // native code and it's possible to move it between threads without Rust's - // type system noticing. - #[allow(clippy::redundant_clone)] - let tx = tx.clone(); let proxy = Proxy { handle: None, tx: ManuallyDrop::new(tx), diff --git a/third_party/rust/audioipc2/src/shm.rs b/third_party/rust/audioipc2/src/shm.rs index 2be38823353f1629545de463616682ff3f1df560..e5216a234356099c7830b3ac6c08b60c981d20bb 100644 --- a/third_party/rust/audioipc2/src/shm.rs +++ b/third_party/rust/audioipc2/src/shm.rs @@ -15,7 +15,7 @@ pub use unix::SharedMem; pub use windows::SharedMem; #[derive(Copy, Clone)] -pub struct SharedMemView { +struct SharedMemView { ptr: *mut c_void, size: usize, } @@ -228,10 +228,6 @@ mod unix { }) } - pub unsafe fn unsafe_view(&self) -> SharedMemView { - self.view - } - pub unsafe fn get_slice(&self, size: usize) -> Result<&[u8]> { self.view.get_slice(size) } @@ -319,10 +315,6 @@ mod windows { }) } - pub unsafe fn unsafe_view(&self) -> SharedMemView { - self.view - } - pub unsafe fn get_slice(&self, size: usize) -> Result<&[u8]> { self.view.get_slice(size) } diff --git a/third_party/rust/audioipc2/src/sys/unix/cmsg.rs b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs index 1210b8febd8046e633a38a14e08aa5572f4bb1bc..bc26191f76d1362a00e3e0a016b49a98a3624e24 100644 --- a/third_party/rust/audioipc2/src/sys/unix/cmsg.rs +++ b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs @@ -43,14 +43,19 @@ pub fn encode_handle(cmsg: &mut BytesMut, handle: RawFd) { } } -// Decode one cmsghdr containing a handle, and adjust the `cmsg` buffer cursor past -// the decoded handle. -pub fn decode_handle(cmsg: &mut BytesMut) -> RawFd { +// Decode one cmsghdr containing handle(s), and adjust the `cmsg` buffer cursor past +// the decoded handle(s). +// Note: ideally this would be a single handle, but due to buffering multiple +// sendmsgs can coalesce into a single recvmsg. On some (64-bit) systems, the +// minimum alignment of the cmsghdr buffer provides capacity for 2 handles, so +// this code must expect 1 or 2 handles per decode call. +pub fn decode_handles(cmsg: &mut BytesMut) -> arrayvec::ArrayVec<RawFd, 2> { + let mut fds = arrayvec::ArrayVec::<RawFd, 2>::new(); let b = cmsg.split_to(space(size_of::<i32>())).freeze(); - // TODO: Clean this up to only expect a single fd per message. let fd = iterator(b).next().unwrap(); - assert_eq!(fd.len(), 1); - fd[0] + assert!(fd.len() == 1 || fd.len() == 2); + fds.try_extend_from_slice(&fd).unwrap(); + fds } fn iterator(c: Bytes) -> ControlMsgIter { diff --git a/third_party/rust/audioipc2/src/sys/unix/mod.rs b/third_party/rust/audioipc2/src/sys/unix/mod.rs index 67f89abcb5f4f9a63f677d8e9a9b83840a5ee927..6e86aaafe6d160a4981b3c91e3c89302e98d0916 100644 --- a/third_party/rust/audioipc2/src/sys/unix/mod.rs +++ b/third_party/rust/audioipc2/src/sys/unix/mod.rs @@ -133,9 +133,11 @@ impl Drop for ConnectionBuffer { fn close_fds(cmsg: &mut BytesMut) { while !cmsg.is_empty() { - let fd = cmsg::decode_handle(cmsg); - unsafe { - close_platform_handle(fd); + let fds = cmsg::decode_handles(cmsg); + for fd in fds { + unsafe { + close_platform_handle(fd); + } } } assert!(cmsg.is_empty()); diff --git a/toolkit/library/rust/shared/Cargo.toml b/toolkit/library/rust/shared/Cargo.toml index 466347ec3b8e171a1cd2a4882fa2b579d35d4fd6..c4793b336908b5fe916df82069e06b77903ecb45 100644 --- a/toolkit/library/rust/shared/Cargo.toml +++ b/toolkit/library/rust/shared/Cargo.toml @@ -23,8 +23,8 @@ webrender_bindings = { path = "../../../../gfx/webrender_bindings" } cubeb-coreaudio = { git = "https://github.com/mozilla/cubeb-coreaudio-rs", rev = "dea28d0c7d9952bbf430701ee244f6a4349c98f1", optional = true } cubeb-pulse = { git = "https://github.com/mozilla/cubeb-pulse-rs", rev="f2456201dbfdc467b80f0ff6bbb1b8a6faf7df02", optional = true, features=["pulse-dlopen"] } cubeb-sys = { version = "0.9", optional = true, features=["gecko-in-tree"] } -audioipc2-client = { git = "https://github.com/kinetiknz/audioipc-2", rev = "8b5c7db0f0af54f25a2ee7f7aa55c2a0689c97fc", optional = true } # macos (v2) branch -audioipc2-server = { git = "https://github.com/kinetiknz/audioipc-2", rev = "8b5c7db0f0af54f25a2ee7f7aa55c2a0689c97fc", optional = true } # macos (v2) branch +audioipc2-client = { git = "https://github.com/kinetiknz/audioipc-2", rev = "8e3c4c1b1edbd19cfafc833c3254f12731af31c4", optional = true } # macos (v2) branch +audioipc2-server = { git = "https://github.com/kinetiknz/audioipc-2", rev = "8e3c4c1b1edbd19cfafc833c3254f12731af31c4", optional = true } # macos (v2) branch audioipc-client = { git = "https://github.com/mozilla/audioipc-2", rev = "515bb210a93f520642fd3a60f391652680b3e988", optional = true } audioipc-server = { git = "https://github.com/mozilla/audioipc-2", rev = "515bb210a93f520642fd3a60f391652680b3e988", optional = true } encoding_glue = { path = "../../../../intl/encoding_glue" }