diff --git a/crates/starfish/core/src/block_manager/block_suspender.rs b/crates/starfish/core/src/block_manager/block_suspender.rs index f219a5634ad..e5cdab7b167 100644 --- a/crates/starfish/core/src/block_manager/block_suspender.rs +++ b/crates/starfish/core/src/block_manager/block_suspender.rs @@ -10,7 +10,17 @@ use itertools::Itertools; use starfish_config::AuthorityIndex; use tracing::debug; -use crate::{BlockHeaderAPI, BlockRef, VerifiedBlockHeader, context::Context}; +use crate::{ + BlockHeaderAPI, BlockRef, Round, VerifiedBlockHeader, block_header::BlockHeaderDigest, + context::Context, +}; + +/// Outcome of [`BlockSuspender::evict_below_round`]. +pub(crate) struct EvictBelowRoundOutcome { + pub(crate) unsuspended_headers: Vec, + pub(crate) ancestors_evicted: usize, + pub(crate) fetch_entries_evicted: usize, +} struct SuspendedBlockHeader { block_header: VerifiedBlockHeader, @@ -206,7 +216,7 @@ impl BlockSuspender { (fully_resolved_headers, ancestors_to_fetch) } /// Recursively unsuspends all blocks that were dependent on a now-accepted - /// block. + /// or now-gc-evicted block. /// /// Starting from `resolved_block`, this function walks the dependency graph /// and attempts to unsuspend any suspended blocks that were blocked on @@ -387,12 +397,72 @@ impl BlockSuspender { pub(crate) fn is_block_ref_suspended(&self, block_ref: &BlockRef) -> bool { self.suspended_headers.contains_key(block_ref) } + + /// Drops missing-ancestor and fetch entries whose round is at or below + /// `round_floor`, and cascades any resulting unsuspensions. + /// + /// An ancestor at or below `round_floor` cannot influence a + /// not-yet-sequenced block (linearizer GC rule), so suspending blocks + /// that wait on it and fetching it from peers serves no purpose. + /// Treating those keys as "resolved" is equivalent to admitting we will + /// never receive them and reusing the existing cascade machinery to + /// release everything that was only waiting on them. 
+ /// + /// Returns headers that became fully resolved (and any further headers that + /// cascaded from them) so the caller can write them to `DagState`. + pub(crate) fn evict_below_round(&mut self, round_floor: Round) -> EvictBelowRoundOutcome { + // Smallest BlockRef with round > round_floor — split keeps everything + // from this key onward (round > floor) in the original map. + let pivot = round_floor + .checked_add(1) + .map(|round| BlockRef::new(round, AuthorityIndex::MIN, BlockHeaderDigest::MIN)); + + let fetch_entries_evicted = if let Some(pivot) = pivot { + let kept_to_fetch = self.headers_to_fetch.split_off(&pivot); + std::mem::replace(&mut self.headers_to_fetch, kept_to_fetch).len() + } else { + std::mem::take(&mut self.headers_to_fetch).len() + }; + + // Collect evicted keys via an O(log n + k) range scan; leave them in + // `self.missing_ancestors` so `recursively_unsuspend_dependents` can + // remove and cascade through them as usual. + let evicted_ancestor_refs: Vec = if let Some(pivot) = pivot { + self.missing_ancestors + .range(..pivot) + .map(|(k, _)| *k) + .collect() + } else { + self.missing_ancestors.keys().copied().collect() + }; + let ancestors_evicted = evicted_ancestor_refs.len(); + + let mut unsuspended = vec![]; + for evicted_ref in evicted_ancestor_refs { + unsuspended.extend(self.recursively_unsuspend_dependents(evicted_ref)); + } + + self.refresh_state_gauges(); + + EvictBelowRoundOutcome { + unsuspended_headers: unsuspended, + ancestors_evicted, + fetch_entries_evicted, + } + } pub(crate) fn headers_to_fetch(&self) -> BTreeMap> { self.headers_to_fetch.clone() } fn update_stats(&mut self, blocks_to_fetch: u64) { + self.context + .metrics + .node_metrics + .missing_block_headers_total + .inc_by(blocks_to_fetch); + self.refresh_state_gauges(); + } + fn refresh_state_gauges(&self) { let metrics = &self.context.metrics.node_metrics; - metrics.missing_block_headers_total.inc_by(blocks_to_fetch); metrics 
.block_manager_suspended_block_headers .set(self.suspended_headers.len() as i64); @@ -438,6 +508,118 @@ impl BlockSuspender { #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::{ + block_header::{BlockHeaderDigest, TestBlockHeader}, + context::Context, + }; + + fn block_ref(round: Round, author: u8) -> BlockRef { + BlockRef::new(round, author.into(), BlockHeaderDigest::default()) + } + + fn header(round: Round, author: u8, ancestors: Vec) -> VerifiedBlockHeader { + let bh = TestBlockHeader::new(round, author) + .set_ancestors(ancestors) + .build(); + VerifiedBlockHeader::new_for_test(bh) + } + + fn new_suspender() -> BlockSuspender { + let (context, _) = Context::new_for_test(4); + BlockSuspender::new(Arc::new(context)) + } + + #[tokio::test] + async fn evict_below_round_unsuspends_single_dep() { + let mut suspender = new_suspender(); + let a = block_ref(5, 0); // missing ancestor at floor + let b = header(10, 1, vec![a]); + let mut input = BTreeMap::new(); + input.insert(b.clone(), BTreeSet::from([a])); + + let (accepted, _) = suspender.accept_or_suspend_received_headers(input); + assert!(accepted.is_empty()); + assert_eq!(suspender.suspended_blocks_refs().len(), 1); + assert!(suspender.headers_to_fetch.contains_key(&a)); + + let outcome = suspender.evict_below_round(5); + + assert_eq!(outcome.unsuspended_headers, vec![b]); + assert_eq!(outcome.ancestors_evicted, 1); + assert_eq!(outcome.fetch_entries_evicted, 1); + assert!(suspender.is_empty()); + } + + #[tokio::test] + async fn evict_below_round_keeps_dep_above_floor() { + let mut suspender = new_suspender(); + let a_low = block_ref(5, 0); + let c_high = block_ref(20, 0); + let b = header(25, 1, vec![a_low, c_high]); + let mut input = BTreeMap::new(); + input.insert(b, BTreeSet::from([a_low, c_high])); + + let (_accepted, _) = suspender.accept_or_suspend_received_headers(input); + + let outcome = suspender.evict_below_round(5); + + assert!(outcome.unsuspended_headers.is_empty()); + 
assert_eq!(outcome.ancestors_evicted, 1); + assert_eq!(outcome.fetch_entries_evicted, 1); + assert_eq!(suspender.suspended_blocks_refs().len(), 1); + assert!(suspender.missing_ancestors.contains_key(&c_high)); + assert!(suspender.headers_to_fetch.contains_key(&c_high)); + assert!(!suspender.missing_ancestors.contains_key(&a_low)); + assert!(!suspender.headers_to_fetch.contains_key(&a_low)); + } + + #[tokio::test] + async fn evict_below_round_cascades_chain() { + // Chain: a (round 5, evicted) ← b (round 10, suspended on a) + // ← c (round 20, suspended on b) + let mut suspender = new_suspender(); + let a = block_ref(5, 0); + let b = header(10, 1, vec![a]); + let b_ref = b.reference(); + let c = header(20, 2, vec![b_ref]); + + let c_ref = c.reference(); + let mut input_b = BTreeMap::new(); + input_b.insert(b, BTreeSet::from([a])); + suspender.accept_or_suspend_received_headers(input_b); + + let mut input_c = BTreeMap::new(); + input_c.insert(c, BTreeSet::from([b_ref])); + suspender.accept_or_suspend_received_headers(input_c); + + let outcome = suspender.evict_below_round(5); + + let unsuspended_refs: BTreeSet = outcome + .unsuspended_headers + .iter() + .map(|h| h.reference()) + .collect(); + assert_eq!(unsuspended_refs, BTreeSet::from([b_ref, c_ref])); + assert!(suspender.is_empty()); + } + + #[tokio::test] + async fn evict_below_round_prunes_only_fetch_below_floor() { + let mut suspender = new_suspender(); + let r3 = block_ref(3, 0); + let r7 = block_ref(7, 1); + let r15 = block_ref(15, 2); + suspender.insert_block_to_fetch(r3, BTreeSet::from([0u8.into()])); + suspender.insert_block_to_fetch(r7, BTreeSet::from([1u8.into()])); + suspender.insert_block_to_fetch(r15, BTreeSet::from([2u8.into()])); + + let outcome = suspender.evict_below_round(7); + + assert_eq!(outcome.fetch_entries_evicted, 2); + let remaining: BTreeSet = suspender.headers_to_fetch.keys().copied().collect(); + assert_eq!(remaining, BTreeSet::from([r15])); + } + /// Evaluates a set of verified block 
headers to determine which blocks /// should be suspended and which are still missing. /// diff --git a/crates/starfish/core/src/block_manager/mod.rs b/crates/starfish/core/src/block_manager/mod.rs index 5dca6d5f7de..2a4dbf2f26a 100644 --- a/crates/starfish/core/src/block_manager/mod.rs +++ b/crates/starfish/core/src/block_manager/mod.rs @@ -25,13 +25,43 @@ pub(crate) mod block_suspender; use crate::{ Round, block_header::{ - BlockHeaderAPI, BlockRef, VerifiedBlock, VerifiedBlockHeader, VerifiedTransactions, + BlockHeaderAPI, BlockHeaderDigest, BlockRef, VerifiedBlock, VerifiedBlockHeader, + VerifiedTransactions, }, block_manager::block_suspender::BlockSuspender, context::Context, dag_state::{DagState, DataSource}, }; +/// Combine headers accepted via the regular path with headers unsuspended by +/// the GC sweep, deduplicating on `BlockRef` and returning the result in +/// `BlockRef` ascending order (which is `(round, author, digest)`-ascending, +/// preserving the public "round ascending" guarantee of `try_accept_*`). +/// +/// Both inputs can name the same header: the regular path accepts a +/// freshly-arrived copy at the same time the GC sweep promotes a +/// previously-suspended copy. Producing the same header twice would corrupt +/// downstream metrics and DagState's accept assertions. +/// +/// Regular-path entries take precedence on duplicate keys (they're the +/// version we just verified for this batch). +fn merge_accepted_round_ascending( + regular: Vec, + gc_unsuspended: Vec, +) -> Vec { + if gc_unsuspended.is_empty() { + return regular; + } + let mut by_ref: BTreeMap = BTreeMap::new(); + for h in gc_unsuspended { + by_ref.insert(h.reference(), h); + } + for h in regular { + by_ref.insert(h.reference(), h); + } + by_ref.into_values().collect() +} + /// Block manager suspends incoming blocks until they are connected to the /// existing graph, returning newly connected blocks. 
/// TODO: As it is possible to have Byzantine validators who produce Blocks @@ -42,13 +72,19 @@ pub(crate) struct BlockManager { dag_state: Arc>, /// Keeps VerifiedTransactions of blocks whose headers have been suspended. - /// TODO: this set can grow to become too big, need to add some eviction + /// Bounded by the GC sweep in `maybe_evict_below_gc_floor`: any entry whose + /// block round is at or below `gc_round_for_last_commit` cannot be + /// sequenced and is dropped. suspended_transactions: BTreeMap, block_suspender: BlockSuspender, /// A vector that holds a tuple of (lowest_round, highest_round) of received /// blocks per authority. This is used for metrics reporting purposes /// and resets during restarts. received_block_rounds: Vec>, + /// Highest GC round floor we've already swept against. Initialized to 0; + /// monotonically non-decreasing. When `gc_round_for_last_commit()` advances + /// past this value, the next `try_accept_*` call runs an eviction sweep. + last_gc_floor_applied: Round, } impl BlockManager { @@ -59,6 +95,7 @@ impl BlockManager { suspended_transactions: BTreeMap::new(), block_suspender: BlockSuspender::new(context.clone()), received_block_rounds: vec![None; context.committee.size()], + last_gc_floor_applied: 0, } } @@ -68,6 +105,53 @@ impl BlockManager { self.suspended_transactions.clear(); self.block_suspender.reinitialize(); self.received_block_rounds = vec![None; self.context.committee.size()]; + self.last_gc_floor_applied = 0; + } + + /// Drops suspended state at or below the current GC floor and returns + /// any headers that became fully resolved as a result. + /// + /// The floor is `DagState::gc_round_for_last_commit()` — the same horizon + /// `DagState` itself uses for header eviction. Anything at or below it + /// cannot be sequenced and so cannot help any not-yet-accepted block. + /// + /// Cheap when the floor has not advanced since the last call: a single + /// read-locked field access on `DagState` and a comparison. 
fn maybe_evict_below_gc_floor(&mut self) -> Vec<VerifiedBlockHeader> {
let accepted_transactions = self.resolve_transactions( &block_headers_to_accept, @@ -119,6 +207,8 @@ impl BlockManager { source: DataSource, ) -> (Vec, BTreeSet) { let _s = monitored_scope("BlockManager::try_accept_block_headers"); + let gc_unsuspended = self.maybe_evict_below_gc_floor(); + // Headers are added through synchronizer, commit syncer and cordial // dissemination. let present_header_and_ancestor_refs_in_dag_state = @@ -128,6 +218,8 @@ impl BlockManager { &present_header_and_ancestor_refs_in_dag_state, source, ); + let block_headers_to_accept = + merge_accepted_round_ascending(block_headers_to_accept, gc_unsuspended); // collect transactions we already have for accepted headers. let accepted_transactions = self.resolve_transactions( &block_headers_to_accept, @@ -211,13 +303,26 @@ impl BlockManager { } if let Some(blocks) = blocks { + // Mirrors the gate in `filter_out_already_processed_and_sort`: when + // the hardening flag is on, a block at or below the GC floor cannot + // be sequenced and its header is dropped on arrival. Suspending its + // transactions would leave them stranded until the floor advanced + // again, allowing the map to grow between sweeps. + let gc_filter_round: Option = + if self.context.protocol_config.consensus_block_restrictions() { + Some(self.last_gc_floor_applied) + } else { + None + }; let mut accepted_transactions_from_blocks = vec![]; for block in blocks { if block_refs_to_be_accepted.contains(&block.reference()) || present_headers_and_ancestor_refs_in_dag_state.contains(&block.reference()) { accepted_transactions_from_blocks.push(block.verified_transactions); - } else if block.verified_transactions.has_transactions() { + } else if block.verified_transactions.has_transactions() + && gc_filter_round.is_none_or(|f| block.round() > f) + { // optimization to avoid suspending 0 set verified transactions. 
self.suspended_transactions .insert(block.reference(), block.verified_transactions); @@ -384,13 +489,23 @@ impl BlockManager { incoming_headers: Vec, present_header_and_ancestor_refs_in_dag_state: &BTreeSet, ) -> BTreeMap> { + // Off the hardening flag, every absent ancestor is treated as missing + // (legacy behavior). With the flag on, ancestors at or below the GC + // floor cannot affect any not-yet-sequenced block and are skipped. + let gc_filter_round: Option = + if self.context.protocol_config.consensus_block_restrictions() { + Some(self.last_gc_floor_applied) + } else { + None + }; let mut missing_ancestors = BTreeMap::new(); for incoming_header in incoming_headers { let ancestors: &[BlockRef] = incoming_header.ancestors(); let mut missing_ancestors_set = BTreeSet::new(); for ancestor in ancestors { let found = present_header_and_ancestor_refs_in_dag_state.contains(ancestor); - if !found { + let below_gc = gc_filter_round.is_some_and(|f| ancestor.round <= f); + if !found && !below_gc { missing_ancestors_set.insert(*ancestor); } } @@ -406,9 +521,26 @@ impl BlockManager { present_header_and_ancestor_refs_in_dag_state: &BTreeSet, source: DataSource, ) -> Vec { + let gc_filter_round: Option = + if self.context.protocol_config.consensus_block_restrictions() { + Some(self.last_gc_floor_applied) + } else { + None + }; let mut filtered = block_headers .into_iter() .filter_map(|block_header| { + // With the hardening flag on, drop incoming headers whose own + // round is at or below the GC floor; nothing they carry can be + // sequenced anymore. 
+ if gc_filter_round.is_some_and(|f| block_header.round() <= f) { + self.context + .metrics + .node_metrics + .block_manager_gc_evicted_old_headers_total + .inc(); + return None; + } let found = present_header_and_ancestor_refs_in_dag_state .contains(&block_header.reference()); if found @@ -445,8 +577,9 @@ mod tests { use starfish_config::AuthorityIndex; use crate::{ + Round, block_header::{BlockHeaderAPI, BlockRef, VerifiedBlockHeader}, - block_manager::BlockManager, + block_manager::{BlockManager, merge_accepted_round_ascending}, context::Context, dag_state::{DagState, DataSource}, storage::mem_store::MemStore, @@ -1022,4 +1155,282 @@ mod tests { ); } } + + /// Helpers for the GC-eviction integration tests below. + mod gc_eviction_helpers { + use super::*; + use crate::{ + Round, + block_header::{ + BlockHeaderDigest, BlockTimestampMs, TestBlockHeader, VerifiedBlockHeader, + }, + commit::{CommitDigest, TrustedCommit}, + }; + + pub(super) fn header( + round: Round, + author: u8, + ancestors: Vec, + ) -> VerifiedBlockHeader { + let bh = TestBlockHeader::new(round, author) + .set_ancestors(ancestors) + .build(); + VerifiedBlockHeader::new_for_test(bh) + } + + pub(super) fn block_ref(round: Round, author: u8) -> BlockRef { + BlockRef::new(round, author.into(), BlockHeaderDigest::default()) + } + + /// Plant a `last_commit` in DagState whose leader round is + /// `commit_leader_round`, so `gc_round_for_last_commit()` returns + /// `commit_leader_round - gc_depth*2`. 
+ pub(super) fn plant_last_commit( + dag_state: &Arc>, + context: &Arc, + commit_leader_round: Round, + ) { + let leader = block_ref(commit_leader_round, 0); + let commit = TrustedCommit::new_for_test( + context, + // commit index + 1, + CommitDigest::MIN, + // timestamp + 0 as BlockTimestampMs, + leader, + vec![leader], + vec![], + ); + dag_state.write().set_last_commit(commit); + } + } + + /// With the hardening flag on and a non-zero gc_floor, an incoming header + /// whose only missing ancestor is below the floor is accepted directly, + /// not suspended, and is not registered for fetching. + #[tokio::test] + async fn gc_eviction_accepts_header_with_only_old_missing_ancestors() { + use gc_eviction_helpers::*; + + let (mut context, _) = Context::new_for_test(4); + context + .protocol_config + .set_consensus_block_restrictions_for_testing(true); + let context = Arc::new(context); + let gc_depth = context.protocol_config.gc_depth(); + + let store = Arc::new(MemStore::new(context.clone())); + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store))); + + // Plant a commit with leader round large enough that gc_floor > 0. + let commit_leader_round = gc_depth * 2 + 200; + plant_last_commit(&dag_state, &context, commit_leader_round); + let gc_floor = dag_state.read().gc_round_for_last_commit(); + assert!(gc_floor > 0); + + let mut block_manager = BlockManager::new(context, dag_state); + + // Header at gc_floor + 50 with one missing ancestor at gc_floor - 10. 
+ let old_ancestor = block_ref(gc_floor.saturating_sub(10), 0); + let h = header(gc_floor + 50, 1, vec![old_ancestor]); + + let (accepted, missing) = + block_manager.try_accept_block_headers(vec![h.clone()], DataSource::Test); + + assert_eq!(accepted, vec![h]); + assert!( + missing.is_empty(), + "old ancestor below gc_floor should not be reported as missing" + ); + assert!( + block_manager.blocks_to_fetch().is_empty(), + "old ancestor below gc_floor should not be queued for fetching" + ); + } + + /// A full block whose own round is at or below the GC floor must not be + /// suspended in `suspended_transactions`: its header is dropped on arrival + /// (`filter_out_already_processed_and_sort`) and will never be accepted, so + /// the entry would sit forever and accumulate between sweeps. + #[tokio::test] + async fn gc_eviction_does_not_suspend_old_block_transactions() { + use gc_eviction_helpers::*; + + let (mut context, _) = Context::new_for_test(4); + context + .protocol_config + .set_consensus_block_restrictions_for_testing(true); + let context = Arc::new(context); + let gc_depth = context.protocol_config.gc_depth(); + + let store = Arc::new(MemStore::new(context.clone())); + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store))); + let commit_leader_round = gc_depth * 2 + 200; + plant_last_commit(&dag_state, &context, commit_leader_round); + + let mut block_manager = BlockManager::new(context, dag_state); + + // Trigger the first sweep so `last_gc_floor_applied` is set without + // accepting anything. + block_manager.try_accept_block_headers(vec![], DataSource::Test); + let gc_floor = block_manager.last_gc_floor_applied; + assert!(gc_floor > 0); + + // Now feed a full block at gc_floor (i.e. at the floor — too old). + // Build a header at that round and a non-empty transactions payload so + // the existing "skip empty" optimization isn't what saves us. 
+ let h = header(gc_floor, 1, vec![]); + let txs = crate::block_header::VerifiedTransactions::new_for_test( + &h, + vec![crate::block_header::Transaction::new(vec![1u8; 16])], + ); + let block = crate::block_header::VerifiedBlock::new(h, txs); + let (accepted, _) = block_manager.try_accept_blocks(vec![block], DataSource::Test); + + assert!(accepted.is_empty(), "header at GC floor must be dropped"); + assert_eq!( + block_manager.suspended_transactions.len(), + 0, + "transactions for a too-old block must not be suspended" + ); + } + + /// `merge_accepted_round_ascending` deduplicates by `BlockRef` and emits + /// in round-ascending order, even when the GC-unsuspended list arrives + /// out of order and overlaps with the regular-path list. + #[test] + fn merge_accepted_round_ascending_dedups_and_sorts() { + use gc_eviction_helpers::*; + + let h_round_5 = header(5, 0, vec![]); + let h_round_8 = header(8, 1, vec![]); + let h_round_3 = header(3, 2, vec![]); + let h_round_5_dup = h_round_5.clone(); + + // Regular-path output is already round-ascending per `process_block_headers`. + let regular = vec![h_round_3, h_round_5]; + // GC-unsuspended is in stack-walk order — not sorted, and may overlap. + let gc = vec![h_round_8, h_round_5_dup]; + + let merged = merge_accepted_round_ascending(regular, gc); + + let rounds: Vec = merged + .iter() + .map(|h: &VerifiedBlockHeader| h.round()) + .collect(); + assert_eq!(rounds, vec![3, 5, 8]); + + // No duplicates by reference. + let mut refs: Vec = merged + .iter() + .map(|h: &VerifiedBlockHeader| h.reference()) + .collect(); + let dedup_len = { + refs.sort(); + refs.dedup(); + refs.len() + }; + assert_eq!(dedup_len, 3); + } + + /// `suspended_transactions` entries with round below the floor are dropped + /// by the sweep when `gc_floor` advances. 
+ #[tokio::test] + async fn gc_eviction_drops_suspended_transactions_below_floor() { + use gc_eviction_helpers::*; + + let (mut context, _) = Context::new_for_test(4); + context + .protocol_config + .set_consensus_block_restrictions_for_testing(true); + let context = Arc::new(context); + let gc_depth = context.protocol_config.gc_depth(); + + let store = Arc::new(MemStore::new(context.clone())); + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store))); + let mut block_manager = BlockManager::new(context.clone(), dag_state.clone()); + + // Manually put an entry into suspended_transactions at a low round. + let stale_header = header(50, 0, vec![]); + let stale_ref = stale_header.reference(); + block_manager.suspended_transactions.insert( + stale_ref, + crate::block_header::VerifiedTransactions::new_for_test(&stale_header, vec![]), + ); + assert_eq!(block_manager.suspended_transactions.len(), 1); + + // Advance the floor well past round 50 and trigger a sweep via + // try_accept_block_headers with an empty input. + let commit_leader_round = gc_depth * 2 + 500; + plant_last_commit(&dag_state, &context, commit_leader_round); + block_manager.try_accept_block_headers(vec![], DataSource::Test); + + assert_eq!(block_manager.suspended_transactions.len(), 0); + } + + /// The sweep is a no-op when `gc_floor` does not advance between calls. 
+ #[tokio::test] + async fn gc_eviction_sweep_is_idempotent_when_floor_unchanged() { + use gc_eviction_helpers::*; + + let (mut context, _) = Context::new_for_test(4); + context + .protocol_config + .set_consensus_block_restrictions_for_testing(true); + let context = Arc::new(context); + let gc_depth = context.protocol_config.gc_depth(); + + let store = Arc::new(MemStore::new(context.clone())); + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store))); + let commit_leader_round = gc_depth * 2 + 200; + plant_last_commit(&dag_state, &context, commit_leader_round); + + let mut block_manager = BlockManager::new(context, dag_state); + + // First call applies the floor. + block_manager.try_accept_block_headers(vec![], DataSource::Test); + let first_floor = block_manager.last_gc_floor_applied; + assert!(first_floor > 0); + + // Second call at the same floor should not change anything. + block_manager.try_accept_block_headers(vec![], DataSource::Test); + assert_eq!(block_manager.last_gc_floor_applied, first_floor); + } + + /// With the hardening flag off, the sweep is fully disabled: no eviction, + /// no floor advance, no filtering of low-round ancestors. + #[tokio::test] + async fn gc_eviction_disabled_when_flag_off() { + use gc_eviction_helpers::*; + + let (mut context, _) = Context::new_for_test(4); + context + .protocol_config + .set_consensus_block_restrictions_for_testing(false); + let context = Arc::new(context); + let gc_depth = context.protocol_config.gc_depth(); + + let store = Arc::new(MemStore::new(context.clone())); + let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store))); + let commit_leader_round = gc_depth * 2 + 200; + plant_last_commit(&dag_state, &context, commit_leader_round); + + let mut block_manager = BlockManager::new(context, dag_state.clone()); + + // A header with a missing ancestor far below the would-be gc_floor + // should still be suspended (legacy behavior). 
+ let gc_floor = dag_state.read().gc_round_for_last_commit(); + assert!(gc_floor > 0); + let old_ancestor = block_ref(gc_floor.saturating_sub(10), 0); + let h = header(gc_floor + 50, 1, vec![old_ancestor]); + + let (accepted, missing) = block_manager.try_accept_block_headers(vec![h], DataSource::Test); + assert!( + accepted.is_empty(), + "header should be suspended when flag off" + ); + assert_eq!(missing, BTreeSet::from([old_ancestor])); + assert_eq!(block_manager.last_gc_floor_applied, 0); + } } diff --git a/crates/starfish/core/src/metrics.rs b/crates/starfish/core/src/metrics.rs index f43513de128..08e739f4585 100644 --- a/crates/starfish/core/src/metrics.rs +++ b/crates/starfish/core/src/metrics.rs @@ -220,6 +220,12 @@ pub(crate) struct NodeMetrics { pub(crate) block_manager_missing_block_headers: IntGauge, pub(crate) block_manager_missing_block_headers_by_authority: IntCounterVec, pub(crate) block_manager_missing_ancestors_by_authority: IntCounterVec, + pub(crate) block_manager_gc_floor: IntGauge, + pub(crate) block_manager_gc_evicted_missing_ancestors_total: IntCounter, + pub(crate) block_manager_gc_evicted_fetch_entries_total: IntCounter, + pub(crate) block_manager_gc_unsuspended_total: IntCounter, + pub(crate) block_manager_gc_evicted_suspended_transactions_total: IntCounter, + pub(crate) block_manager_gc_evicted_old_headers_total: IntCounter, pub(crate) threshold_clock_round: IntGauge, pub(crate) subscriber_connection_attempts: IntCounterVec, pub(crate) subscribed_to: IntGaugeVec, @@ -931,6 +937,36 @@ impl NodeMetrics { &["authority"], registry, ).unwrap(), + block_manager_gc_floor: register_int_gauge_with_registry!( + "block_manager_gc_floor", + "The last GC round floor applied by the block manager. 
Suspended state at or below this round has been evicted.", + registry, + ).unwrap(), + block_manager_gc_evicted_missing_ancestors_total: register_int_counter_with_registry!( + "block_manager_gc_evicted_missing_ancestors_total", + "Total number of missing-ancestor entries dropped by the block manager GC sweep", + registry, + ).unwrap(), + block_manager_gc_evicted_fetch_entries_total: register_int_counter_with_registry!( + "block_manager_gc_evicted_fetch_entries_total", + "Total number of headers_to_fetch entries dropped by the block manager GC sweep", + registry, + ).unwrap(), + block_manager_gc_unsuspended_total: register_int_counter_with_registry!( + "block_manager_gc_unsuspended_total", + "Total number of headers unsuspended by the block manager GC sweep because all their missing ancestors fell below the GC floor", + registry, + ).unwrap(), + block_manager_gc_evicted_suspended_transactions_total: register_int_counter_with_registry!( + "block_manager_gc_evicted_suspended_transactions_total", + "Total number of suspended_transactions entries dropped by the block manager GC sweep", + registry, + ).unwrap(), + block_manager_gc_evicted_old_headers_total: register_int_counter_with_registry!( + "block_manager_gc_evicted_old_headers_total", + "Total number of incoming block headers dropped because their round is at or below the GC floor", + registry, + ).unwrap(), threshold_clock_round: register_int_gauge_with_registry!( "threshold_clock_round", "The current threshold clock round. We only advance to a new round when a quorum of parents have been synced.",