Skip to content

Commit 33f6a7f

Browse files
committed
feat(starfish): GC-based eviction of stale missing-ancestor and suspended-tx state
Bounds BlockManager's in-memory bookkeeping by dropping refs the network's GC floor has already passed. An ancestor at or below `DagState::gc_round_for_last_commit()` cannot influence any not-yet-sequenced block, so retaining it as 'missing' or chasing it via HeaderSynchronizer serves no purpose. - Add `BlockSuspender::evict_below_round(round_floor)`: drops `headers_to_fetch` entries below the floor, then treats every `missing_ancestors` key below the floor as resolved and reuses `recursively_unsuspend_dependents` to cascade. Returns headers that became fully resolved. - Add `BlockManager::maybe_evict_below_gc_floor` called at the top of both `try_accept_blocks` and `try_accept_block_headers`, gated on `consensus_block_restrictions` and on the floor having advanced since the last call (`last_gc_floor_applied` field). - In `find_missing_ancestors`, skip ancestors at or below the floor. - In `filter_out_already_processed_and_sort`, drop incoming headers whose own round is at or below the floor. - Also retain `suspended_transactions` entries above the floor, addressing the existing unbounded-growth TODO. Same floor as headers — a block at or below `gc_round_for_last_commit` cannot be sequenced, so its txs cannot reach DagState. - Five new metrics under the `block_manager_gc_*` prefix to make sweep activity visible on Grafana. Without the hardening flag, behavior is unchanged: `Option<Round>`-typed gates make the round filters true no-ops in legacy mode. Closes #11401
1 parent 58b3b6a commit 33f6a7f

3 files changed

Lines changed: 479 additions & 5 deletions

File tree

crates/starfish/core/src/block_manager/block_suspender.rs

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,14 @@ use itertools::Itertools;
1010
use starfish_config::AuthorityIndex;
1111
use tracing::debug;
1212

13-
use crate::{BlockHeaderAPI, BlockRef, VerifiedBlockHeader, context::Context};
13+
use crate::{BlockHeaderAPI, BlockRef, Round, VerifiedBlockHeader, context::Context};
14+
15+
/// Outcome of [`BlockSuspender::evict_below_round`].
16+
pub(crate) struct EvictBelowRoundOutcome {
17+
pub(crate) unsuspended_headers: Vec<VerifiedBlockHeader>,
18+
pub(crate) ancestors_evicted: usize,
19+
pub(crate) fetch_entries_evicted: usize,
20+
}
1421

1522
struct SuspendedBlockHeader {
1623
block_header: VerifiedBlockHeader,
@@ -387,6 +394,45 @@ impl BlockSuspender {
387394
pub(crate) fn is_block_ref_suspended(&self, block_ref: &BlockRef) -> bool {
388395
self.suspended_headers.contains_key(block_ref)
389396
}
397+
398+
/// Drops missing-ancestor and fetch entries whose round is at or below
399+
/// `round_floor`, and cascades any resulting unsuspensions.
400+
///
401+
/// An ancestor at or below `round_floor` cannot influence a
402+
/// not-yet-sequenced block (linearizer GC rule), so suspending blocks
403+
/// that wait on it and fetching it from peers serves no purpose.
404+
/// Treating those keys as "resolved" is equivalent to admitting we will
405+
/// never receive them and reusing the existing cascade machinery to
406+
/// release everything that was only waiting on them.
407+
///
408+
/// Returns headers that became fully resolved (and any further headers that
409+
/// cascaded from them) so the caller can write them to `DagState`.
410+
pub(crate) fn evict_below_round(&mut self, round_floor: Round) -> EvictBelowRoundOutcome {
411+
let fetch_before = self.headers_to_fetch.len();
412+
self.headers_to_fetch.retain(|r, _| r.round > round_floor);
413+
let fetch_evicted = fetch_before - self.headers_to_fetch.len();
414+
415+
let evicted_ancestor_refs: Vec<BlockRef> = self
416+
.missing_ancestors
417+
.keys()
418+
.filter(|r| r.round <= round_floor)
419+
.copied()
420+
.collect();
421+
let ancestors_evicted = evicted_ancestor_refs.len();
422+
423+
let mut unsuspended = vec![];
424+
for evicted_ref in evicted_ancestor_refs {
425+
unsuspended.extend(self.recursively_unsuspend_dependents(evicted_ref));
426+
}
427+
428+
self.update_stats(0);
429+
430+
EvictBelowRoundOutcome {
431+
unsuspended_headers: unsuspended,
432+
ancestors_evicted,
433+
fetch_entries_evicted: fetch_evicted,
434+
}
435+
}
390436
pub(crate) fn headers_to_fetch(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
391437
self.headers_to_fetch.clone()
392438
}
@@ -438,6 +484,118 @@ impl BlockSuspender {
438484
#[cfg(test)]
439485
pub(crate) mod tests {
440486
use super::*;
487+
use crate::{
488+
block_header::{BlockHeaderDigest, TestBlockHeader},
489+
context::Context,
490+
};
491+
492+
fn block_ref(round: Round, author: u8) -> BlockRef {
493+
BlockRef::new(round, author.into(), BlockHeaderDigest::default())
494+
}
495+
496+
fn header(round: Round, author: u8, ancestors: Vec<BlockRef>) -> VerifiedBlockHeader {
497+
let bh = TestBlockHeader::new(round, author)
498+
.set_ancestors(ancestors)
499+
.build();
500+
VerifiedBlockHeader::new_for_test(bh)
501+
}
502+
503+
fn new_suspender() -> BlockSuspender {
504+
let (context, _) = Context::new_for_test(4);
505+
BlockSuspender::new(Arc::new(context))
506+
}
507+
508+
#[test]
509+
fn evict_below_round_unsuspends_single_dep() {
510+
let mut suspender = new_suspender();
511+
let a = block_ref(5, 0); // missing ancestor at floor
512+
let b = header(10, 1, vec![a]);
513+
let mut input = BTreeMap::new();
514+
input.insert(b.clone(), BTreeSet::from([a]));
515+
516+
let (accepted, _) = suspender.accept_or_suspend_received_headers(input);
517+
assert!(accepted.is_empty());
518+
assert_eq!(suspender.suspended_blocks_refs().len(), 1);
519+
assert!(suspender.headers_to_fetch.contains_key(&a));
520+
521+
let outcome = suspender.evict_below_round(5);
522+
523+
assert_eq!(outcome.unsuspended_headers, vec![b]);
524+
assert_eq!(outcome.ancestors_evicted, 1);
525+
assert_eq!(outcome.fetch_entries_evicted, 1);
526+
assert!(suspender.is_empty());
527+
}
528+
529+
#[test]
530+
fn evict_below_round_keeps_dep_above_floor() {
531+
let mut suspender = new_suspender();
532+
let a_low = block_ref(5, 0);
533+
let c_high = block_ref(20, 0);
534+
let b = header(25, 1, vec![a_low, c_high]);
535+
let mut input = BTreeMap::new();
536+
input.insert(b, BTreeSet::from([a_low, c_high]));
537+
538+
let (_accepted, _) = suspender.accept_or_suspend_received_headers(input);
539+
540+
let outcome = suspender.evict_below_round(5);
541+
542+
assert!(outcome.unsuspended_headers.is_empty());
543+
assert_eq!(outcome.ancestors_evicted, 1);
544+
assert_eq!(outcome.fetch_entries_evicted, 1);
545+
assert_eq!(suspender.suspended_blocks_refs().len(), 1);
546+
assert!(suspender.missing_ancestors.contains_key(&c_high));
547+
assert!(suspender.headers_to_fetch.contains_key(&c_high));
548+
assert!(!suspender.missing_ancestors.contains_key(&a_low));
549+
assert!(!suspender.headers_to_fetch.contains_key(&a_low));
550+
}
551+
552+
#[test]
553+
fn evict_below_round_cascades_chain() {
554+
// Chain: a (round 5, evicted) ← b (round 10, suspended on a)
555+
// ← c (round 20, suspended on b)
556+
let mut suspender = new_suspender();
557+
let a = block_ref(5, 0);
558+
let b = header(10, 1, vec![a]);
559+
let b_ref = b.reference();
560+
let c = header(20, 2, vec![b_ref]);
561+
562+
let c_ref = c.reference();
563+
let mut input_b = BTreeMap::new();
564+
input_b.insert(b, BTreeSet::from([a]));
565+
suspender.accept_or_suspend_received_headers(input_b);
566+
567+
let mut input_c = BTreeMap::new();
568+
input_c.insert(c, BTreeSet::from([b_ref]));
569+
suspender.accept_or_suspend_received_headers(input_c);
570+
571+
let outcome = suspender.evict_below_round(5);
572+
573+
let unsuspended_refs: BTreeSet<BlockRef> = outcome
574+
.unsuspended_headers
575+
.iter()
576+
.map(|h| h.reference())
577+
.collect();
578+
assert_eq!(unsuspended_refs, BTreeSet::from([b_ref, c_ref]));
579+
assert!(suspender.is_empty());
580+
}
581+
582+
#[test]
583+
fn evict_below_round_prunes_only_fetch_below_floor() {
584+
let mut suspender = new_suspender();
585+
let r3 = block_ref(3, 0);
586+
let r7 = block_ref(7, 1);
587+
let r15 = block_ref(15, 2);
588+
suspender.insert_block_to_fetch(r3, BTreeSet::from([0u8.into()]));
589+
suspender.insert_block_to_fetch(r7, BTreeSet::from([1u8.into()]));
590+
suspender.insert_block_to_fetch(r15, BTreeSet::from([2u8.into()]));
591+
592+
let outcome = suspender.evict_below_round(7);
593+
594+
assert_eq!(outcome.fetch_entries_evicted, 2);
595+
let remaining: BTreeSet<BlockRef> = suspender.headers_to_fetch.keys().copied().collect();
596+
assert_eq!(remaining, BTreeSet::from([r15]));
597+
}
598+
441599
/// Evaluates a set of verified block headers to determine which blocks
442600
/// should be suspended and which are still missing.
443601
///

0 commit comments

Comments
 (0)