diff --git a/src/dealer.rs b/src/dealer.rs index 89071a63..f9e03003 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -94,3 +94,94 @@ impl SocketSend for DealerSocket { } impl CaptureSocket for DealerSocket {} + +impl DealerSocket { + /// Splits the socket into separate send and recv halves, allowing concurrent + /// sending and receiving from independent async tasks. + /// + /// The underlying socket stays alive until both halves are dropped. + pub fn split(mut self) -> (DealerSendHalf, DealerRecvHalf) { + // Swap the real fields out before self drops. The dummy backend's + // shutdown() is a no-op on an empty peer map. + let backend = std::mem::replace( + &mut self.backend, + Arc::new(GenericSocketBackend::with_options( + None, + SocketType::DEALER, + SocketOptions::default(), + )), + ); + let fair_queue = std::mem::replace(&mut self.fair_queue, FairQueue::new(true)); + let binds = std::mem::take(&mut self.binds); + + let inner = Arc::new(DealerSocketInner { + backend, + _binds: binds, + }); + + ( + DealerSendHalf { + inner: inner.clone(), + }, + DealerRecvHalf { + _inner: inner, + fair_queue, + }, + ) + } +} + +struct DealerSocketInner { + backend: Arc, + _binds: HashMap, +} + +impl Drop for DealerSocketInner { + fn drop(&mut self) { + self.backend.shutdown(); + } +} + +/// The send half of a [`DealerSocket`] produced by [`DealerSocket::split`]. +pub struct DealerSendHalf { + inner: Arc, +} + +/// The recv half of a [`DealerSocket`] produced by [`DealerSocket::split`]. +pub struct DealerRecvHalf { + _inner: Arc, + fair_queue: FairQueue, +} + +#[async_trait] +impl SocketSend for DealerSendHalf { + async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> { + self.inner + .backend + .send_round_robin(Message::Message(message)) + .await?; + Ok(()) + } +} + +impl CaptureSocket for DealerSendHalf {} + +#[async_trait] +impl SocketRecv for DealerRecvHalf { + async fn recv(&mut self) -> ZmqResult { + loop { + match self.fair_queue.next().await { + Some((_peer_id, Ok(Message::Message(message)))) => { + return Ok(message); + } + Some((_peer_id, Ok(_))) => {} + Some((_peer_id, Err(e))) => { + return Err(e.into()); + } + None => { + return Err(ZmqError::NoMessage); + } + }; + } + } +} diff --git a/tests/dealer_split.rs b/tests/dealer_split.rs new file mode 100644 index 00000000..085eca46 --- /dev/null +++ b/tests/dealer_split.rs @@ -0,0 +1,71 @@ +#[cfg(test)] +mod test { + + use zeromq::__async_rt as async_rt; + use zeromq::prelude::*; + use zeromq::ZmqMessage; + + use std::error::Error; + use std::time::Duration; + + fn assert_send() {} + + #[test] + fn split_halves_are_send() { + assert_send::(); + assert_send::(); + } + + #[async_rt::test] + async fn test_dealer_split_concurrent_send_recv() -> Result<(), Box> { + pretty_env_logger::try_init().ok(); + + // Use DEALER-to-DEALER (compatible pair, no envelope framing needed) + let mut server = zeromq::DealerSocket::new(); + let endpoint = server.bind("tcp://localhost:0").await?; + + let mut client = zeromq::DealerSocket::new(); + client.connect(endpoint.to_string().as_str()).await?; + + // Give connection time to establish + async_rt::task::sleep(Duration::from_millis(100)).await; + + let (mut send_half, mut recv_half) = client.split(); + + let num_messages: u32 = 5; + + // Server echo loop + let server_task = async_rt::task::spawn(async move { + for _ in 0..num_messages { + let msg = server.recv().await.unwrap(); + server.send(msg).await.unwrap(); + } + }); + + // Sender in its own task + let send_task = async_rt::task::spawn(async move { + for i in 0..num_messages { + let msg = ZmqMessage::from(format!("msg-{}", i)); + send_half.send(msg).await.unwrap(); + } + }); + + // Receiver in its own task + let recv_task = async_rt::task::spawn(async move { + for _ in 0..num_messages { + let _reply = recv_half.recv().await.unwrap(); + } + }); + + let timeout = Duration::from_secs(5); + async_rt::task::timeout(timeout, async { + send_task.await.unwrap(); + recv_task.await.unwrap(); + server_task.await.unwrap(); + }) + .await + .expect("test timed out"); + + Ok(()) + } +}