Skip to content

Commit 0946cbd

Browse files
authored
Fix out-of-order verification of cycle head dependencies (#1061)
* Start working on maybe_changed_after * Add `maybe_changed_after` for tracked structs * Remove old cycle handling in maybe_changed_after * Add tests * Don't collect deps if there's an outer cycle * Remove `validate_provisional` * Compute outer most cycle * Set edges in more branches * More * Improve logging * Flatten dependencies of all heads, use lazy validation * Properly skip `complete_iteration` in the no cycle head case * Revert `struct_database_key_index` changes * Revert more changes * Revert even more * Rename * Fix memory leak * Clippy * Advoid recursing cycle heads * Add missing inventory gating * Only track queries in seen * Reuse allocations * Avoid copying current-revision input-outputs into `QueryRevisions` * Revert "Avoid copying current-revision input-outputs into `QueryRevisions`" This reverts commit e686db9. * Move more out of hot path * Reuse allocations more * Revert some Ingredient::maybe_changed_after changes * More cleanup * Back out disambiguator changes * Docs * Small reordering * Panic when using accumulator in fixpoint query * Revert unnecessary test changes * Simplify more * Code review feedback * Format
1 parent e1e8df6 commit 0946cbd

26 files changed

Lines changed: 584 additions & 390 deletions

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ jobs:
124124

125125
benchmarks:
126126
# https://github.com/CodSpeedHQ/action/issues/126
127-
if: github.event_name != 'merge_group'
127+
if: github.repository == 'salsa-rs/salsa' &&github.event_name != 'merge_group'
128128
name: Benchmarks
129129
runs-on: ubuntu-latest
130130
steps:

src/accumulator.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ use std::panic::UnwindSafe;
77

88
use accumulated::{Accumulated, AnyAccumulated};
99

10-
use crate::function::{VerifyCycleHeads, VerifyResult};
10+
use crate::function::VerifyResult;
1111
use crate::hash::{FxHashSet, FxIndexSet};
1212
use crate::ingredient::{Ingredient, Jar};
1313
use crate::plumbing::ZalsaLocal;
1414
use crate::sync::Arc;
1515
use crate::table::memo::MemoTableTypes;
1616
use crate::zalsa::{IngredientIndex, JarKind, Zalsa};
1717
use crate::zalsa_local::QueryEdge;
18-
use crate::{Database, Id, Revision};
18+
use crate::{Database, DatabaseKeyIndex, Id, Revision};
1919

2020
mod accumulated;
2121
pub(crate) mod accumulated_map;
@@ -107,7 +107,6 @@ impl<A: Accumulator> Ingredient for IngredientImpl<A> {
107107
_db: crate::database::RawDatabase<'_>,
108108
_input: Id,
109109
_revision: Revision,
110-
_cycle_heads: &mut VerifyCycleHeads,
111110
) -> VerifyResult {
112111
panic!("nothing should ever depend on an accumulator directly")
113112
}
@@ -137,6 +136,16 @@ impl<A: Accumulator> Ingredient for IngredientImpl<A> {
137136
fn memo_table_types_mut(&mut self) -> &mut Arc<MemoTableTypes> {
138137
unreachable!("accumulator does not allocate pages")
139138
}
139+
140+
fn flatten_cycle_head_dependencies(
141+
&self,
142+
_zalsa: &Zalsa,
143+
_id: Id,
144+
_flattened_input_outputs: &mut FxIndexSet<QueryEdge>,
145+
_seen: &mut FxHashSet<DatabaseKeyIndex>,
146+
) {
147+
panic!("nothing should ever depend on an accumulator directly")
148+
}
140149
}
141150

142151
impl<A> std::fmt::Debug for IngredientImpl<A>

src/active_query.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ use crate::key::DatabaseKeyIndex;
1010
use crate::runtime::Stamp;
1111
use crate::sync::atomic::AtomicBool;
1212
use crate::tracked_struct::{Disambiguator, DisambiguatorMap, IdentityHash, IdentityMap};
13-
use crate::zalsa_local::{QueryEdge, QueryOrigin, QueryRevisions, QueryRevisionsExtra};
13+
use crate::zalsa_local::{
14+
QueryEdge, QueryEdgeKind, QueryOrigin, QueryRevisions, QueryRevisionsExtra,
15+
};
1416
use crate::Revision;
1517
use crate::{
1618
cycle::{CycleHeads, IterationCount},
@@ -81,10 +83,19 @@ impl ActiveQuery {
8183
) {
8284
assert!(self.input_outputs.is_empty());
8385

84-
self.input_outputs.extend(edges.iter().cloned());
86+
// Copy over outputs for `diff_outputs`, don't copy inputs because cycle heads
87+
// flatten all input dependencies.
88+
self.input_outputs.extend(
89+
edges
90+
.iter()
91+
.filter(|edge| matches!(edge.kind(), QueryEdgeKind::Output(_)))
92+
.copied(),
93+
);
8594
self.durability = self.durability.min(durability);
8695
self.changed_at = self.changed_at.max(changed_at);
8796
self.untracked_read |= untracked_read;
97+
self.disambiguator_map
98+
.seed(active_tracked_ids.iter().map(|(id, _)| id));
8899

89100
// Mark all tracked structs from the previous iteration as active.
90101
self.tracked_struct_ids

src/cycle.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ impl AtomicIterationCount {
173173
self.0.store(value.0, Ordering::Release);
174174
}
175175

176-
pub(crate) fn store_mut(&mut self, value: IterationCount) {
176+
pub(crate) fn set(&mut self, value: IterationCount) {
177177
*self.0.get_mut() = value.0;
178178
}
179179
}
@@ -284,7 +284,7 @@ impl CycleHeads {
284284
.iter_mut()
285285
.find(|cycle_head| cycle_head.database_key_index == cycle_head_index)
286286
{
287-
cycle_head.iteration_count.store_mut(new_iteration_count);
287+
cycle_head.iteration_count.set(new_iteration_count);
288288
}
289289
}
290290

@@ -329,7 +329,7 @@ impl CycleHeads {
329329

330330
if *removed {
331331
*removed = false;
332-
existing.iteration_count.store_mut(iteration_count);
332+
existing.iteration_count.set(iteration_count);
333333

334334
true
335335
} else {
@@ -511,7 +511,7 @@ impl<'db> ProvisionalStatus<'db> {
511511
pub(crate) fn cycle_heads(&self) -> &'db CycleHeads {
512512
match self {
513513
ProvisionalStatus::Provisional { cycle_heads, .. } => cycle_heads,
514-
_ => empty_cycle_heads(),
514+
ProvisionalStatus::Final { .. } => empty_cycle_heads(),
515515
}
516516
}
517517

src/function.rs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
pub(crate) use maybe_changed_after::{VerifyCycleHeads, VerifyResult};
1+
pub(crate) use maybe_changed_after::VerifyResult;
22
pub(crate) use sync::{ClaimGuard, ClaimResult, Reentrancy, SyncGuard, SyncOwner, SyncTable};
33

44
use std::any::Any;
@@ -322,11 +322,10 @@ where
322322
db: RawDatabase<'_>,
323323
input: Id,
324324
revision: Revision,
325-
cycle_heads: &mut VerifyCycleHeads,
326325
) -> VerifyResult {
327326
// SAFETY: The `db` belongs to the ingredient as per caller invariant
328327
let db = unsafe { self.view_caster().downcast_unchecked(db) };
329-
self.maybe_changed_after(db, input, revision, cycle_heads)
328+
self.maybe_changed_after(db, input, revision)
330329
}
331330

332331
fn collect_minimum_serialized_edges(
@@ -420,6 +419,55 @@ where
420419
memo.revisions.verified_final.store(true, Ordering::Release);
421420
}
422421

422+
fn flatten_cycle_head_dependencies(
423+
&self,
424+
zalsa: &Zalsa,
425+
id: Id,
426+
flattened_input_outputs: &mut FxIndexSet<QueryEdge>,
427+
seen: &mut FxHashSet<DatabaseKeyIndex>,
428+
) {
429+
let memo_index = self.memo_ingredient_index(zalsa, id);
430+
let Some(memo) = self.get_memo_from_table_for(zalsa, id, memo_index) else {
431+
return;
432+
};
433+
434+
let database_key_index = self.database_key_index(id);
435+
436+
// Only flatten dependencies of provisional queries, because only those
437+
// contain cyclic dependencies.
438+
if !memo.may_be_provisional() {
439+
flattened_input_outputs.insert(QueryEdge::input(database_key_index));
440+
return;
441+
}
442+
443+
// There's nothing to do if we've visited this query before.
444+
if !seen.insert(database_key_index) {
445+
return;
446+
}
447+
448+
let inputs = memo.revisions.origin.as_ref().inputs();
449+
450+
match C::CYCLE_STRATEGY {
451+
// For queries with cycle handling, simply extend the input/outputs, because
452+
// they already flattened their own dependencies when completing the query.
453+
CycleRecoveryStrategy::FallbackImmediate | CycleRecoveryStrategy::Fixpoint => {
454+
flattened_input_outputs.extend(inputs.map(QueryEdge::input));
455+
}
456+
// For regular queries, recurse
457+
CycleRecoveryStrategy::Panic => {
458+
for input in inputs {
459+
let ingredient = zalsa.lookup_ingredient(input.ingredient_index());
460+
ingredient.flatten_cycle_head_dependencies(
461+
zalsa,
462+
input.key_index(),
463+
flattened_input_outputs,
464+
seen,
465+
);
466+
}
467+
}
468+
}
469+
}
470+
423471
fn cycle_converged(&self, zalsa: &Zalsa, input: Id) -> bool {
424472
let Some(memo) =
425473
self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input))

src/function/accumulated.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,7 @@ where
7979
stack.reserve(edges.len());
8080
}
8181

82-
stack.extend(
83-
origin
84-
.inputs()
85-
.filter_map(|input| TryInto::<DatabaseKeyIndex>::try_into(input).ok())
86-
.rev(),
87-
);
82+
stack.extend(origin.inputs().rev());
8883

8984
visited.reserve(stack.len());
9085
}

src/function/backdate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ where
2222
// right now whether backdating could be made safe for queries participating in queries.
2323
// TODO: Write a test that demonstrates that backdating queries participating in a cycle isn't safe
2424
// OR write many tests showing that it is (and fixing the case where it didn't correctly account for today).
25-
if !revisions.cycle_heads().is_empty() {
25+
if !revisions.cycle_heads().is_empty() || old_memo.may_be_provisional() {
2626
return;
2727
}
2828

0 commit comments

Comments
 (0)