Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,94 @@ impl SocketSend for DealerSocket {
}

impl CaptureSocket for DealerSocket {}

impl DealerSocket {
/// Splits the socket into separate send and recv halves, allowing concurrent
/// sending and receiving from independent async tasks.
///
/// The underlying socket stays alive until both halves are dropped.
pub fn split(mut self) -> (DealerSendHalf, DealerRecvHalf) {
// Swap the real fields out before self drops. The dummy backend's
// shutdown() is a no-op on an empty peer map.
let backend = std::mem::replace(
&mut self.backend,
Arc::new(GenericSocketBackend::with_options(
None,
SocketType::DEALER,
SocketOptions::default(),
)),
);
let fair_queue = std::mem::replace(&mut self.fair_queue, FairQueue::new(true));
let binds = std::mem::take(&mut self.binds);

let inner = Arc::new(DealerSocketInner {
backend,
_binds: binds,
});

(
DealerSendHalf {
inner: inner.clone(),
},
DealerRecvHalf {
_inner: inner,
fair_queue,
},
)
}
}

struct DealerSocketInner {
backend: Arc<GenericSocketBackend>,
_binds: HashMap<Endpoint, AcceptStopHandle>,
}

impl Drop for DealerSocketInner {
fn drop(&mut self) {
self.backend.shutdown();
}
}

/// The send half of a [`DealerSocket`] produced by [`DealerSocket::split`].
pub struct DealerSendHalf {
inner: Arc<DealerSocketInner>,
}

/// The recv half of a [`DealerSocket`] produced by [`DealerSocket::split`].
pub struct DealerRecvHalf {
_inner: Arc<DealerSocketInner>,
fair_queue: FairQueue<ZmqFramedRead, PeerIdentity>,
}

#[async_trait]
impl SocketSend for DealerSendHalf {
async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> {
self.inner
.backend
.send_round_robin(Message::Message(message))
.await?;
Ok(())
}
}

impl CaptureSocket for DealerSendHalf {}

#[async_trait]
impl SocketRecv for DealerRecvHalf {
async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
loop {
match self.fair_queue.next().await {
Some((_peer_id, Ok(Message::Message(message)))) => {
return Ok(message);
}
Some((_peer_id, Ok(_))) => {}
Some((_peer_id, Err(e))) => {
return Err(e.into());
}
None => {
return Err(ZmqError::NoMessage);
}
};
}
}
}
71 changes: 71 additions & 0 deletions tests/dealer_split.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#[cfg(test)]
mod test {

use zeromq::__async_rt as async_rt;
use zeromq::prelude::*;
use zeromq::ZmqMessage;

use std::error::Error;
use std::time::Duration;

fn assert_send<T: Send>() {}

#[test]
fn split_halves_are_send() {
assert_send::<zeromq::DealerSendHalf>();
assert_send::<zeromq::DealerRecvHalf>();
}

#[async_rt::test]
async fn test_dealer_split_concurrent_send_recv() -> Result<(), Box<dyn Error>> {
pretty_env_logger::try_init().ok();

// Use DEALER-to-DEALER (compatible pair, no envelope framing needed)
let mut server = zeromq::DealerSocket::new();
let endpoint = server.bind("tcp://localhost:0").await?;

let mut client = zeromq::DealerSocket::new();
client.connect(endpoint.to_string().as_str()).await?;

// Give connection time to establish
async_rt::task::sleep(Duration::from_millis(100)).await;

let (mut send_half, mut recv_half) = client.split();

let num_messages: u32 = 5;

// Server echo loop
let server_task = async_rt::task::spawn(async move {
for _ in 0..num_messages {
let msg = server.recv().await.unwrap();
server.send(msg).await.unwrap();
}
});

// Sender in its own task
let send_task = async_rt::task::spawn(async move {
for i in 0..num_messages {
let msg = ZmqMessage::from(format!("msg-{}", i));
send_half.send(msg).await.unwrap();
}
});

// Receiver in its own task
let recv_task = async_rt::task::spawn(async move {
for _ in 0..num_messages {
let _reply = recv_half.recv().await.unwrap();
}
});

let timeout = Duration::from_secs(5);
async_rt::task::timeout(timeout, async {
send_task.await.unwrap();
recv_task.await.unwrap();
server_task.await.unwrap();
})
.await
.expect("test timed out");

Ok(())
}
}
Loading