From 846e753768aed2668d77abd1d249a54098dfff94 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 1 Apr 2026 13:58:35 +0800 Subject: [PATCH 1/2] impl `Clone` for `RouterSendHalf` Signed-off-by: Bugen Zhao --- src/router.rs | 1 + tests/router_split.rs | 86 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/src/router.rs b/src/router.rs index 41d2621..af359aa 100644 --- a/src/router.rs +++ b/src/router.rs @@ -158,6 +158,7 @@ impl Drop for RouterSocketInner { } /// The send half of a [`RouterSocket`] produced by [`RouterSocket::split`]. +#[derive(Clone)] pub struct RouterSendHalf { inner: Arc, } diff --git a/tests/router_split.rs b/tests/router_split.rs index 46b3b06..9854f8c 100644 --- a/tests/router_split.rs +++ b/tests/router_split.rs @@ -1,6 +1,7 @@ #[cfg(test)] mod test { + use bytes::Bytes; use zeromq::__async_rt as async_rt; use zeromq::prelude::*; use zeromq::ZmqMessage; @@ -9,6 +10,7 @@ mod test { use std::time::Duration; fn assert_send() {} + fn assert_clone() {} #[test] fn split_halves_are_send() { @@ -16,6 +18,11 @@ mod test { assert_send::(); } + #[test] + fn router_send_half_is_clone() { + assert_clone::(); + } + #[async_rt::test] async fn test_router_split_concurrent_send_recv() -> Result<(), Box> { pretty_env_logger::try_init().ok(); @@ -69,4 +76,83 @@ mod test { Ok(()) } + + #[async_rt::test] + async fn test_router_split_send_half_clone_concurrent_send() -> Result<(), Box> { + pretty_env_logger::try_init().ok(); + + fn message_for(identity: Bytes, payload: String) -> ZmqMessage { + let mut msg = ZmqMessage::from(payload); + msg.push_front(identity); + msg + } + + // ROUTER binds, DEALER connects + let mut router = zeromq::RouterSocket::new(); + let endpoint = router.bind("tcp://localhost:0").await?; + + let mut dealer = zeromq::DealerSocket::new(); + dealer.connect(endpoint.to_string().as_str()).await?; + + async_rt::task::sleep(Duration::from_millis(100)).await; + + // Split the router into send/recv halves and clone the send halves + let (send_half, mut recv_half) = router.split(); + let mut send_half_1 = send_half.clone(); + let mut send_half_2 = send_half; + + let num_messages: u32 = 10; + + // Dealer sends one message so router recv half can learn its identity + dealer.send(ZmqMessage::from("register-dealer")).await?; + let registration = recv_half.recv().await?; + let dealer_identity = registration.get(0).unwrap().clone(); + + let mut expected = Vec::new(); + for i in 0..num_messages { + expected.push(format!("sender-1->{i}")); + expected.push(format!("sender-2->{i}")); + } + expected.sort(); + + // First cloned send half sends its own series of messages + let dealer_identity_1 = dealer_identity.clone(); + let send_task_1 = async_rt::task::spawn(async move { + for i in 0..num_messages { + let message = message_for(dealer_identity_1.clone(), format!("sender-1->{i}")); + send_half_1.send(message).await.unwrap(); + } + }); + + // Second cloned send half sends concurrently to the same dealer + let dealer_identity_2 = dealer_identity.clone(); + let send_task_2 = async_rt::task::spawn(async move { + for i in 0..num_messages { + let message = message_for(dealer_identity_2.clone(), format!("sender-2->{i}")); + send_half_2.send(message).await.unwrap(); + } + }); + + // Dealer receives the full set of messages from both senders + let dealer_task = async_rt::task::spawn(async move { + let mut received = Vec::new(); + for _ in 0..(num_messages * 2) { + let reply = dealer.recv().await.unwrap(); + received.push(String::try_from(reply).unwrap()); + } + received.sort(); + received + }); + + let timeout = Duration::from_secs(5); + async_rt::task::timeout(timeout, async { + send_task_1.await.unwrap(); + send_task_2.await.unwrap(); + assert_eq!(dealer_task.await.unwrap(), expected); + }) + .await + .expect("test timed out"); + + Ok(()) + } } From 5b85c6f1d7ab15c4af78df3688c08bc38f8eeecb Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 1 Apr 2026 14:00:12 +0800 Subject: [PATCH 2/2] add docs Signed-off-by: Bugen Zhao --- src/router.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/router.rs b/src/router.rs index af359aa..f4dc7f6 100644 --- a/src/router.rs +++ b/src/router.rs @@ -158,6 +158,8 @@ impl Drop for RouterSocketInner { } /// The send half of a [`RouterSocket`] produced by [`RouterSocket::split`]. +/// +/// This half can be cloned to send from multiple async tasks concurrently. #[derive(Clone)] pub struct RouterSendHalf { inner: Arc,