Skip to content

Commit 2fdcab6

Browse files
authored
Merge pull request #229 from zeromq/libzmq-conformance-tests
Add libzmq conformance tests
2 parents 499b50d + 3e30b73 commit 2fdcab6

6 files changed

Lines changed: 682 additions & 2 deletions

File tree

tests/compliance/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::convert::TryInto;
22

33
/// NOTE: This will block. Careful when using in async code.
4+
#[allow(dead_code)]
45
pub fn get_monitor_event(monitor: &zmq2::Socket) -> (zmq2::SocketEvent, u32, String) {
56
assert_eq!(monitor.get_socket_type().unwrap(), zmq2::PAIR);
67
let mut msgs = monitor.recv_multipart(0).expect("Monitor couldn't recv");
@@ -18,6 +19,7 @@ pub fn get_monitor_event(monitor: &zmq2::Socket) -> (zmq2::SocketEvent, u32, Str
1819
}
1920

2021
/// Configures `their_sock` with a socket monitor, and returns the monitor
22+
#[allow(dead_code)]
2123
pub fn setup_monitor(
2224
ctx: &zmq2::Context,
2325
their_sock: &zmq2::Socket,

tests/pub_sub.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#[cfg(test)]
22
mod test {
3+
use zeromq::__async_rt as async_rt;
34
use zeromq::prelude::*;
45
use zeromq::Endpoint;
56
use zeromq::ZmqMessage;
6-
use zeromq::__async_rt as async_rt;
77

88
use futures::channel::{mpsc, oneshot};
99
use futures::{SinkExt, StreamExt};

tests/pub_sub_compliant_reverse.rs

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
//! Conformance tests for our PUB with their SUB (reverse of `pub_sub_compliant.rs`).
2+
//!
3+
//! Tests that our zmq.rs PUB socket correctly broadcasts to libzmq SUB sockets.
4+
5+
mod compliance;
6+
7+
use zeromq::__async_rt as async_rt;
8+
use zeromq::prelude::*;
9+
use zeromq::ZmqMessage;
10+
11+
use std::time::Duration;
12+
13+
async fn setup_our_pub(bind_endpoint: &str) -> (zeromq::PubSocket, String) {
14+
let mut our_pub = zeromq::PubSocket::new();
15+
let endpoint = our_pub.bind(bind_endpoint).await.expect("Failed to bind");
16+
(our_pub, endpoint.to_string())
17+
}
18+
19+
fn setup_their_subs(
20+
ctx: &zmq2::Context,
21+
connect_endpoint: &str,
22+
n_subs: usize,
23+
subscription: &[u8],
24+
) -> Vec<zmq2::Socket> {
25+
let mut subs = Vec::new();
26+
for _ in 0..n_subs {
27+
let their_sub = ctx.socket(zmq2::SUB).expect("Couldn't make sub socket");
28+
their_sub.set_ipv6(true).expect("Failed to enable IPV6");
29+
their_sub
30+
.connect(connect_endpoint)
31+
.expect("Failed to connect");
32+
their_sub
33+
.set_subscribe(subscription)
34+
.expect("Failed to subscribe");
35+
subs.push(their_sub);
36+
}
37+
subs
38+
}
39+
40+
#[cfg(test)]
41+
mod test {
42+
use super::*;
43+
44+
#[async_rt::test]
45+
async fn test_our_pub_their_sub() {
46+
pretty_env_logger::try_init().ok();
47+
48+
const N_SUBS: usize = 4;
49+
50+
async fn do_test(our_endpoint: &str) {
51+
let (mut our_pub, bind_endpoint) = setup_our_pub(our_endpoint).await;
52+
println!("Our PUB bound to {}", bind_endpoint);
53+
54+
let ctx = zmq2::Context::new();
55+
let their_subs = setup_their_subs(&ctx, &bind_endpoint, N_SUBS, b"");
56+
57+
// Set receive timeout to avoid blocking forever
58+
for sub in &their_subs {
59+
sub.set_rcvtimeo(2000).expect("Failed to set timeout");
60+
}
61+
62+
// Slow joiner: wait for subscriptions to propagate
63+
async_rt::task::sleep(Duration::from_millis(300)).await;
64+
65+
const NUM_MSGS: u32 = 16;
66+
67+
// Run their subs in threads - collect messages with timeout
68+
let sub_handles: Vec<_> = their_subs
69+
.into_iter()
70+
.enumerate()
71+
.map(|(idx, sub)| {
72+
std::thread::spawn(move || {
73+
let mut received = Vec::new();
74+
while let Ok(Ok(msg)) = sub.recv_string(0) {
75+
received.push(msg);
76+
}
77+
(idx, received)
78+
})
79+
})
80+
.collect();
81+
82+
// Our pub sends
83+
for i in 0..NUM_MSGS {
84+
let msg = ZmqMessage::from(format!("Message: {}", i));
85+
our_pub.send(msg).await.expect("Failed to send");
86+
}
87+
88+
// Wait a bit for messages to propagate, then verify
89+
async_rt::task::sleep(Duration::from_millis(500)).await;
90+
91+
// Join all subscriber threads and verify they received messages
92+
for handle in sub_handles {
93+
let (idx, received) = handle.join().expect("Sub thread panicked");
94+
// Each sub should receive at least some messages (slow joiner may miss early ones)
95+
assert!(!received.is_empty(), "Sub {} received no messages", idx);
96+
println!("Sub {} received {} messages", idx, received.len());
97+
}
98+
}
99+
100+
let endpoints = vec![
101+
"tcp://127.0.0.1:0",
102+
"tcp://[::1]:0",
103+
"ipc://our_pub_test.sock",
104+
];
105+
106+
for e in endpoints {
107+
println!("Testing with endpoint {}", e);
108+
do_test(e).await;
109+
110+
// Clean up IPC socket files
111+
if let Some(path) = e.strip_prefix("ipc://") {
112+
std::fs::remove_file(path).ok();
113+
}
114+
}
115+
}
116+
117+
#[async_rt::test]
118+
async fn test_our_pub_their_sub_topic_filtering() {
119+
pretty_env_logger::try_init().ok();
120+
121+
let (mut our_pub, bind_endpoint) = setup_our_pub("tcp://127.0.0.1:0").await;
122+
println!("Our PUB bound to {}", bind_endpoint);
123+
124+
let ctx = zmq2::Context::new();
125+
126+
// Create subs with different topic filters
127+
let topic1_sub = ctx.socket(zmq2::SUB).expect("Couldn't make sub");
128+
topic1_sub
129+
.connect(&bind_endpoint)
130+
.expect("Failed to connect");
131+
topic1_sub
132+
.set_subscribe(b"topic1")
133+
.expect("Failed to subscribe");
134+
135+
let topic2_sub = ctx.socket(zmq2::SUB).expect("Couldn't make sub");
136+
topic2_sub
137+
.connect(&bind_endpoint)
138+
.expect("Failed to connect");
139+
topic2_sub
140+
.set_subscribe(b"topic2")
141+
.expect("Failed to subscribe");
142+
143+
let all_sub = ctx.socket(zmq2::SUB).expect("Couldn't make sub");
144+
all_sub.connect(&bind_endpoint).expect("Failed to connect");
145+
all_sub.set_subscribe(b"").expect("Failed to subscribe");
146+
147+
// Wait for subscriptions
148+
async_rt::task::sleep(Duration::from_millis(200)).await;
149+
150+
// Spawn receiver threads with RCVTIMEO to avoid blocking forever
151+
topic1_sub
152+
.set_rcvtimeo(1000)
153+
.expect("Failed to set timeout");
154+
topic2_sub
155+
.set_rcvtimeo(1000)
156+
.expect("Failed to set timeout");
157+
all_sub.set_rcvtimeo(1000).expect("Failed to set timeout");
158+
159+
let topic1_handle = std::thread::spawn(move || {
160+
let mut received = Vec::new();
161+
while let Ok(msg) = topic1_sub.recv_string(0) {
162+
received.push(msg.unwrap());
163+
}
164+
received
165+
});
166+
167+
let topic2_handle = std::thread::spawn(move || {
168+
let mut received = Vec::new();
169+
while let Ok(msg) = topic2_sub.recv_string(0) {
170+
received.push(msg.unwrap());
171+
}
172+
received
173+
});
174+
175+
let all_handle = std::thread::spawn(move || {
176+
let mut received = Vec::new();
177+
while let Ok(msg) = all_sub.recv_string(0) {
178+
received.push(msg.unwrap());
179+
}
180+
received
181+
});
182+
183+
// Send messages with different topics
184+
for msg in &[
185+
"topic1-message-a",
186+
"topic2-message-b",
187+
"topic1-message-c",
188+
"other-message-d",
189+
"topic2-message-e",
190+
] {
191+
our_pub
192+
.send(ZmqMessage::from(*msg))
193+
.await
194+
.expect("Failed to send");
195+
}
196+
197+
// Wait and check results
198+
let topic1_msgs = topic1_handle.join().expect("topic1 thread panicked");
199+
let topic2_msgs = topic2_handle.join().expect("topic2 thread panicked");
200+
let all_msgs = all_handle.join().expect("all thread panicked");
201+
202+
assert_eq!(
203+
topic1_msgs,
204+
vec!["topic1-message-a", "topic1-message-c"],
205+
"topic1 sub should only receive topic1 messages"
206+
);
207+
assert_eq!(
208+
topic2_msgs,
209+
vec!["topic2-message-b", "topic2-message-e"],
210+
"topic2 sub should only receive topic2 messages"
211+
);
212+
assert_eq!(all_msgs.len(), 5, "all sub should receive all messages");
213+
}
214+
}

0 commit comments

Comments
 (0)