diff --git a/tonic-xds/src/client/endpoint.rs b/tonic-xds/src/client/endpoint.rs index 81767414d..ec23012bb 100644 --- a/tonic-xds/src/client/endpoint.rs +++ b/tonic-xds/src/client/endpoint.rs @@ -5,7 +5,7 @@ use std::task::{Context, Poll}; use tower::{Service, load::Load}; /// Represents the host part of an endpoint address -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] enum EndpointHost { Ipv4(std::net::Ipv4Addr), Ipv6(std::net::Ipv6Addr), @@ -25,7 +25,7 @@ impl From for EndpointHost { } /// Represents a validated endpoint address extracted from xDS -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub(crate) struct EndpointAddress { /// The IP address or hostname host: EndpointHost, diff --git a/tonic-xds/src/client/loadbalance/channel_state.rs b/tonic-xds/src/client/loadbalance/channel_state.rs index 7916c9bb8..7885705a1 100644 --- a/tonic-xds/src/client/loadbalance/channel_state.rs +++ b/tonic-xds/src/client/loadbalance/channel_state.rs @@ -26,16 +26,228 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::task::{Context, Poll}; -use std::time::Duration; +use std::time::{Duration, Instant}; use pin_project_lite::pin_project; +use tokio::sync::watch; use tower::Service; use tower::load::Load; use crate::client::endpoint::{Connector, EndpointAddress}; use crate::common::async_util::BoxFuture; +// --------------------------------------------------------------------------- +// EndpointCounters / OutlierChannelState +// --------------------------------------------------------------------------- + +/// Lock-free success/failure counter for one endpoint. Records RPC +/// outcomes from the data path; the outlier-detection actor reads and +/// resets between intervals. 
+#[derive(Debug, Default)] +pub(crate) struct EndpointCounters { + success: AtomicU64, + failure: AtomicU64, +} + +impl EndpointCounters { + pub(crate) fn record_success(&self) { + self.success.fetch_add(1, Ordering::Relaxed); + } + + pub(crate) fn record_failure(&self) { + self.failure.fetch_add(1, Ordering::Relaxed); + } + + /// Read and zero both counters. The two swaps are not atomic against + /// each other — RPCs landing between them may bias the snapshot by + /// a small number of events, well below the precision of the + /// failure-percentage threshold. + pub(crate) fn snapshot_and_reset(&self) -> (u64, u64) { + let s = self.success.swap(0, Ordering::Relaxed); + let f = self.failure.swap(0, Ordering::Relaxed); + (s, f) + } +} + +/// Per-channel outlier-detection state, shared (via `Arc`) between +/// the data path (per-RPC outcome recording + threshold-based ejection) +/// and the outlier-detection actor (interval-based housekeeping). +/// +/// Ejection is edge-triggered: callers flip the flag via [`try_eject`] / +/// [`try_uneject`]; observers poll `Receiver::changed()` (typically inside +/// a `FuturesUnordered`) to react in O(1) on each transition. +/// +/// All fields are atomics or wrapped in lock-free primitives so the +/// data path can mutate them without locking. +/// +/// [`try_eject`]: Self::try_eject +/// [`try_uneject`]: Self::try_uneject +#[derive(Debug)] +pub(crate) struct OutlierChannelState { + counters: EndpointCounters, + eject_tx: watch::Sender<bool>, + /// Whether this channel currently contributes to the registry's + /// `qualifying_count`. Set when `total` first reaches + /// `request_volume` in the current interval; cleared on counter + /// reset. + is_qualifying: AtomicBool, + /// Number of times this channel has been ejected. Bumped on each + /// ejection; decremented (saturating) on each healthy interval. + ejection_multiplier: AtomicU32, + /// `0` when not ejected. Otherwise nanos since [`Self::epoch`] of + /// the current ejection's start. 
+ ejected_at_nanos: AtomicU64, + /// Reference instant used as the origin for `ejected_at_nanos`. + /// Established at construction and never changes. + epoch: Instant, +} + +impl Default for OutlierChannelState { + fn default() -> Self { + Self::new() + } +} + +impl OutlierChannelState { + pub(crate) fn new() -> Self { + let (eject_tx, _) = watch::channel(false); + Self { + counters: EndpointCounters::default(), + eject_tx, + is_qualifying: AtomicBool::new(false), + ejection_multiplier: AtomicU32::new(0), + ejected_at_nanos: AtomicU64::new(0), + epoch: Instant::now(), + } + } + + pub(crate) fn record_success(&self) { + self.counters.record_success(); + } + + pub(crate) fn record_failure(&self) { + self.counters.record_failure(); + } + + /// Read the current counter values without resetting. Returns + /// `(success, failure)`. The two reads are not atomic against + /// each other but the difference is bounded by concurrent in-flight + /// RPCs and is below the precision of the failure-percentage check. + pub(crate) fn counters(&self) -> (u64, u64) { + let s = self.counters.success.load(Ordering::Relaxed); + let f = self.counters.failure.load(Ordering::Relaxed); + (s, f) + } + + /// Read and zero the counters. Returns `(success, failure)`. + pub(crate) fn snapshot_and_reset(&self) -> (u64, u64) { + self.counters.snapshot_and_reset() + } + + /// Try to set `is_qualifying` to `true`. Returns `true` if this + /// call performed the false → true transition, so callers can + /// increment a registry-level counter exactly once per crossing. + pub(crate) fn mark_qualifying(&self) -> bool { + !self.is_qualifying.swap(true, Ordering::AcqRel) + } + + /// Clear `is_qualifying`. Returns the previous value. + pub(crate) fn clear_qualifying(&self) -> bool { + self.is_qualifying.swap(false, Ordering::AcqRel) + } + + /// Flip the ejection flag to `true`. 
Returns `true` if this call + /// performed the false → true transition (so callers can update + /// registry-level counters exactly once per ejection). + /// Records the ejection timestamp and bumps the multiplier. + pub(crate) fn try_eject(&self, now: Instant) -> bool { + let won = self.eject_tx.send_if_modified(|state| { + if *state { + false + } else { + *state = true; + true + } + }); + if !won { + return false; + } + let nanos = now + .saturating_duration_since(self.epoch) + .as_nanos() + .min(u64::MAX as u128) as u64; + // Use 1 as a sentinel if the channel was created at exactly + // `now`, since 0 means "not ejected". + self.ejected_at_nanos.store(nanos.max(1), Ordering::Relaxed); + self.ejection_multiplier.fetch_add(1, Ordering::Relaxed); + true + } + + /// Flip the ejection flag back to `false`. Returns `true` if this + /// call performed the true → false transition. + pub(crate) fn try_uneject(&self) -> bool { + let won = self.eject_tx.send_if_modified(|state| { + if *state { + *state = false; + true + } else { + false + } + }); + if won { + self.ejected_at_nanos.store(0, Ordering::Relaxed); + } + won + } + + /// Current ejection state. + pub(crate) fn is_ejected(&self) -> bool { + *self.eject_tx.borrow() + } + + /// Returns the elapsed time since this channel was ejected, or + /// `None` if it is not currently ejected. + pub(crate) fn ejected_duration(&self, now: Instant) -> Option<Duration> { + let nanos = self.ejected_at_nanos.load(Ordering::Relaxed); + if nanos == 0 { + return None; + } + let ejected_at = self.epoch + Duration::from_nanos(nanos); + Some(now.saturating_duration_since(ejected_at)) + } + + /// Current ejection multiplier. + pub(crate) fn ejection_multiplier(&self) -> u32 { + self.ejection_multiplier.load(Ordering::Relaxed) + } + + /// Decrement the multiplier saturating at zero. Called by the + /// actor on healthy intervals. 
+ pub(crate) fn decrement_multiplier(&self) { + let prev = self.ejection_multiplier.load(Ordering::Relaxed); + if prev > 0 { + self.ejection_multiplier.store(prev - 1, Ordering::Relaxed); + } + } + + /// Subscribe to ejection-state changes. The returned receiver's + /// `changed()` future resolves on each transition; consumers + /// typically push it into a `FuturesUnordered`. + #[allow(dead_code)] // wired by the LoadBalancer in a follow-up PR. + pub(crate) fn subscribe(&self) -> watch::Receiver<bool> { + self.eject_tx.subscribe() + } + + /// Test-only setter for the ejection multiplier; lets tests drive + /// housekeeping behavior without going through `try_eject`. + #[cfg(test)] + pub(crate) fn set_ejection_multiplier(&self, value: u32) { + self.ejection_multiplier.store(value, Ordering::Relaxed); + } +} + /// Configuration for an ejected channel. #[derive(Debug, Clone)] pub(crate) struct EjectionConfig { @@ -92,12 +304,27 @@ pub(crate) struct ConnectingChannel { } impl ConnectingChannel { + /// Start a connection, generating a fresh per-channel outlier + /// state. Used for first-time connects from `IdleChannel`. pub(crate) fn new(fut: BoxFuture, addr: EndpointAddress) -> Self { + Self::with_outlier(fut, addr, Arc::new(OutlierChannelState::new())) + } + + /// Start a connection that inherits an existing + /// [`OutlierChannelState`]. Used by reconnect paths so the + /// per-channel counters and ejection signal survive across the + /// connection cycle. + pub(crate) fn with_outlier( + fut: BoxFuture, + addr: EndpointAddress, + outlier: Arc<OutlierChannelState>, + ) -> Self { Self { inner: Box::pin(async move { ReadyChannel { addr, inner: fut.await, + outlier, } }), } @@ -119,14 +346,23 @@ impl Future for ConnectingChannel { /// A channel that is connected and ready to serve requests. /// /// Holds the raw service `S` and delegates [`Service`] calls directly, -/// preserving `S::Future` and `S::Error` with no wrapping or type erasure. 
+/// preserving `S::Future` and `S::Error` with no wrapping or type +/// erasure. The `Arc` is shared with the outlier- +/// detection actor for stats accumulation and edge-triggered ejection. #[derive(Clone)] pub(crate) struct ReadyChannel { addr: EndpointAddress, inner: S, + outlier: Arc, } impl ReadyChannel { + /// Per-channel outlier-detection state. Cloned cheaply via `Arc`. + #[allow(dead_code)] // consumed by the LoadBalancer in a follow-up PR. + pub(crate) fn outlier(&self) -> &Arc { + &self.outlier + } + /// Eject this channel (e.g., due to outlier detection). Consumes self. pub(crate) fn eject(self, config: EjectionConfig, connector: Arc) -> EjectedChannel where @@ -136,13 +372,15 @@ impl ReadyChannel { EjectedChannel { addr: self.addr, inner: self.inner, + outlier: self.outlier, config, connector, ejection_timer, } } - /// Start reconnecting. Consumes self, dropping the old connection. + /// Start reconnecting. Consumes self, dropping the old connection + /// but preserving the outlier-detection state. pub(crate) fn reconnect>( self, connector: Arc, @@ -150,7 +388,7 @@ impl ReadyChannel { where S: Send + 'static, { - ConnectingChannel::new(connector.connect(&self.addr), self.addr) + ConnectingChannel::with_outlier(connector.connect(&self.addr), self.addr, self.outlier) } } @@ -193,6 +431,7 @@ pin_project! 
{ pub(crate) struct EjectedChannel { addr: EndpointAddress, inner: S, + outlier: Arc, config: EjectionConfig, connector: Arc + Send + Sync>, #[pin] @@ -209,14 +448,18 @@ impl Future for EjectedChannel { Poll::Ready(()) => { if this.config.needs_reconnect { let fut = this.connector.connect(this.addr); - Poll::Ready(UnejectedChannel::Connecting(ConnectingChannel::new( - fut, - this.addr.clone(), - ))) + Poll::Ready(UnejectedChannel::Connecting( + ConnectingChannel::with_outlier( + fut, + this.addr.clone(), + this.outlier.clone(), + ), + )) } else { Poll::Ready(UnejectedChannel::Ready(ReadyChannel { addr: this.addr.clone(), inner: this.inner.clone(), + outlier: this.outlier.clone(), })) } } diff --git a/tonic-xds/src/client/loadbalance/mod.rs b/tonic-xds/src/client/loadbalance/mod.rs index 66ccb1772..1c4ffa395 100644 --- a/tonic-xds/src/client/loadbalance/mod.rs +++ b/tonic-xds/src/client/loadbalance/mod.rs @@ -3,4 +3,5 @@ pub(crate) mod channel_state; pub(crate) mod errors; pub(crate) mod keyed_futures; pub(crate) mod loadbalancer; +pub(crate) mod outlier_detection; pub(crate) mod pickers; diff --git a/tonic-xds/src/client/loadbalance/outlier_detection.rs b/tonic-xds/src/client/loadbalance/outlier_detection.rs new file mode 100644 index 000000000..5295a09c7 --- /dev/null +++ b/tonic-xds/src/client/loadbalance/outlier_detection.rs @@ -0,0 +1,580 @@ +//! gRFC A50 outlier detection. +//! +//! The algorithm is split between the data path and a spawned actor: +//! +//! - **Per-RPC detection** runs inline on each call completion via +//! [`OutlierStatsRegistry::record_outcome`]. The wrapper records the +//! outcome on the channel's [`OutlierChannelState`], evaluates the +//! failure-percentage threshold against the channel's local +//! counters, and ejects the channel directly by flipping its +//! `watch::Sender`. Cluster-wide gates (`minimum_hosts`, +//! `max_ejection_percent`) are enforced via two atomic counters on +//! 
the registry, kept in sync as channels cross thresholds. +//! - **Interval-based housekeeping** runs in a spawned actor (see +//! [`spawn_actor`]). It resets per-channel counters at the +//! `config.interval` boundary, un-ejects channels whose +//! `base × multiplier` backoff has elapsed, and decrements +//! multipliers for non-ejected channels. The actor never makes +//! ejection decisions. +//! +//! `LoadBalancer::poll_ready` observes ejections in O(1) per +//! transition by polling a `FuturesUnordered` +//! over each channel's signal. +//! +//! Only the failure-percentage algorithm is dispatched. The +//! success-rate algorithm (cross-endpoint mean/stdev) is left to a +//! follow-up. +//! +//! [gRFC A50]: https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Instant; + +use dashmap::DashMap; + +use crate::client::endpoint::EndpointAddress; +use crate::client::loadbalance::channel_state::OutlierChannelState; +use crate::common::async_util::AbortOnDrop; +use crate::xds::resource::outlier_detection::OutlierDetectionConfig; + +/// Probability source for `enforcing_*` rolls. +pub(crate) trait Rng: Send + Sync + 'static { + /// Return a uniform random `u32` in `0..100`. + fn pct_roll(&self) -> u32; +} + +/// Default RNG backed by `fastrand`. +struct FastRandRng; + +impl Rng for FastRandRng { + fn pct_roll(&self) -> u32 { + fastrand::u32(0..100) + } +} + +/// Shared outlier-detection state, owned by `Arc` and accessed +/// concurrently by: +/// - The load balancer's call wrapper, which calls +/// [`Self::record_outcome`] after each RPC completion. +/// - The spawned actor task, which calls [`Self::run_housekeeping`] +/// on every `config.interval` tick. +/// - The load balancer's `poll_ready`, which subscribes to per-channel +/// ejection signals via [`OutlierChannelState::subscribe`]. 
+pub(crate) struct OutlierStatsRegistry { + /// Per-endpoint state, keyed by address. Inserted by the LB on + /// channel creation and removed on disconnect. + channels: DashMap<EndpointAddress, Arc<OutlierChannelState>>, + /// Number of channels currently with `total >= request_volume` in + /// the active interval. Drives the `minimum_hosts` gate. + qualifying_count: AtomicU64, + /// Number of channels currently ejected. Drives the + /// `max_ejection_percent` cap. + ejected_count: AtomicU64, + config: OutlierDetectionConfig, + rng: Box<dyn Rng>, +} + +impl OutlierStatsRegistry { + /// Build a registry with the default RNG. + pub(crate) fn new(config: OutlierDetectionConfig) -> Arc<Self> { + Self::with_rng(config, Box::new(FastRandRng)) + } + + /// Build a registry with a custom [`Rng`]. + pub(crate) fn with_rng(config: OutlierDetectionConfig, rng: Box<dyn Rng>) -> Arc<Self> { + Arc::new(Self { + channels: DashMap::new(), + qualifying_count: AtomicU64::new(0), + ejected_count: AtomicU64::new(0), + config, + rng, + }) + } + + /// Register a new channel. Returns the `Arc<OutlierChannelState>` + /// the load balancer wires into the channel; the same `Arc` is + /// retained in the registry so the actor can iterate it. + pub(crate) fn add_channel(&self, addr: EndpointAddress) -> Arc<OutlierChannelState> { + let state = Arc::new(OutlierChannelState::new()); + self.channels.insert(addr, state.clone()); + state + } + + /// Forget a channel. Drops the registry's reference; cluster-wide + /// counters are decremented if the channel was qualifying or + /// ejected. + pub(crate) fn remove_channel(&self, addr: &EndpointAddress) { + if let Some((_, state)) = self.channels.remove(addr) { + if state.clear_qualifying() { + self.qualifying_count.fetch_sub(1, Ordering::Relaxed); + } + if state.is_ejected() { + self.ejected_count.fetch_sub(1, Ordering::Relaxed); + } + } + } + + /// Number of registered channels. + pub(crate) fn len(&self) -> usize { + self.channels.len() + } + + /// Per-RPC entry point. Called by the load balancer's call wrapper + /// after each RPC completion. 
Increments the channel's success or + /// failure counter and then evaluates the failure-percentage + /// threshold; if all gates pass, ejects the channel inline. + pub(crate) fn record_outcome(&self, state: &OutlierChannelState, success: bool) { + if success { + state.record_success(); + } else { + state.record_failure(); + } + + let Some(fp) = self.config.failure_percentage.as_ref() else { + return; + }; + + let (s, f) = state.counters(); + let total = s + f; + let request_volume = u64::from(fp.request_volume); + + // Track when each channel first qualifies in the current + // interval, so the `minimum_hosts` gate can be checked with a + // single atomic load. + if total >= request_volume && state.mark_qualifying() { + self.qualifying_count.fetch_add(1, Ordering::Relaxed); + } + + if state.is_ejected() { + return; + } + if total < request_volume { + return; + } + if self.qualifying_count.load(Ordering::Relaxed) < u64::from(fp.minimum_hosts) { + return; + } + if self.ejected_count.load(Ordering::Relaxed) >= self.max_ejections() { + return; + } + + // failure_pct = 100 * failure / total. A50 uses strict ">". + let failure_pct = 100 * f / total; + if failure_pct <= u64::from(fp.threshold.get()) { + return; + } + if !roll(&*self.rng, fp.enforcing_failure_percentage.get()) { + return; + } + + if state.try_eject(Instant::now()) { + self.ejected_count.fetch_add(1, Ordering::Relaxed); + } + } + + /// Interval-boundary housekeeping. Called by the spawned actor on + /// each `config.interval` tick. Resets counters, un-ejects + /// channels whose backoff has elapsed, and decrements multipliers + /// for non-ejected channels. + pub(crate) fn run_housekeeping(&self, now: Instant) { + // Cap the un-ejection backoff at `max(base, max_ejection_time)`. 
+ let cap = self + .config + .base_ejection_time + .max(self.config.max_ejection_time); + + for entry in self.channels.iter() { + let state = entry.value(); + + // Reset counters; clear `is_qualifying` and adjust the + // registry-level counter in lockstep. + state.snapshot_and_reset(); + if state.clear_qualifying() { + self.qualifying_count.fetch_sub(1, Ordering::Relaxed); + } + + if state.is_ejected() { + let multiplier = state.ejection_multiplier(); + let elapsed = state.ejected_duration(now).unwrap_or_default(); + if let Some(scaled) = self.config.base_ejection_time.checked_mul(multiplier) + && elapsed >= scaled.min(cap) + && state.try_uneject() + { + self.ejected_count.fetch_sub(1, Ordering::Relaxed); + } + } else { + state.decrement_multiplier(); + } + } + } + + /// `max_ejection_percent` resolved against the current channel + /// count. Updated as channels come and go. + fn max_ejections(&self) -> u64 { + self.channels.len() as u64 * u64::from(self.config.max_ejection_percent.get()) / 100 + } +} + +/// Spawn the housekeeping actor. The task ticks every +/// `config.interval` and calls +/// [`OutlierStatsRegistry::run_housekeeping`]. Dropping the returned +/// [`AbortOnDrop`] stops the task. +pub(crate) fn spawn_actor(registry: Arc) -> AbortOnDrop { + let interval = registry.config.interval; + let task = tokio::spawn(async move { + let mut ticker = tokio::time::interval(interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + ticker.tick().await; + registry.run_housekeeping(Instant::now()); + } + }); + AbortOnDrop(task) +} + +/// Return true with probability `pct / 100` (clamped at 100 ⇒ always). 
+fn roll(rng: &dyn Rng, pct: u8) -> bool { + if pct >= 100 { + return true; + } + if pct == 0 { + return false; + } + rng.pct_roll() < u32::from(pct) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::xds::resource::outlier_detection::{ + FailurePercentageConfig, OutlierDetectionConfig, Percentage, + }; + use std::sync::atomic::{AtomicU32, Ordering}; + use std::time::Duration; + + fn addr(port: u16) -> EndpointAddress { + EndpointAddress::new("10.0.0.1", port) + } + + fn pct(v: u32) -> Percentage { + Percentage::new(v).unwrap() + } + + fn base_config() -> OutlierDetectionConfig { + OutlierDetectionConfig { + interval: Duration::from_secs(1), + base_ejection_time: Duration::from_secs(30), + max_ejection_time: Duration::from_secs(300), + max_ejection_percent: pct(100), + success_rate: None, + failure_percentage: None, + } + } + + fn fp_config( + threshold: u32, + request_volume: u32, + minimum_hosts: u32, + ) -> OutlierDetectionConfig { + let mut c = base_config(); + c.failure_percentage = Some(FailurePercentageConfig { + threshold: pct(threshold), + enforcing_failure_percentage: pct(100), + minimum_hosts, + request_volume, + }); + c + } + + /// Deterministic RNG: `pct_roll()` returns a fixed value. + struct FixedRng(AtomicU32); + + impl FixedRng { + fn boxed(value: u32) -> Box { + Box::new(Self(AtomicU32::new(value))) + } + } + + impl Rng for FixedRng { + fn pct_roll(&self) -> u32 { + self.0.load(Ordering::Relaxed) + } + } + + /// Drive `n` outcomes through `record_outcome` for one channel. 
+ fn drive( + registry: &OutlierStatsRegistry, + state: &OutlierChannelState, + successes: u64, + failures: u64, + ) { + for _ in 0..successes { + registry.record_outcome(state, true); + } + for _ in 0..failures { + registry.record_outcome(state, false); + } + } + + // ----- record_outcome: failure-percentage detection ----- + + #[test] + fn ejects_above_threshold_inline() { + let registry = OutlierStatsRegistry::with_rng(fp_config(50, 10, 3), FixedRng::boxed(99)); + let bad = registry.add_channel(addr(8084)); + for port in 8080..=8083 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 100, 0); + } + drive(®istry, &bad, 10, 90); + assert!(bad.is_ejected()); + assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 1); + } + + #[test] + fn skips_below_threshold() { + let registry = OutlierStatsRegistry::with_rng(fp_config(50, 10, 3), FixedRng::boxed(99)); + let mut all = vec![]; + for port in 8080..=8084 { + let s = registry.add_channel(addr(port)); + // 30% failure → below 50% threshold. + drive(®istry, &s, 70, 30); + all.push(s); + } + for s in &all { + assert!(!s.is_ejected()); + } + } + + #[test] + fn at_threshold_does_not_eject() { + // A50 specifies a strict "greater than" comparison. + let registry = OutlierStatsRegistry::with_rng(fp_config(50, 10, 3), FixedRng::boxed(0)); + let mut all = vec![]; + for port in 8080..=8084 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 50, 50); + all.push(s); + } + for s in &all { + assert!(!s.is_ejected()); + } + } + + #[test] + fn minimum_hosts_gates_ejection() { + let registry = OutlierStatsRegistry::with_rng(fp_config(50, 10, 5), FixedRng::boxed(99)); + // Only 2 hosts have request_volume ≥ 10; minimum_hosts is 5 ⇒ skip. 
+ let mut all = vec![]; + for port in 8080..=8081 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 0, 100); + all.push(s); + } + for s in &all { + assert!(!s.is_ejected()); + } + } + + #[test] + fn request_volume_filters_low_traffic() { + let registry = OutlierStatsRegistry::with_rng(fp_config(50, 100, 3), FixedRng::boxed(99)); + let bad = registry.add_channel(addr(8080)); + drive(®istry, &bad, 0, 5); + for port in 8081..=8084 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 200, 0); + } + assert!(!bad.is_ejected()); + } + + #[test] + fn enforcement_zero_percent_never_ejects() { + let mut config = fp_config(50, 10, 3); + config + .failure_percentage + .as_mut() + .unwrap() + .enforcing_failure_percentage = pct(0); + let registry = OutlierStatsRegistry::with_rng(config, FixedRng::boxed(0)); + let mut all = vec![]; + for port in 8080..=8084 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 0, 100); + all.push(s); + } + for s in &all { + assert!(!s.is_ejected()); + } + } + + #[test] + fn max_ejection_percent_caps_concurrent_ejections() { + let mut config = fp_config(50, 10, 3); + config.max_ejection_percent = pct(20); + let registry = OutlierStatsRegistry::with_rng(config, FixedRng::boxed(99)); + + let mut all = vec![]; + for port in 8080..=8084 { + let s = registry.add_channel(addr(port)); + all.push(s); + } + // Drive all hosts to bad state in parallel pseudo-order. + for s in &all { + drive(®istry, s, 0, 100); + } + + let ejected = all.iter().filter(|s| s.is_ejected()).count(); + // 5 hosts × 20% = 1 max ejection. 
+ assert_eq!(ejected, 1); + } + + #[test] + fn remove_channel_decrements_counters() { + let registry = OutlierStatsRegistry::with_rng(fp_config(50, 10, 3), FixedRng::boxed(99)); + let mut all = vec![]; + for port in 8080..=8083 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 100, 0); + all.push(s); + } + let bad = registry.add_channel(addr(8084)); + drive(®istry, &bad, 0, 100); + assert!(bad.is_ejected()); + assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 1); + // Each healthy host crossed request_volume; bad too. So + // qualifying_count = 5. + assert_eq!(registry.qualifying_count.load(Ordering::Relaxed), 5); + + registry.remove_channel(&addr(8084)); + assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 0); + assert_eq!(registry.qualifying_count.load(Ordering::Relaxed), 4); + } + + // ----- Housekeeping ----- + + #[test] + fn housekeeping_resets_counters_and_qualifying() { + let registry = OutlierStatsRegistry::with_rng(fp_config(50, 10, 3), FixedRng::boxed(99)); + for port in 8080..=8083 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 100, 0); + } + assert_eq!(registry.qualifying_count.load(Ordering::Relaxed), 4); + + registry.run_housekeeping(Instant::now()); + assert_eq!(registry.qualifying_count.load(Ordering::Relaxed), 0); + for port in 8080..=8083 { + let s = registry.channels.get(&addr(port)).unwrap(); + assert_eq!(s.counters(), (0, 0)); + } + } + + #[test] + fn housekeeping_unejects_after_base_time() { + let mut config = fp_config(50, 10, 3); + config.base_ejection_time = Duration::from_secs(10); + config.max_ejection_time = Duration::from_secs(60); + let registry = OutlierStatsRegistry::with_rng(config, FixedRng::boxed(99)); + + let bad = registry.add_channel(addr(8084)); + for port in 8080..=8083 { + let s = registry.add_channel(addr(port)); + drive(®istry, &s, 100, 0); + } + drive(®istry, &bad, 0, 100); + assert!(bad.is_ejected()); + + // Advance fewer than base_ejection_time ⇒ stays ejected. 
+ let t0 = Instant::now(); + registry.run_housekeeping(t0 + Duration::from_secs(9)); + assert!(bad.is_ejected()); + + // After base_ejection_time × 1 elapsed ⇒ uneject. + registry.run_housekeeping(t0 + Duration::from_secs(20)); + assert!(!bad.is_ejected()); + assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 0); + } + + #[test] + fn housekeeping_decrements_multiplier_on_healthy_interval() { + let registry = OutlierStatsRegistry::with_rng(base_config(), FixedRng::boxed(99)); + let s = registry.add_channel(addr(8080)); + // Force multiplier to 3 directly (no traffic, no eject). + s.set_ejection_multiplier(3); + + registry.run_housekeeping(Instant::now()); + assert_eq!(s.ejection_multiplier(), 2); + } + + #[test] + fn housekeeping_caps_ejection_at_max_ejection_time() { + let mut config = fp_config(50, 10, 3); + config.base_ejection_time = Duration::from_secs(10); + config.max_ejection_time = Duration::from_secs(15); + let registry = OutlierStatsRegistry::with_rng(config, FixedRng::boxed(99)); + + let s = registry.add_channel(addr(8080)); + // Pretend 8080 was ejected long ago with a huge multiplier. + s.try_eject(Instant::now()); + s.set_ejection_multiplier(10); + registry.ejected_count.fetch_add(0, Ordering::Relaxed); // try_eject already added 1 + + // base * multiplier = 100s, but cap = 15s. Sweep at 16s ⇒ uneject. + let t0 = Instant::now(); + registry.run_housekeeping(t0 + Duration::from_secs(16)); + assert!(!s.is_ejected()); + } + + // ----- Spawned actor ----- + // + // The actor's algorithmic behavior is fully exercised by the + // synchronous `housekeeping_*` tests above; here we only verify + // that dropping the `AbortOnDrop` handle reliably stops the task. 
+ + #[tokio::test(start_paused = true)] + async fn dropping_abort_stops_actor() { + let mut config = base_config(); + config.interval = Duration::from_millis(50); + let registry = OutlierStatsRegistry::with_rng(config, FixedRng::boxed(99)); + let s = registry.add_channel(addr(8080)); + s.set_ejection_multiplier(5); + + let abort = spawn_actor(registry.clone()); + drop(abort); + + // Even with several tick periods elapsed, no housekeeping + // should have run because the task was aborted. + tokio::time::advance(Duration::from_millis(500)).await; + tokio::task::yield_now().await; + + assert_eq!(s.ejection_multiplier(), 5); + } + + // ----- OutlierChannelState sanity (kept in this file as it is the + // primary consumer of the type) ----- + + #[test] + fn channel_state_records_and_resets() { + let s = OutlierChannelState::new(); + s.record_success(); + s.record_success(); + s.record_failure(); + assert_eq!(s.snapshot_and_reset(), (2, 1)); + assert_eq!(s.snapshot_and_reset(), (0, 0)); + } + + #[test] + fn channel_state_try_eject_uneject_flips_signal() { + let s = OutlierChannelState::new(); + assert!(!s.is_ejected()); + assert!(s.try_eject(Instant::now())); + assert!(s.is_ejected()); + // Second call is a no-op. + assert!(!s.try_eject(Instant::now())); + assert!(s.try_uneject()); + assert!(!s.is_ejected()); + assert!(!s.try_uneject()); + } +} diff --git a/tonic-xds/src/xds/resource/outlier_detection.rs b/tonic-xds/src/xds/resource/outlier_detection.rs index a31fd6c60..159ff7735 100644 --- a/tonic-xds/src/xds/resource/outlier_detection.rs +++ b/tonic-xds/src/xds/resource/outlier_detection.rs @@ -4,12 +4,17 @@ //! algorithm. The two sub-configs gate which ejection algorithms run. //! //! Note: A50 specifies outlier detection as a load-balancing policy -//! wrapping a `child_policy`. `tonic-xds` currently runs P2C as its only -//! load balancer and integrates outlier detection as a filter on the -//! `Discover` stream feeding it, so there is no `child_policy` field -//! 
here yet. It will be added when more balancers are supported. +//! wrapping a `child_policy`. `tonic-xds` currently runs P2C as its +//! only load balancer, so there is no `child_policy` field here yet — +//! it will be added when more balancers are supported. Integration +//! with the data path is via per-channel `watch` ejection signals +//! polled by the [`LoadBalancer`] tower service, which marks the +//! corresponding [`ReadyChannel`] as ejected via [`EjectedChannel`]. //! //! [gRFC A50]: https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md +//! [`LoadBalancer`]: crate::client::loadbalance::loadbalancer::LoadBalancer +//! [`ReadyChannel`]: crate::client::loadbalance::channel_state::ReadyChannel +//! [`EjectedChannel`]: crate::client::loadbalance::channel_state::EjectedChannel use std::time::Duration; @@ -68,7 +73,9 @@ pub(crate) struct SuccessRateConfig { /// An endpoint is a candidate for ejection when its success rate falls /// below `mean - stdev * (stdev_factor / 1000.0)`. pub stdev_factor: u32, - /// Probability that a candidate is actually ejected. + /// Probability that a flagged candidate is actually ejected — *not* + /// the success-rate threshold (which is derived from `stdev_factor`). + /// Set to 0 to disable enforcement while still computing statistics. pub enforcing_success_rate: Percentage, /// Minimum number of candidate endpoints required to run the algorithm. pub minimum_hosts: u32, @@ -83,7 +90,9 @@ pub(crate) struct FailurePercentageConfig { /// Failure rate at or above which an endpoint is a candidate for /// ejection. pub threshold: Percentage, - /// Probability that a candidate is actually ejected. + /// Probability that a flagged candidate is actually ejected — *not* + /// the failure-rate threshold (that is `threshold` above). Set to 0 + /// to disable enforcement while still computing statistics. pub enforcing_failure_percentage: Percentage, /// Minimum number of candidate endpoints required to run the algorithm. 
pub minimum_hosts: u32,