Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ impl Drop for RouterSocketInner {
}

/// The send half of a [`RouterSocket`] produced by [`RouterSocket::split`].
///
/// This half can be cloned to send from multiple async tasks concurrently.
#[derive(Clone)]
pub struct RouterSendHalf {
inner: Arc<RouterSocketInner>,
}
Expand Down
86 changes: 86 additions & 0 deletions tests/router_split.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(test)]
mod test {

use bytes::Bytes;
use zeromq::__async_rt as async_rt;
use zeromq::prelude::*;
use zeromq::ZmqMessage;
Expand All @@ -9,13 +10,19 @@ mod test {
use std::time::Duration;

fn assert_send<T: Send>() {}
fn assert_clone<T: Clone>() {}

#[test]
fn split_halves_are_send() {
assert_send::<zeromq::RouterSendHalf>();
assert_send::<zeromq::RouterRecvHalf>();
}

#[test]
fn router_send_half_is_clone() {
assert_clone::<zeromq::RouterSendHalf>();
}

#[async_rt::test]
async fn test_router_split_concurrent_send_recv() -> Result<(), Box<dyn Error>> {
pretty_env_logger::try_init().ok();
Expand Down Expand Up @@ -69,4 +76,83 @@ mod test {

Ok(())
}

#[async_rt::test]
async fn test_router_split_send_half_clone_concurrent_send() -> Result<(), Box<dyn Error>> {
pretty_env_logger::try_init().ok();

fn message_for(identity: Bytes, payload: String) -> ZmqMessage {
let mut msg = ZmqMessage::from(payload);
msg.push_front(identity);
msg
}

// ROUTER binds, DEALER connects
let mut router = zeromq::RouterSocket::new();
let endpoint = router.bind("tcp://localhost:0").await?;

let mut dealer = zeromq::DealerSocket::new();
dealer.connect(endpoint.to_string().as_str()).await?;

async_rt::task::sleep(Duration::from_millis(100)).await;

// Split the router into send/recv halves and clone the send halves
let (send_half, mut recv_half) = router.split();
let mut send_half_1 = send_half.clone();
let mut send_half_2 = send_half;

let num_messages: u32 = 10;

// Dealer sends one message so router recv half can learn its identity
dealer.send(ZmqMessage::from("register-dealer")).await?;
let registration = recv_half.recv().await?;
let dealer_identity = registration.get(0).unwrap().clone();

let mut expected = Vec::new();
for i in 0..num_messages {
expected.push(format!("sender-1->{i}"));
expected.push(format!("sender-2->{i}"));
}
expected.sort();

// First cloned send half sends its own series of messages
let dealer_identity_1 = dealer_identity.clone();
let send_task_1 = async_rt::task::spawn(async move {
for i in 0..num_messages {
let message = message_for(dealer_identity_1.clone(), format!("sender-1->{i}"));
send_half_1.send(message).await.unwrap();
}
});

// Second cloned send half sends concurrently to the same dealer
let dealer_identity_2 = dealer_identity.clone();
let send_task_2 = async_rt::task::spawn(async move {
for i in 0..num_messages {
let message = message_for(dealer_identity_2.clone(), format!("sender-2->{i}"));
send_half_2.send(message).await.unwrap();
}
});

// Dealer receives the full set of messages from both senders
let dealer_task = async_rt::task::spawn(async move {
let mut received = Vec::new();
for _ in 0..(num_messages * 2) {
let reply = dealer.recv().await.unwrap();
received.push(String::try_from(reply).unwrap());
}
received.sort();
received
});

let timeout = Duration::from_secs(5);
async_rt::task::timeout(timeout, async {
send_task_1.await.unwrap();
send_task_2.await.unwrap();
assert_eq!(dealer_task.await.unwrap(), expected);
})
.await
.expect("test timed out");

Ok(())
}
}
Loading