Commit 7295307

sionsmith and claude authored

feat(kafka): SaslMechanismPlugin extension trait (0.14.0) (#94)

* feat(kafka): add SaslMechanismPlugin extension trait (0.14.0)

  Adds a pluggable SASL mechanism extension point, unblocking downstream
  OAUTHBEARER / MSK IAM / OIDC implementations without bringing their
  dependencies into OSS.

  - SaslMechanismPlugin trait + KIP-368 re-authentication scheduler
    (80% fraction, 30s floor, ±5s jitter).
  - Default interpret_server_error handles both RFC 7628 JSON and
    Kafka 3.5+ free-form error_message bytes.
  - SecurityConfig.sasl_mechanism_plugin field with #[serde(skip)] —
    programmatic wiring only, no YAML surface.
  - Unified SASL dispatch: the four duplicated sasl_{plain,scram}_auth
    methods collapse into a single function called by both
    initial-connect and reconnect paths.
  - SCRAM/PLAIN integration baseline tests (tests/sasl-test-infra/).
  - OAUTHBEARER E2E fixture with Confluent cp-kafka 7.7.0 unsecured-JWS
    (tests/sasl-oauth-test-infra/).
  - In-process hand-rolled Kafka-wire mock for plugin dispatch tests —
    zero new dev-deps (cleaner than the PRD's rsasl target).
  - Example: examples/custom_sasl_plugin.rs — minimal static-token
    OAUTHBEARER reference implementation.

  Zero new runtime dependencies. Behaviour unchanged for existing
  PLAIN / SCRAM-* / mTLS / plaintext configurations.

  See docs/PRD-sasl-mechanism-plugin.md (v2.1).

  Known limitation: reauth task is not aborted on reconnect — followup
  tracked separately.

  Co-Authored-By: Claude Opus 4.7 <[email protected]>

* fix(test): inline OAUTH JAAS via listener-scoped config

  Setting `java.security.auth.login.config` via `KAFKA_OPTS` caused the
  cp-kafka preflight ZK client to hang looking for a `Client` JAAS
  section (and, once that was bypassed with
  `-Dzookeeper.sasl.client=false`, to time out waiting for readiness
  despite a successful ZK session).

  Switch to `KAFKA_LISTENER_NAME_SASL_OAUTHBEARER_SASL_JAAS_CONFIG`
  inline so no JVM-level JAAS config is set and the ZK preflight isn't
  confused. Drop the `./jaas.conf` volume mount since it is now unused.

  Locally verified: `docker compose up -d --wait` reaches healthy, and
  `sasl_oauth_bearer_static_token_e2e` passes against the fixture.

* chore(clippy): sort_by -> sort_by_key (Rust 1.95 lint)

  Rust 1.95 promotes `clippy::unnecessary_sort_by` to a standard lint
  under `-D warnings`, which made PR #94 CI red. The comparator here is
  a plain descending sort on a Copy field, so `sort_by_key` + `Reverse`
  is the idiomatic equivalent. No behaviour change.

* chore(clippy): collapse nested if into match guard (Rust 1.95 lint)

  Rust 1.95 tightened `clippy::collapsible_match` to flag nested `if`
  inside a `match` arm. Local toolchain (1.94.1) does not reproduce.
  No behavioural change.

  Co-Authored-By: Claude Opus 4.7 <[email protected]>

* ci: skip SASL integration tests that require Docker compose fixtures

  The integration_suite workflow runs `--include-ignored` to cover
  ignored suites, but CI does not spin up the SASL OAUTH / GSSAPI
  compose fixtures those tests need. Extending the existing
  `--skip tls` pattern to also skip `sasl_` keeps the job green while
  the fixtures remain dev-only.

---------

Co-authored-by: Claude Opus 4.7 <[email protected]>
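
The `sort_by` -> `sort_by_key` change described in the message can be illustrated in isolation. The sketch below is not the code touched by the commit: `Segment` and its `size` field are hypothetical stand-ins for whatever type carries the `Copy` field being sorted on.

```rust
use std::cmp::Reverse;

/// Hypothetical record type; the struct sorted in the commit is not shown.
#[derive(Debug, Clone, PartialEq)]
struct Segment {
    name: &'static str,
    size: u64, // a `Copy` field, matching the commit's description
}

/// Descending sort with an explicit comparator (the form the lint flags).
fn sort_desc_comparator(items: &mut [Segment]) {
    items.sort_by(|a, b| b.size.cmp(&a.size));
}

/// The same descending sort expressed as a key plus `Reverse`.
fn sort_desc_key(items: &mut [Segment]) {
    items.sort_by_key(|s| Reverse(s.size));
}

fn main() {
    let mut by_cmp = vec![
        Segment { name: "a", size: 10 },
        Segment { name: "b", size: 30 },
        Segment { name: "c", size: 20 },
    ];
    let mut by_key = by_cmp.clone();

    sort_desc_comparator(&mut by_cmp);
    sort_desc_key(&mut by_key);

    // Both sorts are stable, so the orderings agree element for element.
    assert_eq!(by_cmp, by_key);
    for s in &by_key {
        println!("{} {}", s.name, s.size);
    }
}
```

Because both `sort_by` and `sort_by_key` are stable, the rewrite preserves the relative order of equal keys, which is consistent with the "no behaviour change" claim.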
1 parent 7959488 commit 7295307

30 files changed

Lines changed: 2865 additions & 266 deletions

.github/workflows/test.yml

Lines changed: 7 additions & 3 deletions
@@ -131,9 +131,13 @@ jobs:
           TESTCONTAINERS: "true"
 
       - name: Run integration test suite
-        # Note: --include-ignored runs ignored tests, but we skip TLS tests as they require
-        # local certificate setup (see tests/tls-test-infra/README for manual TLS testing)
-        run: cargo test --test integration_suite_tests --all-features -- --nocapture --include-ignored --skip tls
+        # --include-ignored runs ignored tests. We skip suites whose fixtures
+        # aren't brought up in this workflow:
+        #   - tls: requires local certificate setup (tests/tls-test-infra/)
+        #   - sasl_: requires SASL Docker compose fixtures
+        #     (tests/sasl-oauth-test-infra/, tests/sasl-gssapi-test-infra/)
+        # These are run manually or in dedicated workflows.
+        run: cargo test --test integration_suite_tests --all-features -- --nocapture --include-ignored --skip tls --skip sasl_
         env:
           DOCKER_HOST: unix:///var/run/docker.sock
           RUST_LOG: debug

CHANGELOG.md

Lines changed: 31 additions & 0 deletions
@@ -5,6 +5,37 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.14.0] - 2026-04-21
+
+### Added
+- **Pluggable SASL mechanism extension point** (`SaslMechanismPlugin` trait)
+  — lets downstream crates implement OAUTHBEARER, MSK IAM, or custom
+  SASL mechanisms without forking `kafka-backup-core`.
+  - Handshake + single- or multi-round `SaslAuthenticate` dispatch.
+  - KIP-368 re-authentication scheduler: spawns a task post-handshake
+    when the broker advertises `session_lifetime_ms > 0`; fires
+    `reauth_payload` at 80 % of the advertised lifetime with a 30 s
+    minimum floor and ±5 s jitter.
+  - Default `interpret_server_error` handles both RFC 7628 JSON and
+    Apache Kafka 3.5+ free-form `error_message` bytes.
+- New field `SecurityConfig.sasl_mechanism_plugin: Option<Arc<dyn SaslMechanismPlugin>>`
+  (marked `#[serde(skip)]` — programmatic wiring only, no YAML surface).
+- 14 unit tests + 4 integration tests exercising single-round,
+  multi-round, server-error, and scheduler paths against an
+  in-process Kafka-wire mock (no Docker required).
+- `#[ignore]` E2E test against Confluent cp-kafka 7.7.0 configured for
+  SASL_PLAINTEXT + OAUTHBEARER with the bundled unsecured-JWS validator.
+  Fixture: `tests/sasl-oauth-test-infra/`.
+- Example: `examples/custom_sasl_plugin.rs` — minimal static-token
+  OAUTHBEARER plugin (reference implementation).
+
+### Changed
+- SASL dispatch in `KafkaClient` unified: the four duplicated
+  `sasl_{plain,scram}_auth{,_raw}` methods collapse into a single
+  dispatch function called by both initial-connect and reconnect.
+  Behaviour for existing `PLAIN` / `SCRAM-SHA-256` / `SCRAM-SHA-512`
+  configurations is unchanged.
+
 ## [0.13.5] - 2026-04-16
 
 ### Fixed
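
The re-authentication timing in the 0.14.0 entry (80 % of the advertised lifetime, 30 s floor, ±5 s jitter) can be worked through as arithmetic. The following is only a sketch under stated assumptions, not the scheduler from the commit: `reauth_delay` and its constants are hypothetical names, the jitter is passed in by the caller rather than drawn from a random source, and the order in which jitter and floor are applied is a guess.

```rust
use std::time::Duration;

// Values taken from the changelog entry; the names are illustrative.
const REAUTH_FRACTION_PERCENT: u128 = 80;
const REAUTH_FLOOR: Duration = Duration::from_secs(30);
const MAX_JITTER_MS: i64 = 5_000;

/// Compute how long to wait before re-authenticating.
///
/// Returns `None` when the broker advertised no session lifetime
/// (`session_lifetime_ms <= 0`), in which case no task would be scheduled.
/// `jitter_ms` is supplied by the caller and clamped to the ±5 s window.
fn reauth_delay(session_lifetime_ms: i64, jitter_ms: i64) -> Option<Duration> {
    if session_lifetime_ms <= 0 {
        return None;
    }
    // 80 % of the lifetime, computed in u128 so the multiplication cannot overflow.
    let base_ms = (session_lifetime_ms as u128 * REAUTH_FRACTION_PERCENT / 100) as i64;
    let jitter = jitter_ms.clamp(-MAX_JITTER_MS, MAX_JITTER_MS);
    let jittered_ms = base_ms.saturating_add(jitter).max(0) as u64;
    // Assumption: the floor is applied last, so jitter never pushes below 30 s.
    Some(Duration::from_millis(jittered_ms).max(REAUTH_FLOOR))
}

fn main() {
    println!("1 h lifetime:     {:?}", reauth_delay(3_600_000, 0));
    println!("10 s lifetime:    {:?}", reauth_delay(10_000, 0));
    println!("100 s, -2.5 s:    {:?}", reauth_delay(100_000, -2_500));
    println!("no lifetime sent: {:?}", reauth_delay(0, 0));
}
```

Note that with a 30 s floor, a session lifetime shorter than 30 s would expire before the delay elapses; how the actual scheduler treats that case is not stated in the changelog.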

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ members = [
 ]
 
 [workspace.package]
-version = "0.13.5"
+version = "0.14.0"
 edition = "2021"
 license = "MIT"
 authors = ["OSO"]

crates/kafka-backup-cli/src/commands/offset_reset.rs

Lines changed: 1 addition & 0 deletions
@@ -354,5 +354,6 @@ fn parse_security_config(protocol: Option<&str>) -> SecurityConfig {
         ssl_ca_location,
         ssl_certificate_location: None,
         ssl_key_location: None,
+        sasl_mechanism_plugin: None,
     }
 }

crates/kafka-backup-cli/src/commands/offset_reset_bulk.rs

Lines changed: 1 addition & 0 deletions
@@ -324,5 +324,6 @@ fn parse_security_config(protocol: Option<&str>) -> SecurityConfig {
         ssl_ca_location,
         ssl_certificate_location: None,
         ssl_key_location: None,
+        sasl_mechanism_plugin: None,
     }
 }

crates/kafka-backup-cli/src/commands/offset_rollback.rs

Lines changed: 1 addition & 0 deletions
@@ -507,5 +507,6 @@ fn parse_security_config(protocol: Option<&str>) -> SecurityConfig {
         ssl_ca_location,
         ssl_certificate_location: None,
         ssl_key_location: None,
+        sasl_mechanism_plugin: None,
     }
 }

crates/kafka-backup-cli/src/commands/status_watch.rs

Lines changed: 2 additions & 5 deletions
@@ -232,11 +232,8 @@ fn parse_prometheus_metrics(text: &str) -> ParsedMetrics {
                 | "kafka_backup_segments_written_total" => {
                     metrics.segments_written = value as u64;
                 }
-                "kafka_backup_storage_write_bytes_total_total" => {
-                    // Use storage write bytes as backup bytes if not already set
-                    if metrics.bytes_total == 0 {
-                        metrics.bytes_total = value as u64;
-                    }
+                "kafka_backup_storage_write_bytes_total_total" if metrics.bytes_total == 0 => {
+                    metrics.bytes_total = value as u64;
                 }
                 "kafka_backup_throughput_records_per_second" => {
                     metrics.throughput_records_per_sec = value;
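
The nested-`if` to match-guard rewrite above is behaviour-preserving only as long as no later arm matches the same name, because a failed guard falls through to the remaining arms. A minimal sketch of that equivalence follows; `Metrics`, the shortened metric names, and both `apply_*` functions are hypothetical stand-ins, not the code in `status_watch.rs`.

```rust
/// Hypothetical, trimmed-down stand-in for the parsed metrics struct.
#[derive(Debug, Default, Clone, PartialEq)]
struct Metrics {
    bytes_total: u64,
    segments_written: u64,
}

/// Original shape: the arm always matches, the `if` decides whether to write.
fn apply_nested(m: &mut Metrics, name: &str, value: f64) {
    match name {
        "segments_written_total" => m.segments_written = value as u64,
        "storage_write_bytes_total" => {
            if m.bytes_total == 0 {
                m.bytes_total = value as u64;
            }
        }
        _ => {}
    }
}

/// Rewritten shape: the condition moves into a match guard. When the guard
/// is false, control falls through to the wildcard arm, which does nothing.
fn apply_guarded(m: &mut Metrics, name: &str, value: f64) {
    match name {
        "segments_written_total" => m.segments_written = value as u64,
        "storage_write_bytes_total" if m.bytes_total == 0 => {
            m.bytes_total = value as u64;
        }
        _ => {}
    }
}

fn main() {
    let samples = [
        ("storage_write_bytes_total", 100.0),
        ("storage_write_bytes_total", 999.0), // ignored: already set
        ("segments_written_total", 7.0),
    ];
    let (mut a, mut b) = (Metrics::default(), Metrics::default());
    for &(name, value) in samples.iter() {
        apply_nested(&mut a, name, value);
        apply_guarded(&mut b, name, value);
    }
    assert_eq!(a, b);
    println!("{:?}", b);
}
```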
Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+//! Minimal `SaslMechanismPlugin` reference: a static-token `OAUTHBEARER` plugin.
+//!
+//! This example shows the smallest useful implementation of the
+//! [`SaslMechanismPlugin`] extension trait: hand a static JWT (or any opaque
+//! bearer token) to the `KafkaClient`, and the handshake completes against
+//! any broker configured to accept that token.
+//!
+//! Real-world plugins typically wrap a token-refresh cache — an OIDC client,
+//! an AWS MSK IAM presigner, a Keycloak adapter, etc. The canonical
+//! production implementations live in `kafka-backup-enterprise-core` and
+//! cover MSK IAM (`aws-sigv4`), Confluent Cloud, and OIDC.
+//!
+//! Run it with:
+//!
+//! ```bash
+//! cargo build --example custom_sasl_plugin -p kafka-backup-core
+//! ```
+//!
+//! The example does not actually connect to a broker — it just proves the
+//! wiring compiles against the public trait surface.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use kafka_backup_core::config::{KafkaConfig, SecurityConfig, SecurityProtocol, TopicSelection};
+use kafka_backup_core::kafka::{
+    KafkaClient, SaslMechanismPlugin, SaslMechanismPluginHandle, SaslPluginError,
+};
+
+/// A static-token OAUTHBEARER plugin.
+///
+/// Constructs an RFC 7628 client-initial-response payload from a fixed
+/// principal and bearer token. Suitable for local testing against brokers
+/// running the Apache Kafka `OAuthBearerUnsecuredValidatorCallbackHandler`
+/// (unsecured-JWS mode) or any broker that accepts static tokens.
+#[derive(Debug)]
+struct StaticTokenOauthBearer {
+    principal: String,
+    token: String,
+}
+
+impl StaticTokenOauthBearer {
+    fn new(principal: impl Into<String>, token: impl Into<String>) -> Self {
+        Self {
+            principal: principal.into(),
+            token: token.into(),
+        }
+    }
+
+    fn into_handle(self) -> SaslMechanismPluginHandle {
+        Arc::new(self)
+    }
+}
+
+#[async_trait]
+impl SaslMechanismPlugin for StaticTokenOauthBearer {
+    fn mechanism_name(&self) -> &str {
+        "OAUTHBEARER"
+    }
+
+    async fn initial_payload(&self) -> Result<Vec<u8>, SaslPluginError> {
+        Ok(build_rfc7628_cir(&self.principal, &self.token))
+    }
+}
+
+/// Build an RFC 7628 §3.1 client initial response:
+/// `n,a=<authzid>,<0x01>auth=Bearer <token><0x01><0x01>`.
+fn build_rfc7628_cir(principal: &str, token: &str) -> Vec<u8> {
+    let mut buf = Vec::with_capacity(principal.len() + token.len() + 16);
+    buf.extend_from_slice(b"n,a=");
+    buf.extend_from_slice(principal.as_bytes());
+    buf.push(b',');
+    buf.push(0x01);
+    buf.extend_from_slice(b"auth=Bearer ");
+    buf.extend_from_slice(token.as_bytes());
+    buf.push(0x01);
+    buf.push(0x01);
+    buf
+}
+
+fn main() {
+    // An unsecured JWT (`alg: none`) for local testing. Production tokens
+    // come from an IdP — Okta, Keycloak, AWS STS, etc.
+    let unsecured_jwt = concat!(
+        "eyJhbGciOiJub25lIn0",
+        ".",
+        "eyJzdWIiOiJ0ZXN0LXVzZXIiLCJleHAiOjk5OTk5OTk5OTksImlhdCI6MTAwMH0",
+        ".",
+    );
+
+    let plugin = StaticTokenOauthBearer::new("test-user", unsecured_jwt).into_handle();
+
+    let config = KafkaConfig {
+        bootstrap_servers: vec!["localhost:9097".to_string()],
+        security: SecurityConfig {
+            security_protocol: SecurityProtocol::SaslPlaintext,
+            sasl_mechanism_plugin: Some(plugin),
+            ..Default::default()
+        },
+        topics: TopicSelection::default(),
+        connection: Default::default(),
+    };
+
+    let _client = KafkaClient::new(config);
+    println!(
+        "OAUTHBEARER plugin wired into KafkaClient. \
+         Connect would go to localhost:9097 — skipped (no broker in this example)."
+    );
+}
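
To make the byte layout produced by `build_rfc7628_cir` concrete, the sketch below repeats that function from the example and pairs it with a small inverse. `split_cir` is a hypothetical helper that is not part of the commit, and it does not handle GS2 escaping of `,` and `=` inside the authzid.

```rust
/// Copied from the example above: `n,a=<authzid>,<0x01>auth=Bearer <token><0x01><0x01>`.
fn build_rfc7628_cir(principal: &str, token: &str) -> Vec<u8> {
    let mut buf = Vec::with_capacity(principal.len() + token.len() + 16);
    buf.extend_from_slice(b"n,a=");
    buf.extend_from_slice(principal.as_bytes());
    buf.push(b',');
    buf.push(0x01);
    buf.extend_from_slice(b"auth=Bearer ");
    buf.extend_from_slice(token.as_bytes());
    buf.push(0x01);
    buf.push(0x01);
    buf
}

/// Hypothetical inverse, for illustration only: recover `(authzid, token)`
/// from a client initial response, or `None` if the layout does not match.
fn split_cir(cir: &[u8]) -> Option<(String, String)> {
    let text = std::str::from_utf8(cir).ok()?;
    // Fields are separated by the 0x01 (Ctrl-A) byte.
    let mut parts = text.split('\u{1}');
    let header = parts.next()?; // "n,a=<authzid>,"
    let auth = parts.next()?; // "auth=Bearer <token>"
    let authzid = header.strip_prefix("n,a=")?.strip_suffix(',')?;
    let token = auth.strip_prefix("auth=Bearer ")?;
    Some((authzid.to_string(), token.to_string()))
}

fn main() {
    let cir = build_rfc7628_cir("test-user", "abc.def.");
    // Render the separator bytes visibly so the framing is easy to read.
    let shown: String = cir
        .iter()
        .map(|&b| if b == 0x01 { '^' } else { b as char })
        .collect();
    println!("{}", shown);
    println!("{:?}", split_cir(&cir));
}
```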

crates/kafka-backup-core/src/config.rs

Lines changed: 45 additions & 0 deletions
@@ -207,6 +207,15 @@ pub struct SecurityConfig {
     /// Path to client key file (for mTLS)
     #[serde(default)]
     pub ssl_key_location: Option<PathBuf>,
+
+    /// Optional pluggable SASL mechanism implementation.
+    ///
+    /// When set, overrides [`sasl_mechanism`] and drives the handshake
+    /// through the plugin (e.g. `OAUTHBEARER` for MSK IAM). Not YAML-
+    /// configurable — downstream crates inject this programmatically
+    /// after parsing the config.
+    #[serde(skip)]
+    pub sasl_mechanism_plugin: Option<crate::kafka::SaslMechanismPluginHandle>,
 }
 
 /// Security protocol
@@ -960,6 +969,42 @@ bootstrap_servers:
         assert!(config.connection.tcp_nodelay);
     }
 
+    #[test]
+    fn test_security_config_serde_skips_plugin_field() {
+        // `sasl_mechanism_plugin` is `#[serde(skip)]` — programmatic only.
+        // Regressions would leak a non-serializable trait object into
+        // YAML and break config round-tripping. This test pins it.
+        use std::sync::Arc;
+
+        #[derive(Debug)]
+        struct NoopPlugin;
+        #[async_trait::async_trait]
+        impl crate::kafka::SaslMechanismPlugin for NoopPlugin {
+            fn mechanism_name(&self) -> &str {
+                "NOOP"
+            }
+            async fn initial_payload(&self) -> Result<Vec<u8>, crate::kafka::SaslPluginError> {
+                Ok(vec![])
+            }
+        }
+
+        let cfg = SecurityConfig {
+            security_protocol: SecurityProtocol::SaslPlaintext,
+            sasl_mechanism_plugin: Some(Arc::new(NoopPlugin)),
+            ..Default::default()
+        };
+        let yaml = serde_yaml::to_string(&cfg).expect("serialize security config");
+        assert!(
+            !yaml.contains("sasl_mechanism_plugin"),
+            "serde(skip) must keep the plugin field out of YAML; got:\n{yaml}"
+        );
+
+        // Round-trip: deserialization into a config without the field
+        // yields `None` — we never try to rehydrate a trait object.
+        let back: SecurityConfig = serde_yaml::from_str(&yaml).expect("deserialize");
+        assert!(back.sasl_mechanism_plugin.is_none());
+    }
+
     #[test]
     fn test_kafka_config_connection_custom() {
         // Test that custom connection settings are properly parsed
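
The doc comment on `sasl_mechanism_plugin` describes two behaviours: the plugin is injected after the config is parsed, and once set it overrides the configured mechanism. The sketch below models both with standard-library types only. Every name in it (`MechanismPlugin`, `Security`, `effective_mechanism`, the `String`-typed `sasl_mechanism` field) is a hypothetical stand-in, and the real dispatch logic in `KafkaClient` may differ.

```rust
use std::fmt::Debug;
use std::sync::Arc;

/// Stand-in for the real trait; only the name accessor is modelled.
trait MechanismPlugin: Debug + Send + Sync {
    fn mechanism_name(&self) -> &str;
}

type PluginHandle = Arc<dyn MechanismPlugin>;

/// Stand-in for `SecurityConfig`. In the real struct the plugin field is
/// `#[serde(skip)]`, so a freshly parsed config always has `plugin: None`.
#[derive(Debug, Default, Clone)]
struct Security {
    sasl_mechanism: Option<String>,
    plugin: Option<PluginHandle>,
}

#[derive(Debug)]
struct StaticToken;

impl MechanismPlugin for StaticToken {
    fn mechanism_name(&self) -> &str {
        "OAUTHBEARER"
    }
}

/// The plugin, when present, takes precedence over the configured mechanism.
fn effective_mechanism(sec: &Security) -> Option<&str> {
    sec.plugin
        .as_ref()
        .map(|p| p.mechanism_name())
        .or(sec.sasl_mechanism.as_deref())
}

fn main() {
    // Step 1: what a YAML parse would produce (no plugin).
    let mut sec = Security {
        sasl_mechanism: Some("PLAIN".to_string()),
        ..Default::default()
    };
    println!("after parse:     {:?}", effective_mechanism(&sec));

    // Step 2: the downstream crate injects its plugin programmatically.
    sec.plugin = Some(Arc::new(StaticToken));
    println!("after injection: {:?}", effective_mechanism(&sec));
}
```

Holding the plugin behind an `Arc<dyn ...>` keeps the config struct cheaply cloneable while leaving it non-serializable, which is why the field has to be skipped by serde in the real struct.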
