Skip to content

Commit 380a1ea

Browse files
authored
Merge pull request #219 from zeromq/xpub
support xpub
2 parents 1a6d5c1 + da6eb3c commit 380a1ea

5 files changed

Lines changed: 599 additions & 1 deletion

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Basic ZMTP implementation is working and tested against the reference implementa
1616
### Supported socket patterns:
1717

1818
* Request/Response (REQ, REP, DEALER, ROUTER)
19-
* Publish/Subscribe (PUB, SUB)
19+
* Publish/Subscribe (PUB, SUB, XPUB)
2020
* Pipeline (PUSH, PULL)
2121

2222
## Usage

examples/xpub_weather_server.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
mod async_helpers;
2+
3+
use rand::Rng;
4+
use std::sync::Arc;
5+
use std::time::Duration;
6+
7+
use zeromq::*;
8+
9+
#[async_helpers::main]
10+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
11+
pretty_env_logger::try_init().ok();
12+
13+
println!("Starting XPUB weather server");
14+
println!("This example demonstrates the key difference between PUB and XPUB:");
15+
println!("XPUB exposes subscription/unsubscription messages to the application\n");
16+
17+
let mut socket = zeromq::XPubSocket::new();
18+
socket.bind("tcp://127.0.0.1:5557").await?;
19+
20+
println!("XPUB server bound to tcp://127.0.0.1:5557");
21+
println!(
22+
"Run the weather_client example and connect to port 5557 to see subscription messages\n"
23+
);
24+
25+
// Spawn a background task to handle subscription messages
26+
// This is the key difference between PUB and XPUB:
27+
// XPUB exposes subscription/unsubscription messages to the application
28+
let subscription_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
29+
let _counter_clone = subscription_counter.clone();
30+
31+
#[cfg(feature = "tokio-runtime")]
32+
let _subscription_handle = tokio::spawn(async move {
33+
loop {
34+
match socket.recv().await {
35+
Ok(msg) => {
36+
let data = msg.get(0).unwrap();
37+
if !data.is_empty() {
38+
match data[0] {
39+
1 => {
40+
// Subscribe message (byte value 1)
41+
let topic = String::from_utf8_lossy(&data[1..]);
42+
if topic.is_empty() {
43+
println!("📥 Client subscribed to ALL topics");
44+
} else {
45+
println!("📥 Client subscribed to: '{}'", topic);
46+
}
47+
_counter_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
48+
}
49+
0 => {
50+
// Unsubscribe message (byte value 0)
51+
let topic = String::from_utf8_lossy(&data[1..]);
52+
if topic.is_empty() {
53+
println!("📤 Client unsubscribed from ALL topics");
54+
} else {
55+
println!("📤 Client unsubscribed from: '{}'", topic);
56+
}
57+
}
58+
_ => {
59+
println!("⚠️ Unknown subscription message type: {}", data[0]);
60+
}
61+
}
62+
}
63+
}
64+
Err(e) => {
65+
eprintln!("Error receiving subscription: {:?}", e);
66+
break;
67+
}
68+
}
69+
}
70+
});
71+
72+
#[cfg(feature = "async-std-runtime")]
73+
let _subscription_handle = async_std::task::spawn(async move {
74+
loop {
75+
match socket.recv().await {
76+
Ok(msg) => {
77+
let data = msg.get(0).unwrap();
78+
if !data.is_empty() {
79+
match data[0] {
80+
1 => {
81+
let topic = String::from_utf8_lossy(&data[1..]);
82+
if topic.is_empty() {
83+
println!("📥 Client subscribed to ALL topics");
84+
} else {
85+
println!("📥 Client subscribed to: '{}'", topic);
86+
}
87+
_counter_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
88+
}
89+
0 => {
90+
let topic = String::from_utf8_lossy(&data[1..]);
91+
if topic.is_empty() {
92+
println!("📤 Client unsubscribed from ALL topics");
93+
} else {
94+
println!("📤 Client unsubscribed from: '{}'", topic);
95+
}
96+
}
97+
_ => {
98+
println!("⚠️ Unknown subscription message type: {}", data[0]);
99+
}
100+
}
101+
}
102+
}
103+
Err(e) => {
104+
eprintln!("Error receiving subscription: {:?}", e);
105+
break;
106+
}
107+
}
108+
}
109+
});
110+
111+
// Give the subscription handler time to start
112+
async_helpers::sleep(Duration::from_millis(100)).await;
113+
114+
// Create a new socket for publishing (since we moved the first one into the task)
115+
let mut pub_socket = zeromq::XPubSocket::new();
116+
pub_socket.bind("tcp://127.0.0.1:5558").await?;
117+
118+
println!("Publishing weather updates on tcp://127.0.0.1:5558");
119+
println!("Format: <zipcode> <temperature> <humidity>\n");
120+
121+
let mut rng = rand::rng();
122+
let mut counter = 0;
123+
124+
loop {
125+
let zipcode = rng.random_range(10000..10010);
126+
let temperature = rng.random_range(-80..135);
127+
let relhumidity = rng.random_range(10..60);
128+
129+
let message = format!("{} {} {}", zipcode, temperature, relhumidity);
130+
pub_socket.send(message.into()).await?;
131+
132+
counter += 1;
133+
if counter % 50 == 0 {
134+
let subs = subscription_counter.load(std::sync::atomic::Ordering::Relaxed);
135+
println!(
136+
"📡 Sent {} weather updates (active subscriptions: {})",
137+
counter, subs
138+
);
139+
}
140+
141+
async_helpers::sleep(Duration::from_millis(100)).await;
142+
}
143+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod sub;
1818
mod task_handle;
1919
mod transport;
2020
pub mod util;
21+
mod xpub;
2122

2223
#[doc(hidden)]
2324
pub mod __async_rt {
@@ -36,6 +37,7 @@ pub use crate::rep::*;
3637
pub use crate::req::*;
3738
pub use crate::router::*;
3839
pub use crate::sub::*;
40+
pub use crate::xpub::*;
3941

4042
use crate::codec::*;
4143
use crate::transport::AcceptStopHandle;

0 commit comments

Comments
 (0)