Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 185 additions & 3 deletions crates/starfish/core/src/block_manager/block_suspender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,17 @@ use itertools::Itertools;
use starfish_config::AuthorityIndex;
use tracing::debug;

use crate::{BlockHeaderAPI, BlockRef, VerifiedBlockHeader, context::Context};
use crate::{
BlockHeaderAPI, BlockRef, Round, VerifiedBlockHeader, block_header::BlockHeaderDigest,
context::Context,
};

/// Outcome of [`BlockSuspender::evict_below_round`].
pub(crate) struct EvictBelowRoundOutcome {
pub(crate) unsuspended_headers: Vec<VerifiedBlockHeader>,
pub(crate) ancestors_evicted: usize,
pub(crate) fetch_entries_evicted: usize,
}

struct SuspendedBlockHeader {
block_header: VerifiedBlockHeader,
Expand Down Expand Up @@ -206,7 +216,7 @@ impl BlockSuspender {
(fully_resolved_headers, ancestors_to_fetch)
}
/// Recursively unsuspends all blocks that were dependent on a now-accepted
/// block.
/// or now-gc-evicted block.
///
/// Starting from `resolved_block`, this function walks the dependency graph
/// and attempts to unsuspend any suspended blocks that were blocked on
Expand Down Expand Up @@ -387,12 +397,72 @@ impl BlockSuspender {
pub(crate) fn is_block_ref_suspended(&self, block_ref: &BlockRef) -> bool {
self.suspended_headers.contains_key(block_ref)
}

/// Drops missing-ancestor and fetch entries whose round is at or below
/// `round_floor`, and cascades any resulting unsuspensions.
///
/// An ancestor at or below `round_floor` cannot influence a
/// not-yet-sequenced block (linearizer GC rule), so suspending blocks
/// that wait on it and fetching it from peers serves no purpose.
/// Treating those keys as "resolved" is equivalent to admitting we will
/// never receive them and reusing the existing cascade machinery to
/// release everything that was only waiting on them.
///
/// Returns headers that became fully resolved (and any further headers that
/// cascaded from them) so the caller can write them to `DagState`.
pub(crate) fn evict_below_round(&mut self, round_floor: Round) -> EvictBelowRoundOutcome {
// Smallest BlockRef with round > round_floor — split keeps everything
// from this key onward (round > floor) in the original map.
let pivot = round_floor
.checked_add(1)
.map(|round| BlockRef::new(round, AuthorityIndex::MIN, BlockHeaderDigest::MIN));

let fetch_entries_evicted = if let Some(pivot) = pivot {
let kept_to_fetch = self.headers_to_fetch.split_off(&pivot);
std::mem::replace(&mut self.headers_to_fetch, kept_to_fetch).len()
} else {
std::mem::take(&mut self.headers_to_fetch).len()
};

// Collect evicted keys via an O(log n + k) range scan; leave them in
// `self.missing_ancestors` so `recursively_unsuspend_dependents` can
// remove and cascade through them as usual.
let evicted_ancestor_refs: Vec<BlockRef> = if let Some(pivot) = pivot {
self.missing_ancestors
.range(..pivot)
.map(|(k, _)| *k)
.collect()
} else {
self.missing_ancestors.keys().copied().collect()
};
let ancestors_evicted = evicted_ancestor_refs.len();

let mut unsuspended = vec![];
for evicted_ref in evicted_ancestor_refs {
unsuspended.extend(self.recursively_unsuspend_dependents(evicted_ref));
}

self.refresh_state_gauges();

EvictBelowRoundOutcome {
unsuspended_headers: unsuspended,
ancestors_evicted,
fetch_entries_evicted,
}
}
pub(crate) fn headers_to_fetch(&self) -> BTreeMap<BlockRef, BTreeSet<AuthorityIndex>> {
self.headers_to_fetch.clone()
}
fn update_stats(&mut self, blocks_to_fetch: u64) {
self.context
.metrics
.node_metrics
.missing_block_headers_total
.inc_by(blocks_to_fetch);
self.refresh_state_gauges();
}
fn refresh_state_gauges(&self) {
let metrics = &self.context.metrics.node_metrics;
metrics.missing_block_headers_total.inc_by(blocks_to_fetch);
metrics
.block_manager_suspended_block_headers
.set(self.suspended_headers.len() as i64);
Expand Down Expand Up @@ -438,6 +508,118 @@ impl BlockSuspender {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::{
block_header::{BlockHeaderDigest, TestBlockHeader},
context::Context,
};

fn block_ref(round: Round, author: u8) -> BlockRef {
BlockRef::new(round, author.into(), BlockHeaderDigest::default())
}

fn header(round: Round, author: u8, ancestors: Vec<BlockRef>) -> VerifiedBlockHeader {
let bh = TestBlockHeader::new(round, author)
.set_ancestors(ancestors)
.build();
VerifiedBlockHeader::new_for_test(bh)
}

fn new_suspender() -> BlockSuspender {
let (context, _) = Context::new_for_test(4);
BlockSuspender::new(Arc::new(context))
}

#[tokio::test]
async fn evict_below_round_unsuspends_single_dep() {
let mut suspender = new_suspender();
let a = block_ref(5, 0); // missing ancestor at floor
let b = header(10, 1, vec![a]);
let mut input = BTreeMap::new();
input.insert(b.clone(), BTreeSet::from([a]));

let (accepted, _) = suspender.accept_or_suspend_received_headers(input);
assert!(accepted.is_empty());
assert_eq!(suspender.suspended_blocks_refs().len(), 1);
assert!(suspender.headers_to_fetch.contains_key(&a));

let outcome = suspender.evict_below_round(5);

assert_eq!(outcome.unsuspended_headers, vec![b]);
assert_eq!(outcome.ancestors_evicted, 1);
assert_eq!(outcome.fetch_entries_evicted, 1);
assert!(suspender.is_empty());
}

#[tokio::test]
async fn evict_below_round_keeps_dep_above_floor() {
let mut suspender = new_suspender();
let a_low = block_ref(5, 0);
let c_high = block_ref(20, 0);
let b = header(25, 1, vec![a_low, c_high]);
let mut input = BTreeMap::new();
input.insert(b, BTreeSet::from([a_low, c_high]));

let (_accepted, _) = suspender.accept_or_suspend_received_headers(input);

let outcome = suspender.evict_below_round(5);

assert!(outcome.unsuspended_headers.is_empty());
assert_eq!(outcome.ancestors_evicted, 1);
assert_eq!(outcome.fetch_entries_evicted, 1);
assert_eq!(suspender.suspended_blocks_refs().len(), 1);
assert!(suspender.missing_ancestors.contains_key(&c_high));
assert!(suspender.headers_to_fetch.contains_key(&c_high));
assert!(!suspender.missing_ancestors.contains_key(&a_low));
assert!(!suspender.headers_to_fetch.contains_key(&a_low));
}

#[tokio::test]
async fn evict_below_round_cascades_chain() {
// Chain: a (round 5, evicted) ← b (round 10, suspended on a)
// ← c (round 20, suspended on b)
let mut suspender = new_suspender();
let a = block_ref(5, 0);
let b = header(10, 1, vec![a]);
let b_ref = b.reference();
let c = header(20, 2, vec![b_ref]);

let c_ref = c.reference();
let mut input_b = BTreeMap::new();
input_b.insert(b, BTreeSet::from([a]));
suspender.accept_or_suspend_received_headers(input_b);

let mut input_c = BTreeMap::new();
input_c.insert(c, BTreeSet::from([b_ref]));
suspender.accept_or_suspend_received_headers(input_c);

let outcome = suspender.evict_below_round(5);

let unsuspended_refs: BTreeSet<BlockRef> = outcome
.unsuspended_headers
.iter()
.map(|h| h.reference())
.collect();
assert_eq!(unsuspended_refs, BTreeSet::from([b_ref, c_ref]));
assert!(suspender.is_empty());
}

#[tokio::test]
async fn evict_below_round_prunes_only_fetch_below_floor() {
let mut suspender = new_suspender();
let r3 = block_ref(3, 0);
let r7 = block_ref(7, 1);
let r15 = block_ref(15, 2);
suspender.insert_block_to_fetch(r3, BTreeSet::from([0u8.into()]));
suspender.insert_block_to_fetch(r7, BTreeSet::from([1u8.into()]));
suspender.insert_block_to_fetch(r15, BTreeSet::from([2u8.into()]));

let outcome = suspender.evict_below_round(7);

assert_eq!(outcome.fetch_entries_evicted, 2);
let remaining: BTreeSet<BlockRef> = suspender.headers_to_fetch.keys().copied().collect();
assert_eq!(remaining, BTreeSet::from([r15]));
}

/// Evaluates a set of verified block headers to determine which blocks
/// should be suspended and which are still missing.
///
Expand Down
Loading
Loading