Skip to content

Commit 86fda88

Browse files
authored
feat: add Pulsar Admin REST API client for topic policy management (#403)
Adds optional `admin-api` feature backed by reqwest/native-tls that exposes an AdminClient for setting broker-side topic policies (e.g. maxUnackedMessagesOnConsumer). Policies can be applied manually via Pulsar.admin() and its associated methods. Updates CI to enable the feature and set topicLevelPoliciesEnabled=true on the test broker. Also, drop Pulsar 2.10.6 from test matrix because that version does not support topicLevelPoliciesEnabled via Docker env vars. The code will still work on 2.10.6; this is just a test concern. Fixes: #402 Fixes: #143
1 parent 7bea637 commit 86fda88

7 files changed

Lines changed: 428 additions & 8 deletions

File tree

.github/workflows/rust.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
run: cargo build --features protobuf-src
2626

2727
- name: Clippy (feature set A)
28-
run: cargo clippy --tests --features telemetry,protobuf-src -- -D warnings
28+
run: cargo clippy --tests --features telemetry,protobuf-src,admin-api -- -D warnings
2929

3030
- name: Clippy (feature set B)
3131
run: cargo clippy --tests --no-default-features --features compression,tokio-rustls-runtime,async-std-rustls-runtime,auth-oauth2,telemetry,protobuf-src -- -D warnings
@@ -39,7 +39,7 @@ jobs:
3939
strategy:
4040
fail-fast: false
4141
matrix:
42-
pulsar-version: [2.10.6, 2.11.4, 3.0.8, 3.2.4, 3.3.3, 4.0.1, 4.1.2]
42+
pulsar-version: [2.11.4, 3.0.8, 3.2.4, 3.3.3, 4.0.1, 4.1.2]
4343
steps:
4444
- uses: actions/checkout@v4
4545
- uses: dtolnay/rust-toolchain@stable
@@ -55,6 +55,7 @@ jobs:
5555
-e PULSAR_PREFIX_advertisedListeners=pulsar://127.0.0.1:6650 \
5656
-e PULSAR_PREFIX_brokerServicePort=6650 \
5757
-e PULSAR_PREFIX_webServicePort=8080 \
58+
-e PULSAR_PREFIX_topicLevelPoliciesEnabled=true \
5859
apachepulsar/pulsar:${{ matrix.pulsar-version }} \
5960
bin/pulsar standalone
6061
@@ -89,7 +90,7 @@ jobs:
8990
RUST_BACKTRACE: 1
9091
RUST_LOG: pulsar=debug
9192
RUST_TEST_THREADS: 1
92-
run: cargo test --features protobuf-src -- --nocapture
93+
run: cargo test --features protobuf-src,admin-api -- --nocapture
9394

9495
- name: Dump Pulsar logs on failure
9596
if: failure()

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pem = "^3.0.4"
3939
prost = "^0.13.4"
4040
prost-derive = "^0.13.4"
4141
rand = "^0.8.5"
42+
reqwest = { version = "0.13", optional = true, default-features = false }
4243
regex = "^1.11.1"
4344
rustls = { version = "^0.23.27", default-features = false, features = ["log", "std"] , optional = true }
4445
snap = { version = "^1.1.1", optional = true }
@@ -60,7 +61,7 @@ serde = { version = "^1.0.216", features = ["derive"] }
6061
serde_json = "^1.0.133"
6162
tokio = { version = "^1.42.0", features = ["macros", "rt-multi-thread"] }
6263
assert_matches = "1.5.0"
63-
reqwest = { version = "0.12.23", features = ["json"] }
64+
reqwest = { version = "0.13", features = ["json"] }
6465

6566
[build-dependencies]
6667
prost-build = "^0.13.4"
@@ -75,6 +76,7 @@ auth-oauth2 = ["openidconnect", "oauth2", "serde", "serde_json", "data-url"]
7576
compression = ["lz4", "flate2", "zstd", "snap"]
7677
default = ["compression", "tokio-runtime", "async-std-runtime", "auth-oauth2"]
7778
protobuf-src = ["dep:protobuf-src"]
79+
admin-api = ["dep:reqwest", "reqwest/native-tls"]
7880
telemetry = ["tracing"]
7981
tokio-runtime = ["tokio", "tokio-util", "native-tls", "tokio-native-tls"]
8082
tokio-rustls-runtime = ["tokio-rustls-runtime-aws-lc-rs"]

src/admin.rs

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
//! Pulsar Admin REST API client.
2+
//!
3+
//! Enabled by the `admin-api` feature flag. Requires a tokio runtime.
4+
5+
use std::{sync::Arc, time::Duration};
6+
7+
use futures::lock::Mutex;
8+
9+
use crate::{
10+
authentication::Authentication,
11+
connection_manager::TlsOptions,
12+
error::{AdminError, Error},
13+
};
14+
15+
/// Parses a Pulsar topic URL into (scheme, tenant, namespace, topic_name).
16+
/// Accepts `persistent://` and `non-persistent://` prefixes, or a bare
17+
/// `tenant/namespace/topic` string which defaults to `persistent://`.
18+
fn parse_topic(topic: &str) -> Result<(&str, &str, &str, &str), Error> {
19+
let invalid = || {
20+
Error::Admin(AdminError::InvalidTopic(format!(
21+
"expected tenant/namespace/topic or a fully-qualified topic URL, got: {topic}"
22+
)))
23+
};
24+
25+
let (scheme, rest) = if let Some(rest) = topic.strip_prefix("persistent://") {
26+
("persistent", rest)
27+
} else if let Some(rest) = topic.strip_prefix("non-persistent://") {
28+
("non-persistent", rest)
29+
} else {
30+
("persistent", topic)
31+
};
32+
33+
let mut parts = rest.splitn(3, '/');
34+
let tenant = parts.next().filter(|s| !s.is_empty()).ok_or_else(invalid)?;
35+
let namespace = parts.next().filter(|s| !s.is_empty()).ok_or_else(invalid)?;
36+
let name = parts.next().filter(|s| !s.is_empty()).ok_or_else(invalid)?;
37+
38+
Ok((scheme, tenant, namespace, name))
39+
}
40+
41+
/// Client for the Pulsar Admin REST API.
42+
///
43+
/// Obtain an instance via [`Pulsar::admin()`][crate::Pulsar::admin].
44+
///
45+
/// # Example
46+
///
47+
/// ```rust,no_run
48+
/// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
49+
/// let admin = pulsar.admin("http://localhost:8080")?;
50+
/// admin
51+
/// .set_max_unacked_messages_per_consumer(
52+
/// "persistent://public/default/my-topic",
53+
/// 500,
54+
/// )
55+
/// .await?;
56+
/// # Ok(())
57+
/// # }
58+
/// ```
59+
pub struct AdminClient {
60+
client: reqwest::Client,
61+
admin_url: String,
62+
auth: Option<Arc<Mutex<Box<dyn Authentication>>>>,
63+
}
64+
65+
impl AdminClient {
66+
/// Creates a new `AdminClient`.
67+
///
68+
/// Reuses the TLS and authentication configuration already present on the
69+
/// [`Pulsar`][crate::Pulsar] client. Called internally by
70+
/// [`Pulsar::admin()`][crate::Pulsar::admin].
71+
pub(crate) fn new(
72+
admin_url: String,
73+
tls_options: &TlsOptions,
74+
auth: Option<Arc<Mutex<Box<dyn Authentication>>>>,
75+
) -> Result<Self, Error> {
76+
let mut builder = reqwest::ClientBuilder::new()
77+
.timeout(Duration::from_secs(30))
78+
.danger_accept_invalid_certs(tls_options.allow_insecure_connection);
79+
80+
builder = builder.danger_accept_invalid_hostnames(
81+
tls_options.allow_insecure_connection || !tls_options.tls_hostname_verification_enabled,
82+
);
83+
84+
if let Some(pem_bytes) = &tls_options.certificate_chain {
85+
let certs = pem::parse_many(pem_bytes).map_err(|e| {
86+
Error::Admin(AdminError::TlsConfig(format!(
87+
"failed to parse certificate chain: {e}"
88+
)))
89+
})?;
90+
for cert in certs.iter().rev() {
91+
let reqwest_cert = reqwest::Certificate::from_der(cert.contents())
92+
.map_err(|e| Error::Admin(AdminError::Request(e)))?;
93+
builder = builder.add_root_certificate(reqwest_cert);
94+
}
95+
}
96+
97+
Ok(AdminClient {
98+
client: builder
99+
.build()
100+
.map_err(|e| Error::Admin(AdminError::Request(e)))?,
101+
admin_url: admin_url.trim_end_matches('/').to_string(),
102+
auth,
103+
})
104+
}
105+
106+
async fn apply_auth(
107+
&self,
108+
req: reqwest::RequestBuilder,
109+
) -> Result<reqwest::RequestBuilder, Error> {
110+
let Some(auth) = &self.auth else {
111+
return Ok(req);
112+
};
113+
let mut auth = auth.lock().await;
114+
let method = auth.auth_method_name();
115+
let data = auth.auth_data().await.map_err(Error::Authentication)?;
116+
let data_str = String::from_utf8(data)
117+
.map_err(|e| Error::Custom(format!("auth data is not valid UTF-8: {e}")))?;
118+
Ok(match method.as_str() {
119+
"token" => req.bearer_auth(data_str),
120+
"basic" => match data_str.split_once(':') {
121+
Some((user, pass)) => req.basic_auth(user, Some(pass)),
122+
None => req.basic_auth(&data_str, None::<&str>),
123+
},
124+
_ => req,
125+
})
126+
}
127+
128+
fn topic_policy_url(&self, topic: &str, policy: &str) -> Result<String, Error> {
129+
let (scheme, tenant, namespace, name) = parse_topic(topic)?;
130+
Ok(format!(
131+
"{}/admin/v2/{}/{}/{}/{}/{policy}",
132+
self.admin_url, scheme, tenant, namespace, name
133+
))
134+
}
135+
136+
async fn check_response(&self, resp: reqwest::Response) -> Result<(), Error> {
137+
if resp.status().is_success() {
138+
return Ok(());
139+
}
140+
let status = resp.status().as_u16();
141+
let body = resp.text().await.unwrap_or_default();
142+
Err(Error::Admin(AdminError::Http { status, body }))
143+
}
144+
145+
/// Sets the maximum number of unacknowledged messages allowed per consumer
146+
/// on a topic.
147+
///
148+
/// This is a persistent broker-side topic policy. The topic must already
149+
/// exist when this is called (subscribe a consumer first, then call this).
150+
/// Requires `topicLevelPoliciesEnabled=true` in the broker configuration.
151+
pub async fn set_max_unacked_messages_per_consumer(
152+
&self,
153+
topic: &str,
154+
max_unacked: u32,
155+
) -> Result<(), Error> {
156+
let url = self.topic_policy_url(topic, "maxUnackedMessagesOnConsumer")?;
157+
let req = self
158+
.client
159+
.post(&url)
160+
.header("Content-Type", "application/json")
161+
.body(max_unacked.to_string());
162+
let req = self.apply_auth(req).await?;
163+
let resp = req
164+
.send()
165+
.await
166+
.map_err(|e| Error::Admin(AdminError::Request(e)))?;
167+
self.check_response(resp).await
168+
}
169+
170+
/// Removes the per-topic max unacked messages override, reverting to the
171+
/// broker or namespace default.
172+
///
173+
/// To disable the limit without removing the topic-level override, call
174+
/// [`set_max_unacked_messages_per_consumer`][Self::set_max_unacked_messages_per_consumer]
175+
/// with a value of `0` (unlimited).
176+
pub async fn remove_max_unacked_messages_per_consumer(&self, topic: &str) -> Result<(), Error> {
177+
let url = self.topic_policy_url(topic, "maxUnackedMessagesOnConsumer")?;
178+
let req = self.client.delete(&url);
179+
let req = self.apply_auth(req).await?;
180+
let resp = req
181+
.send()
182+
.await
183+
.map_err(|e| Error::Admin(AdminError::Request(e)))?;
184+
self.check_response(resp).await
185+
}
186+
}
187+
188+
#[cfg(test)]
189+
mod tests {
190+
use super::*;
191+
192+
#[test]
193+
fn test_parse_topic_persistent() {
194+
let (scheme, tenant, ns, name) =
195+
parse_topic("persistent://my-tenant/my-namespace/my-topic").unwrap();
196+
assert_eq!(scheme, "persistent");
197+
assert_eq!(tenant, "my-tenant");
198+
assert_eq!(ns, "my-namespace");
199+
assert_eq!(name, "my-topic");
200+
}
201+
202+
#[test]
203+
fn test_parse_topic_non_persistent() {
204+
let (scheme, tenant, ns, name) = parse_topic("non-persistent://tenant/ns/topic").unwrap();
205+
assert_eq!(scheme, "non-persistent");
206+
assert_eq!(tenant, "tenant");
207+
assert_eq!(ns, "ns");
208+
assert_eq!(name, "topic");
209+
}
210+
211+
#[test]
212+
fn test_parse_topic_bare() {
213+
// No prefix defaults to persistent://
214+
let (scheme, tenant, ns, name) = parse_topic("tenant/ns/topic").unwrap();
215+
assert_eq!(scheme, "persistent");
216+
assert_eq!(tenant, "tenant");
217+
assert_eq!(ns, "ns");
218+
assert_eq!(name, "topic");
219+
}
220+
221+
#[test]
222+
fn test_parse_topic_missing_parts() {
223+
assert!(parse_topic("").is_err());
224+
assert!(parse_topic("tenant").is_err());
225+
assert!(parse_topic("tenant/ns").is_err());
226+
// trailing slash = empty topic name
227+
assert!(parse_topic("tenant/ns/").is_err());
228+
assert!(parse_topic("persistent://").is_err());
229+
assert!(parse_topic("persistent://tenant").is_err());
230+
assert!(parse_topic("persistent://tenant/ns").is_err());
231+
assert!(parse_topic("persistent://tenant/ns/").is_err());
232+
}
233+
234+
#[test]
235+
fn test_topic_policy_url() {
236+
let client = AdminClient {
237+
client: reqwest::Client::new(),
238+
admin_url: "http://localhost:8080".to_string(),
239+
auth: None,
240+
};
241+
assert_eq!(
242+
client
243+
.topic_policy_url(
244+
"persistent://public/default/my-topic",
245+
"maxUnackedMessagesOnConsumer"
246+
)
247+
.unwrap(),
248+
"http://localhost:8080/admin/v2/persistent/public/default/my-topic/maxUnackedMessagesOnConsumer"
249+
);
250+
}
251+
252+
#[test]
253+
fn test_admin_url_trailing_slash_stripped() {
254+
// Trailing slash on admin_url should be normalized away
255+
let tls = TlsOptions::default();
256+
let client = AdminClient::new("http://localhost:8080/".to_string(), &tls, None).unwrap();
257+
assert_eq!(client.admin_url, "http://localhost:8080");
258+
}
259+
}

src/client.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,40 @@ impl<Exe: Executor> Pulsar<Exe> {
447447
.map_err(|_| Error::Custom("producer unexpectedly disconnected".into()))?;
448448
Ok(SendFuture(future))
449449
}
450+
451+
/// Creates an [`AdminClient`][crate::AdminClient] for this cluster.
452+
///
453+
/// The admin client reuses the TLS and authentication configuration
454+
/// already present on this `Pulsar` instance. Requires one of the
455+
/// `admin-api` feature flags and a tokio runtime.
456+
///
457+
/// # Arguments
458+
///
459+
/// * `admin_url` — base URL of the Pulsar admin HTTP endpoint, e.g.
460+
/// `"http://pulsar-proxy"`.
461+
///
462+
/// # Example
463+
///
464+
/// ```rust,no_run
465+
/// # async fn run(pulsar: pulsar::Pulsar<pulsar::TokioExecutor>) -> Result<(), pulsar::Error> {
466+
/// let admin = pulsar.admin("http://pulsar-proxy")?;
467+
/// admin
468+
/// .set_max_unacked_messages_per_consumer(
469+
/// "persistent://public/default/my-topic",
470+
/// 500,
471+
/// )
472+
/// .await?;
473+
/// # Ok(())
474+
/// # }
475+
/// ```
476+
#[cfg(feature = "admin-api")]
477+
pub fn admin(&self, admin_url: impl Into<String>) -> Result<crate::AdminClient, Error> {
478+
crate::admin::AdminClient::new(
479+
admin_url.into(),
480+
&self.manager.tls_options,
481+
self.manager.auth.clone(),
482+
)
483+
}
450484
}
451485

452486
/// Helper structure to generate a [Pulsar] client
@@ -574,7 +608,7 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
574608
executor,
575609
} = self;
576610

577-
Pulsar::new(
611+
let pulsar = Pulsar::new(
578612
url,
579613
auth_provider.map(|p| Arc::new(Mutex::new(p))),
580614
connection_retry_options,
@@ -583,7 +617,9 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
583617
outbound_channel_size,
584618
executor,
585619
)
586-
.await
620+
.await?;
621+
622+
Ok(pulsar)
587623
}
588624
}
589625

src/connection_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,12 @@ enum ConnectionStatus<Exe: Executor> {
172172
#[derive(Clone)]
173173
pub struct ConnectionManager<Exe: Executor> {
174174
pub url: Url,
175-
auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
175+
pub(crate) auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
176176
pub(crate) executor: Arc<Exe>,
177177
connections: Arc<Mutex<HashMap<BrokerAddress, ConnectionStatus<Exe>>>>,
178178
connection_retry_options: ConnectionRetryOptions,
179179
pub(crate) operation_retry_options: OperationRetryOptions,
180-
tls_options: TlsOptions,
180+
pub(crate) tls_options: TlsOptions,
181181
certificate_chain: Vec<Certificate>,
182182
outbound_channel_size: usize,
183183
}

0 commit comments

Comments
 (0)