diff --git a/crates/iota-core/src/authority/authority_store.rs b/crates/iota-core/src/authority/authority_store.rs index b3a595bb720..3beef7700b5 100644 --- a/crates/iota-core/src/authority/authority_store.rs +++ b/crates/iota-core/src/authority/authority_store.rs @@ -48,7 +48,10 @@ use crate::{ AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING, }, authority_store_tables::TotalIotaSupplyCheck, - authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object}, + authority_store_types::{ + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, StoreObject, StoreObjectWrapper, + get_store_object, + }, epoch_start_configuration::{EpochFlag, EpochStartConfiguration}, }, global_state_hasher::GlobalStateHashStore, @@ -707,8 +710,10 @@ impl AuthorityStore { fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> IotaResult { let mut write_batch = self.perpetual_tables.objects.batch(); - // Insert object - let store_object = get_store_object(object.clone()); + // Genesis objects are produced by the genesis checkpoint (sequence 0), + // so 0 is the real `previous_transaction_checkpoint` value here, not a + // sentinel. + let store_object = get_store_object(object.clone(), 0); write_batch.insert_batch( &self.perpetual_tables.objects, std::iter::once((ObjectKey::from(object_ref), store_object)), @@ -737,11 +742,12 @@ impl AuthorityStore { .map(|o| (o.compute_object_reference(), o)) .collect(); + // Genesis objects are produced by the genesis checkpoint (sequence 0). 
batch.insert_batch( &self.perpetual_tables.objects, ref_and_objects .iter() - .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))), + .map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone(), 0))), )?; let non_child_object_refs: Vec<_> = ref_and_objects @@ -766,33 +772,27 @@ impl AuthorityStore { let mut batch = perpetual_db.objects.batch(); for object in live_objects { hasher.update(object.object_reference().digest.inner()); - match object { - LiveObject::Normal(object) => { - let store_object_wrapper = get_store_object(object.clone()); - batch.insert_batch( - &perpetual_db.objects, - std::iter::once(( - ObjectKey::from(object.compute_object_reference()), - store_object_wrapper, - )), - )?; - if !object.is_child_object() { - Self::initialize_live_object_markers( - &perpetual_db.live_owned_object_markers, - &mut batch, - &[object.compute_object_reference()], - )?; - } - } - LiveObject::Wrapped(object_key) => { - batch.insert_batch( - &perpetual_db.objects, - std::iter::once::<(ObjectKey, StoreObjectWrapper)>(( - object_key, - StoreObject::Wrapped.into(), - )), - )?; - } + let LiveObject::Normal(object) = object; + // V2 ref records carry an 8-byte `previous_transaction_checkpoint` + // trailer, but `ObjectRefIter` reads past it without surfacing the + // value, so stamp the sentinel here. + // TODO(snapshot-v2-backfill): rewrite these rows once the iterator + // surfaces the trailer or a backfill is run. 
+ let store_object_wrapper = + get_store_object(object.clone(), SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT); + batch.insert_batch( + &perpetual_db.objects, + std::iter::once(( + ObjectKey::from(object.compute_object_reference()), + store_object_wrapper, + )), + )?; + if !object.is_child_object() { + Self::initialize_live_object_markers( + &perpetual_db.live_owned_object_markers, + &mut batch, + &[object.compute_object_reference()], + )?; } } let sha3_digest = hasher.finalize().digest; @@ -825,10 +825,18 @@ impl AuthorityStore { /// Internally it checks that all locks for active inputs are at the correct /// version, and then writes objects, certificates, parents and clean up /// locks atomically. + /// + /// `checkpoint_seq` is stamped onto each newly written object's + /// `previous_transaction_checkpoint` field. Callers that buffer execution + /// outputs until checkpoint commit time (e.g. `WritebackCache`) pass the + /// containing checkpoint's sequence number; callers that flush at + /// execution time before the containing checkpoint is known (e.g. + /// `PassthroughCache`) pass `SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT`. 
#[instrument(level = "debug", skip_all)] pub fn build_db_batch( &self, epoch_id: EpochId, + checkpoint_seq: CheckpointSequenceNumber, tx_outputs: &[Arc], ) -> IotaResult { let mut written = Vec::with_capacity(tx_outputs.len()); @@ -838,7 +846,12 @@ impl AuthorityStore { let mut write_batch = self.perpetual_tables.transactions.batch(); for outputs in tx_outputs { - self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?; + self.write_one_transaction_outputs( + &mut write_batch, + epoch_id, + checkpoint_seq, + outputs, + )?; } // test crashing before writing the batch fail_point!("crash"); @@ -861,6 +874,7 @@ impl AuthorityStore { &self, write_batch: &mut DBBatch, epoch_id: EpochId, + checkpoint_seq: CheckpointSequenceNumber, tx_outputs: &TransactionOutputs, ) -> IotaResult { let TransactionOutputs { @@ -906,7 +920,7 @@ impl AuthorityStore { let new_objects = written.iter().map(|(id, new_object)| { let version = new_object.version(); trace!(?id, ?version, "writing object"); - let store_object = get_store_object(new_object.clone()); + let store_object = get_store_object(new_object.clone(), checkpoint_seq); (ObjectKey(*id, version), store_object) }); @@ -1465,38 +1479,31 @@ impl AuthorityStore { let (mut total_iota, mut total_storage_rebate) = thread::scope(|s| { let pending_tasks = FuturesUnordered::new(); for o in self.iter_live_object_set() { - match o { - LiveObject::Normal(object) => { - size += object.object_size_for_gas_metering(); - count += 1; - pending_objects.push(object); - if count % 1_000_000 == 0 { - let mut task_objects = vec![]; - mem::swap(&mut pending_objects, &mut task_objects); - pending_tasks.push(s.spawn(move || { - let mut layout_resolver = - executor.type_layout_resolver(Box::new(type_layout_store)); - let mut total_storage_rebate = 0; - let mut total_iota = 0; - for object in task_objects { - total_storage_rebate += object.storage_rebate; - // get_total_iota includes storage rebate, however all storage - // rebate is also 
stored in - // the storage fund, so we need to subtract it here. - total_iota += - object.get_total_iota(layout_resolver.as_mut()).unwrap() - - object.storage_rebate; - } - if count % 50_000_000 == 0 { - info!("Processed {} objects", count); - } - (total_iota, total_storage_rebate) - })); + let LiveObject::Normal(object) = o; + size += object.object_size_for_gas_metering(); + count += 1; + pending_objects.push(object); + if count % 1_000_000 == 0 { + let mut task_objects = vec![]; + mem::swap(&mut pending_objects, &mut task_objects); + pending_tasks.push(s.spawn(move || { + let mut layout_resolver = + executor.type_layout_resolver(Box::new(type_layout_store)); + let mut total_storage_rebate = 0; + let mut total_iota = 0; + for object in task_objects { + total_storage_rebate += object.storage_rebate; + // get_total_iota includes storage rebate, however all storage + // rebate is also stored in + // the storage fund, so we need to subtract it here. + total_iota += object.get_total_iota(layout_resolver.as_mut()).unwrap() + - object.storage_rebate; } - } - LiveObject::Wrapped(_) => { - unreachable!("Explicitly asked to not include wrapped tombstones") - } + if count % 50_000_000 == 0 { + info!("Processed {} objects", count); + } + (total_iota, total_storage_rebate) + })); } } pending_tasks.into_iter().fold((0, 0), |init, result| { diff --git a/crates/iota-core/src/authority/authority_store_pruner.rs b/crates/iota-core/src/authority/authority_store_pruner.rs index d42e3ad711c..c58cf977215 100644 --- a/crates/iota-core/src/authority/authority_store_pruner.rs +++ b/crates/iota-core/src/authority/authority_store_pruner.rs @@ -952,7 +952,10 @@ mod tests { use crate::authority::{ authority_store_pruner::AuthorityStorePruningMetrics, authority_store_tables::AuthorityPerpetualTables, - authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object}, + authority_store_types::{ + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, StoreObject, StoreObjectWrapper, + 
get_store_object, + }, }; fn get_keys_after_pruning(path: &Path) -> anyhow::Result> { @@ -1008,7 +1011,10 @@ mod tests { } else { to_delete.push(object_key); } - let obj = get_store_object(Object::immutable_with_id_for_testing(id)); + let obj = get_store_object( + Object::immutable_with_id_for_testing(id), + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, + ); batch.insert_batch( &db.objects, [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())], @@ -1021,7 +1027,7 @@ mod tests { println!("Adding tombstone object {tombstone_key:?}"); batch.insert_batch( &db.objects, - [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))], + [(tombstone_key, StoreObjectWrapper::V2(StoreObject::Deleted))], )?; tombstones.push(tombstone_key); } @@ -1124,7 +1130,10 @@ mod tests { if i < num_versions_per_object - 2 { to_delete.push((id, SequenceNumber::from(i))); } - let obj = get_store_object(Object::immutable_with_id_for_testing(id)); + let obj = get_store_object( + Object::immutable_with_id_for_testing(id), + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, + ); perpetual_db .objects .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?; diff --git a/crates/iota-core/src/authority/authority_store_tables.rs b/crates/iota-core/src/authority/authority_store_tables.rs index 213908fdbe2..cff1943b1a4 100644 --- a/crates/iota-core/src/authority/authority_store_tables.rs +++ b/crates/iota-core/src/authority/authority_store_tables.rs @@ -28,7 +28,8 @@ use super::*; use crate::authority::{ authority_store_pruner::ObjectsCompactionFilter, authority_store_types::{ - StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object, + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, StoreObject, StoreObjectValueV2, + StoreObjectWrapper, get_store_object, try_construct_object, }, epoch_start_configuration::EpochStartConfiguration, }; @@ -261,7 +262,7 @@ impl AuthorityPerpetualTables { fn construct_object( &self, object_key: &ObjectKey, - store_object: StoreObjectValue, + 
store_object: StoreObjectValueV2, ) -> Result { try_construct_object(object_key, store_object) } @@ -447,11 +448,7 @@ impl AuthorityPerpetualTables { } pub fn iter_live_object_set(&self) -> LiveSetIter<'_> { - LiveSetIter { - iter: Box::new(self.objects.safe_iter()), - tables: self, - prev: None, - } + LiveSetIter(self.iter_live_object_set_v2()) } pub fn range_iter_live_object_set( @@ -462,10 +459,21 @@ impl AuthorityPerpetualTables { let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id); let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id); - LiveSetIter { + LiveSetIter(LiveSetIterV2 { iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)), tables: self, prev: None, + }) + } + + /// Like `iter_live_object_set` but additionally surfaces each live + /// object's `previous_transaction_checkpoint`. Used by the snapshot V2 + /// writer to populate the per-object trailer of the reference file. + pub fn iter_live_object_set_v2(&self) -> LiveSetIterV2<'_> { + LiveSetIterV2 { + iter: Box::new(self.objects.safe_iter()), + tables: self, + prev: None, } } @@ -494,7 +502,7 @@ impl AuthorityPerpetualTables { pub fn insert_object_test_only(&self, object: Object) -> IotaResult { let object_reference = object.compute_object_reference(); - let wrapper = get_store_object(object); + let wrapper = get_store_object(object, SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT); let mut wb = self.objects.batch(); wb.insert_batch( &self.objects, @@ -541,69 +549,104 @@ impl ObjectStore for AuthorityPerpetualTables { } } -pub struct LiveSetIter<'a> { - iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>, - tables: &'a AuthorityPerpetualTables, - prev: Option<(ObjectKey, StoreObjectWrapper)>, -} - +/// Yields the same live-object set as `LiveSetIterV2` but strips the +/// per-object `previous_transaction_checkpoint` trailer, for callers that do +/// not need it. 
+pub struct LiveSetIter<'a>(LiveSetIterV2<'a>); + +/// A row that the live-set iterator surfaces. The previous `Wrapped` variant +/// was removed: `LiveSetIter` filters `StoreObject::Wrapped` and +/// `StoreObject::Deleted` from its output, so wrapped objects never reach +/// downstream consumers (snapshot writer, state-hash accumulator, restore +/// path). +/// +/// Kept as a single-variant enum to bound the blast radius of this PR — +/// every consumer site uses `let LiveObject::Normal(o) = ...;` today, and +/// collapsing to `pub struct LiveObject(pub Object);` (or +/// `pub type LiveObject = Object;`) would touch every such site. The +/// `Serialize`/`Deserialize`/`Hash`/`Eq`/`PartialEq` derives have no +/// current consumer (no call site BCS-encodes, hashes, or equality- +/// compares a `LiveObject`); a follow-up can collapse the type and prune +/// both the derives and the trivial wrapper methods. #[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)] pub enum LiveObject { Normal(Object), - Wrapped(ObjectKey), } impl LiveObject { pub fn object_id(&self) -> ObjectID { match self { LiveObject::Normal(obj) => obj.id(), - LiveObject::Wrapped(key) => key.0, } } pub fn version(&self) -> SequenceNumber { match self { LiveObject::Normal(obj) => obj.version(), - LiveObject::Wrapped(key) => key.1, } } pub fn object_reference(&self) -> ObjectRef { match self { LiveObject::Normal(obj) => obj.compute_object_reference(), - LiveObject::Wrapped(key) => ObjectRef::new(key.0, key.1, ObjectDigest::OBJECT_WRAPPED), } } pub fn to_normal(self) -> Option { match self { LiveObject::Normal(object) => Some(object), - LiveObject::Wrapped(_) => None, } } } -impl LiveSetIter<'_> { +impl Iterator for LiveSetIter<'_> { + type Item = LiveObject; + + fn next(&mut self) -> Option { + self.0.next().map(|v| v.live) + } +} + +/// A live object together with the checkpoint sequence number that contained +/// the transaction whose effects produced this object version. 
Yielded by +/// `LiveSetIterV2`. +#[derive(Eq, PartialEq, Debug, Clone)] +pub struct LiveObjectV2 { + pub live: LiveObject, + pub previous_transaction_checkpoint: CheckpointSequenceNumber, +} + +pub struct LiveSetIterV2<'a> { + iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>, + tables: &'a AuthorityPerpetualTables, + prev: Option<(ObjectKey, StoreObjectWrapper)>, +} + +impl LiveSetIterV2<'_> { fn store_object_wrapper_to_live_object( &self, object_key: ObjectKey, store_object: StoreObjectWrapper, - ) -> Option { + ) -> Option { match store_object.migrate().into_inner() { - StoreObject::Value(object) => { + StoreObject::Value(value) => { + let previous_transaction_checkpoint = value.previous_transaction_checkpoint; let object = self .tables - .construct_object(&object_key, *object) + .construct_object(&object_key, *value) .expect("Constructing object from store cannot fail"); - Some(LiveObject::Normal(object)) + Some(LiveObjectV2 { + live: LiveObject::Normal(object), + previous_transaction_checkpoint, + }) } StoreObject::Wrapped | StoreObject::Deleted => None, } } } -impl Iterator for LiveSetIter<'_> { - type Item = LiveObject; +impl Iterator for LiveSetIterV2<'_> { + type Item = LiveObjectV2; fn next(&mut self) -> Option { loop { @@ -688,3 +731,105 @@ fn events_table_config(db_options: DBOptions) -> DBOptions { .optimize_for_write_throughput() .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024)) } + +#[cfg(test)] +mod tests { + use iota_types::base_types::ObjectID; + + use super::*; + use crate::authority::authority_store_types::StoreObjectV2; + + /// Combined into one `#[tokio::test]` to sidestep the + /// `typed_store::DBMetrics` global Prometheus registry race (concurrent + /// `AuthorityPerpetualTables::open` calls hit `AlreadyReg`). The two cases + /// are independent; do not split until the metrics registry is made + /// re-entrant. 
+ #[tokio::test] + async fn live_set_iter_invariants() { + live_set_iter_filters_wrapped_and_deleted_store_rows(); + live_set_iter_v2_propagates_previous_transaction_checkpoint(); + } + + /// `LiveSetIter` must filter `StoreObject::Wrapped` and + /// `StoreObject::Deleted` rows at the source so downstream consumers + /// (snapshot writer, state-hash accumulator, restore path) only ever + /// observe live `Normal` objects. This invariant is what lets + /// `LiveObject` carry only the `Normal` variant. + fn live_set_iter_filters_wrapped_and_deleted_store_rows() { + let tmp_dir = iota_common::tempdir(); + let perpetual_db = AuthorityPerpetualTables::open(tmp_dir.path(), None); + + // A live `Normal` row alongside `Wrapped` and `Deleted` tombstones for + // distinct object IDs. + let live_id = ObjectID::random(); + let wrapped_id = ObjectID::random(); + let deleted_id = ObjectID::random(); + + let live_object = Object::immutable_with_id_for_testing(live_id); + perpetual_db.insert_object_test_only(live_object).unwrap(); + + let mut wb = perpetual_db.objects.batch(); + let wrapped_key = ObjectKey(wrapped_id, SequenceNumber::from_u64(1)); + wb.insert_batch( + &perpetual_db.objects, + std::iter::once::<(ObjectKey, StoreObjectWrapper)>(( + wrapped_key, + StoreObjectV2::Wrapped.into(), + )), + ) + .unwrap(); + let deleted_key = ObjectKey(deleted_id, SequenceNumber::from_u64(1)); + wb.insert_batch( + &perpetual_db.objects, + std::iter::once::<(ObjectKey, StoreObjectWrapper)>(( + deleted_key, + StoreObjectV2::Deleted.into(), + )), + ) + .unwrap(); + wb.write().unwrap(); + + let yielded: Vec<_> = perpetual_db.iter_live_object_set().collect(); + assert_eq!(yielded.len(), 1, "wrapped/deleted rows must be filtered"); + let LiveObject::Normal(only) = yielded.into_iter().next().unwrap(); + assert_eq!(only.id(), live_id); + } + + /// `LiveSetIterV2` must surface the exact `previous_transaction_checkpoint` + /// stored on `StoreObjectValueV2` — it is the load-bearing input to the + 
/// snapshot V2 writer's per-record trailer. A bug that, e.g., always + /// stamped `0` here would silently corrupt every snapshot's per-record + /// trailer; this is the focused canary for that contract. + fn live_set_iter_v2_propagates_previous_transaction_checkpoint() { + let tmp_dir = iota_common::tempdir(); + let perpetual_db = AuthorityPerpetualTables::open(tmp_dir.path(), None); + + // Insert a live object via the standard test path. This stamps the + // sentinel checkpoint by default; we then overwrite the row with a + // hand-built value carrying a distinct, recognizable checkpoint. + let object = Object::immutable_with_id_for_testing(ObjectID::random()); + let object_ref = object.compute_object_reference(); + let object_key = ObjectKey::from(object_ref); + let distinct_checkpoint: u64 = 0xCAFE_F00D_BEEF_1234; + + let store_object_value = match get_store_object(object, distinct_checkpoint).into_inner() { + StoreObject::Value(value) => value, + other => panic!("expected StoreObject::Value, got {other:?}"), + }; + let wrapper: StoreObjectWrapper = StoreObjectV2::Value(store_object_value).into(); + let mut wb = perpetual_db.objects.batch(); + wb.insert_batch( + &perpetual_db.objects, + std::iter::once((object_key, wrapper)), + ) + .unwrap(); + wb.write().unwrap(); + + let yielded: Vec<_> = perpetual_db.iter_live_object_set_v2().collect(); + assert_eq!(yielded.len(), 1); + assert_eq!( + yielded[0].previous_transaction_checkpoint, distinct_checkpoint, + "LiveSetIterV2 must surface the on-row checkpoint, not a default" + ); + } +} diff --git a/crates/iota-core/src/authority/authority_store_types.rs b/crates/iota-core/src/authority/authority_store_types.rs index 635a6e9674d..2ec81650356 100644 --- a/crates/iota-core/src/authority/authority_store_types.rs +++ b/crates/iota-core/src/authority/authority_store_types.rs @@ -6,52 +6,67 @@ use iota_types::{ base_types::{StructTag, TransactionDigest}, coin::Coin, error::IotaError, + 
messages_checkpoint::CheckpointSequenceNumber, move_package::MovePackage, object::{Data, MoveObject, MoveObjectExt, Object, ObjectInner, Owner}, storage::ObjectKey, }; use serde::{Deserialize, Serialize}; +/// Sentinel placed on a `StoreObjectValueV2.previous_transaction_checkpoint` +/// when a `StoreObjectV1` row is lifted to `StoreObjectV2` by `migrate()` +/// before the migration backfill has rewritten it with the real value. +/// +/// `u64::MAX` is unambiguous because no real checkpoint will ever reach it. +/// Post-backfill, no live-set row should hold this value; this is assertable +/// in tests and operational checks. +/// +/// TODO(snapshot-v2-backfill): a one-time backfill must rewrite every row +/// that holds this sentinel — V1 rows lifted via `migrate()`, rows stamped +/// on the `PassthroughCache` synchronous-write path, and rows stamped on +/// the snapshot restore path — with the real +/// `previous_transaction_checkpoint`. +pub const SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT: CheckpointSequenceNumber = u64::MAX; + // Versioning process: // -// Object storage versioning is done lazily (at read time) - therefore we must +// Object storage versioning is done lazily (at read time) — therefore we must // always preserve the code for reading the very first storage version. For all // versions, a migration function // // f(V_n) -> V_(n+1) // // must be defined. This way we can iteratively migrate the very oldest version -// to the very newest version at any point in the future. +// to the very newest version at any point in the future. Reads call +// `StoreObjectWrapper::migrate().into_inner()` to lift older versions to the +// latest before use; see `AuthorityPerpetualTables::object`. // // To change the format of the object table value types (StoreObject and // StoreMoveObject), use the following process: -// - Add a new variant to the enum to store the new version type. 
-// - Extend the `migrate` functions to migrate from the previous version to the -// new version. -// - Change `From for StoreObjectPair` to create the newest version -// only. -// -// Additionally, the first time we version these formats, we will need to: -// - Add a check in the `TryFrom for Object` to see if the -// object that was just read is the latest version. -// - If it is not, use the migration function (as explained above) to migrate it -// to the next version. -// - Repeat until we have arrive at the current version. +// - Add a new variant `StoreObjectV{N+1}` to the `StoreObjectWrapper` enum. +// - Define `From for StoreObjectV{N+1}` for the lift, and +// extend `migrate()` to chain `V{N}` → `V{N+1}`. +// - Advance `pub type StoreObject = StoreObjectV{N+1}` and update +// `From for StoreObjectWrapper` to wrap the new variant. +// - Update `get_store_object` (and any other writers) to construct the new +// value type directly. /// Enum wrapper for versioning #[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)] pub enum StoreObjectWrapper { V1(StoreObjectV1), + V2(StoreObjectV2), } // always points to latest version. -pub type StoreObject = StoreObjectV1; +pub type StoreObject = StoreObjectV2; impl StoreObjectWrapper { pub fn migrate(self) -> Self { - // TODO: when there are multiple versions, we must iteratively migrate from - // version N to N+1 until we arrive at the latest version - self + match self { + Self::V1(v1) => Self::V2(v1.into()), + v2 @ Self::V2(_) => v2, + } } // Always returns the most recent version. Older versions are migrated to the @@ -59,27 +74,25 @@ impl StoreObjectWrapper { // versions. 
pub fn inner(&self) -> &StoreObject { match self { - Self::V1(v1) => v1, - - // can remove #[expect] when there are multiple versions - #[expect(unreachable_patterns)] - _ => panic!("object should have been migrated to latest version at read time"), + Self::V2(v2) => v2, + Self::V1(_) => { + panic!("object should have been migrated to latest version at read time") + } } } pub fn into_inner(self) -> StoreObject { match self { - Self::V1(v1) => v1, - - // can remove #[expect] when there are multiple versions - #[expect(unreachable_patterns)] - _ => panic!("object should have been migrated to latest version at read time"), + Self::V2(v2) => v2, + Self::V1(_) => { + panic!("object should have been migrated to latest version at read time") + } } } } impl From for StoreObjectWrapper { fn from(o: StoreObject) -> Self { - StoreObjectWrapper::V1(o) + StoreObjectWrapper::V2(o) } } @@ -100,6 +113,50 @@ pub struct StoreObjectValue { pub storage_rebate: u64, } +/// Latest stored object format. Adds `previous_transaction_checkpoint` to +/// `Value`, recording the checkpoint sequence number that contained the +/// transaction whose effects produced this object version. A future snapshot +/// V2 writer will read this field and emit it on each reference record so +/// consumers can attribute live objects to the checkpoint that produced +/// their current version. +#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)] +pub enum StoreObjectV2 { + Value(Box), + Deleted, + Wrapped, +} + +/// V2 of [`StoreObjectValue`]. Adds `previous_transaction_checkpoint`, +/// the checkpoint sequence number that contained `previous_transaction`. +#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)] +pub struct StoreObjectValueV2 { + pub data: StoreData, + pub owner: Owner, + pub previous_transaction: TransactionDigest, + /// Checkpoint sequence number of the checkpoint that contained + /// `previous_transaction`. 
`SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT` + /// indicates the row was lifted from `StoreObjectV1` by `migrate()` and + /// has not yet been rewritten by the migration backfill. + pub previous_transaction_checkpoint: CheckpointSequenceNumber, + pub storage_rebate: u64, +} + +impl From for StoreObjectV2 { + fn from(v1: StoreObjectV1) -> Self { + match v1 { + StoreObjectV1::Value(v1_value) => Self::Value(Box::new(StoreObjectValueV2 { + data: v1_value.data, + owner: v1_value.owner, + previous_transaction: v1_value.previous_transaction, + previous_transaction_checkpoint: SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, + storage_rebate: v1_value.storage_rebate, + })), + StoreObjectV1::Deleted => Self::Deleted, + StoreObjectV1::Wrapped => Self::Wrapped, + } + } +} + /// Forked version of [`iota_types::object::Data`] /// Adds extra enum value `IndirectObject`, which represents a reference to an /// object stored separately @@ -111,7 +168,17 @@ pub enum StoreData { Coin(u64), } -pub fn get_store_object(object: Object) -> StoreObjectWrapper { +/// Build a `StoreObjectWrapper` for a newly written object version. The caller +/// supplies the checkpoint sequence number that contains the transaction whose +/// effects produced this object version; pass +/// `SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT` at execution-time write paths +/// where the containing checkpoint is not yet known (those rows are rewritten +/// at checkpoint commit time on caches that buffer, or by the migration +/// backfill otherwise). 
+pub fn get_store_object( + object: Object, + previous_transaction_checkpoint: CheckpointSequenceNumber, +) -> StoreObjectWrapper { let object = object.into_inner(); let data = match object.data { @@ -129,10 +196,11 @@ pub fn get_store_object(object: Object) -> StoreObjectWrapper { } } }; - let store_object = StoreObjectValue { + let store_object = StoreObjectValueV2 { data, owner: object.owner, previous_transaction: object.previous_transaction, + previous_transaction_checkpoint, storage_rebate: object.storage_rebate, }; StoreObject::Value(Box::new(store_object)).into() @@ -140,7 +208,7 @@ pub fn get_store_object(object: Object) -> StoreObjectWrapper { pub(crate) fn try_construct_object( object_key: &ObjectKey, - store_object: StoreObjectValue, + store_object: StoreObjectValueV2, ) -> Result { let data = match store_object.data { StoreData::Move(object) => Data::Struct(object), @@ -166,3 +234,126 @@ pub(crate) fn try_construct_object( } .into()) } + +#[cfg(test)] +mod tests { + use iota_types::base_types::TransactionDigest; + + use super::*; + + fn v1_value() -> StoreObjectValue { + StoreObjectValue { + data: StoreData::Coin(42), + owner: Owner::Immutable, + previous_transaction: TransactionDigest::random(), + storage_rebate: 7, + } + } + + #[test] + fn migrate_v1_value_lifts_with_sentinel() { + let v1 = v1_value(); + let wrapped = StoreObjectWrapper::V1(StoreObjectV1::Value(Box::new(v1.clone()))).migrate(); + let StoreObjectWrapper::V2(StoreObjectV2::Value(v2_value)) = wrapped else { + panic!("expected V2(Value), got {wrapped:?}"); + }; + assert_eq!(v2_value.data, v1.data); + assert_eq!(v2_value.owner, v1.owner); + assert_eq!(v2_value.previous_transaction, v1.previous_transaction); + assert_eq!(v2_value.storage_rebate, v1.storage_rebate); + assert_eq!( + v2_value.previous_transaction_checkpoint, + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT + ); + } + + #[test] + fn migrate_v1_tombstones_lift_to_v2_tombstones() { + let deleted = 
StoreObjectWrapper::V1(StoreObjectV1::Deleted).migrate(); + assert!(matches!( + deleted, + StoreObjectWrapper::V2(StoreObjectV2::Deleted) + )); + + let wrapped = StoreObjectWrapper::V1(StoreObjectV1::Wrapped).migrate(); + assert!(matches!( + wrapped, + StoreObjectWrapper::V2(StoreObjectV2::Wrapped) + )); + } + + #[test] + fn v1_wrapper_bcs_stays_at_discriminant_zero() { + // Critical invariant: V1 must remain at BCS discriminant 0 so existing + // on-disk V1 rows decode correctly after V2 was added to the enum. + // Reordering StoreObjectWrapper variants would break this. + let v1 = StoreObjectWrapper::V1(StoreObjectV1::Value(Box::new(v1_value()))); + let bytes = bcs::to_bytes(&v1).unwrap(); + assert_eq!(bytes[0], 0, "V1 must remain at BCS discriminant 0"); + + let decoded: StoreObjectWrapper = bcs::from_bytes(&bytes).unwrap(); + assert!(matches!(decoded, StoreObjectWrapper::V1(_))); + } + + #[test] + fn migrate_v2_is_identity() { + let v2 = StoreObjectV2::Value(Box::new(StoreObjectValueV2 { + data: StoreData::Coin(1), + owner: Owner::Immutable, + previous_transaction: TransactionDigest::random(), + previous_transaction_checkpoint: 100, + storage_rebate: 0, + })); + let wrapper = StoreObjectWrapper::V2(v2.clone()); + let migrated = wrapper.migrate(); + let StoreObjectWrapper::V2(out) = migrated else { + panic!("V2 should remain V2 after migrate(), got {migrated:?}"); + }; + assert_eq!(out, v2); + } + + /// Locks the BCS field-order layout of `StoreObjectValueV2` against a + /// golden byte vector. Reordering or renaming any field would silently + /// corrupt every on-disk V2 row; this test fails loudly on any such + /// change. If a deliberate schema change is required, follow the + /// versioning recipe at the top of this file (introduce + /// `StoreObjectV3` rather than mutating `V2`). + /// + /// Field order: data | owner | previous_transaction | + /// previous_transaction_checkpoint | storage_rebate. 
+ #[test] + fn store_object_value_v2_bcs_layout_is_locked() { + let v2 = StoreObjectValueV2 { + data: StoreData::Coin(0x0102_0304_0506_0708), + owner: Owner::Immutable, + previous_transaction: TransactionDigest::ZERO, + previous_transaction_checkpoint: 0x1011_1213_1415_1617, + storage_rebate: 0x2021_2223_2425_2627, + }; + let bytes = bcs::to_bytes(&v2).unwrap(); + + let mut golden: Vec = Vec::new(); + // `StoreData::Coin(u64)` — the four-variant `StoreData` enum places + // `Coin` at variant tag 3, followed by the u64 in little-endian. + golden.push(0x03); + golden.extend_from_slice(&0x0102_0304_0506_0708u64.to_le_bytes()); + // `Owner::Immutable` — Owner uses a custom serializer that maps to + // `ReadableOwner` (AddressOwner=0, ObjectOwner=1, Shared=2, + // Immutable=3), so Immutable encodes as a single tag byte. + golden.push(0x03); + // `TransactionDigest::ZERO` — Digest's binary BCS form is + // length-prefixed: ULEB128 length 32 (=`0x20`) + 32 zero bytes. + golden.push(0x20); + golden.extend_from_slice(&[0u8; 32]); + // `previous_transaction_checkpoint: u64` — little-endian. + golden.extend_from_slice(&0x1011_1213_1415_1617u64.to_le_bytes()); + // `storage_rebate: u64` — little-endian. 
+ golden.extend_from_slice(&0x2021_2223_2425_2627u64.to_le_bytes()); + + assert_eq!( + bytes, golden, + "StoreObjectValueV2 BCS layout changed; introduce a new StoreObject \ + version rather than mutating V2" + ); + } +} diff --git a/crates/iota-core/src/authority/test_authority_builder.rs b/crates/iota-core/src/authority/test_authority_builder.rs index c1e110cb30b..cf0e6cf652e 100644 --- a/crates/iota-core/src/authority/test_authority_builder.rs +++ b/crates/iota-core/src/authority/test_authority_builder.rs @@ -421,9 +421,11 @@ impl<'a> TestAuthorityBuilder<'a> { ) .unwrap(); - let batch = state - .get_cache_commit() - .build_db_batch(epoch_store.epoch(), &[*genesis.transaction().digest()]); + let batch = state.get_cache_commit().build_db_batch( + epoch_store.epoch(), + genesis.checkpoint().sequence_number, + &[*genesis.transaction().digest()], + ); state.get_cache_commit().commit_transaction_outputs( epoch_store.epoch(), diff --git a/crates/iota-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/iota-core/src/checkpoints/checkpoint_executor/mod.rs index 70a6b7b64c6..64e1cb6e055 100644 --- a/crates/iota-core/src/checkpoints/checkpoint_executor/mod.rs +++ b/crates/iota-core/src/checkpoints/checkpoint_executor/mod.rs @@ -31,6 +31,7 @@ use iota_types::{ base_types::{TransactionDigest, TransactionEffectsDigest}, crypto::RandomnessRound, effects::{TransactionEffects, TransactionEffectsAPI}, + epoch_info::EpochInfoEntry, executable_transaction::VerifiedExecutableTransaction, full_checkpoint_content::CheckpointData, global_state_hash::GlobalStateHash, @@ -344,10 +345,11 @@ impl CheckpointExecutor { let seq = ckpt_state.data.checkpoint.sequence_number; - let batch = self - .state - .get_cache_commit() - .build_db_batch(self.epoch_store.epoch(), &ckpt_state.data.tx_digests); + let batch = self.state.get_cache_commit().build_db_batch( + self.epoch_store.epoch(), + seq, + &ckpt_state.data.tx_digests, + ); finish_stage!(pipeline_handle, BuildDbBatch); @@ -435,6 +437,8 
@@ impl CheckpointExecutor { .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint) .expect("Failed to insert epoch last checkpoint"); + self.write_epoch_info_entry(&ckpt_state.data); + self.global_state_hasher .accumulate_epoch(self.epoch_store.clone(), seq) .expect("Accumulating epoch cannot fail"); @@ -900,6 +904,33 @@ impl CheckpointExecutor { .await; } + // Write the per-epoch metadata used by the snapshot V2 writer to populate + // the `EPOCH_INFO` file. Called once per epoch on every node after the + // AdvanceEpoch tx has executed and its events have been committed. + fn write_epoch_info_entry(&self, ckpt_data: &CheckpointExecutionData) { + let epoch = self.epoch_store.epoch(); + let last_tx_digest = ckpt_data + .tx_digests + .last() + .expect("end-of-epoch checkpoint always contains the AdvanceEpoch tx"); + let end_of_epoch_tx_events = self + .transaction_cache_reader + .get_events(last_tx_digest) + .expect("AdvanceEpoch tx events must be committed before write_epoch_info_entry"); + let first_checkpoint = self + .checkpoint_store + .get_epoch_first_checkpoint(epoch) + .expect("Failed to read previous epoch's last checkpoint"); + let entry = EpochInfoEntry { + last_checkpoint_summary: ckpt_data.checkpoint.clone().into_inner(), + first_checkpoint, + end_of_epoch_tx_events, + }; + self.checkpoint_store + .insert_epoch_info(epoch, &entry) + .expect("Failed to insert epoch info entry"); + } + // Increment the highest executed checkpoint watermark and prune old // full-checkpoint contents #[instrument(level = "debug", skip_all)] diff --git a/crates/iota-core/src/checkpoints/checkpoint_executor/tests.rs b/crates/iota-core/src/checkpoints/checkpoint_executor/tests.rs index 59d4ffef2f1..76ba8dc0367 100644 --- a/crates/iota-core/src/checkpoints/checkpoint_executor/tests.rs +++ b/crates/iota-core/src/checkpoints/checkpoint_executor/tests.rs @@ -492,3 +492,30 @@ fn sync_checkpoint( .update_highest_synced_checkpoint(checkpoint) .unwrap(); 
} + +/// Focused structural test for [`CheckpointStore::get_epoch_first_checkpoint`], +/// the helper that `write_epoch_info_entry` uses to populate +/// `EpochInfoEntry::first_checkpoint`. Covers both the genesis branch (`epoch +/// == 0` returns `0` without consulting the store) and the inductive branch +/// (`epoch > 0` returns prev-last + 1). +#[tokio::test] +async fn get_epoch_first_checkpoint_genesis_and_inductive() { + let tmp_dir = iota_common::tempdir(); + let checkpoint_store = CheckpointStore::new(tmp_dir.path()); + + // Genesis: returns 0 even when the store has no prior epoch. + assert_eq!(checkpoint_store.get_epoch_first_checkpoint(0).unwrap(), 0); + + // Inductive: epoch 1's first checkpoint is epoch 0's last + 1. + let committee = CommitteeFixture::generate(rand::rngs::OsRng, 0, 4); + let checkpoints = sync_new_checkpoints(&checkpoint_store, 7, None, &committee); + let last_of_epoch_zero = checkpoints.last().unwrap(); + checkpoint_store + .insert_epoch_last_checkpoint(0, last_of_epoch_zero) + .unwrap(); + + assert_eq!( + checkpoint_store.get_epoch_first_checkpoint(1).unwrap(), + last_of_epoch_zero.sequence_number() + 1 + ); +} diff --git a/crates/iota-core/src/checkpoints/mod.rs b/crates/iota-core/src/checkpoints/mod.rs index b8a191f80bd..a46a420a4db 100644 --- a/crates/iota-core/src/checkpoints/mod.rs +++ b/crates/iota-core/src/checkpoints/mod.rs @@ -29,6 +29,7 @@ use iota_types::{ crypto::AuthorityStrongQuorumSignInfo, digests::{CheckpointContentsDigest, CheckpointDigest}, effects::{TransactionEffects, TransactionEffectsAPI}, + epoch_info::EpochInfoEntry, error::{IotaError, IotaResult}, event::SystemEpochInfoEvent, iota_system_state::{ @@ -177,6 +178,12 @@ pub struct CheckpointStoreTables { /// that epoch. epoch_last_checkpoint_map: DBMap, + /// Per-epoch metadata sufficient to rebuild a per-epoch summary table + /// without reading historical checkpoint contents. 
Appended at each + /// AdvanceEpoch transaction by the checkpoint executor and read by the + /// snapshot V2 writer to produce the snapshot's `EPOCH_INFO` file. + epoch_info: DBMap, + /// Watermarks used to determine the highest verified, fully synced, and /// fully executed checkpoints pub(crate) watermarks: DBMap, @@ -808,6 +815,27 @@ impl CheckpointStore { Ok(checkpoint) } + /// First checkpoint sequence number of `epoch_id`. For genesis (epoch 0) + /// this is `0`; for later epochs it equals the prior epoch's last + /// checkpoint sequence number plus one. Caller must have committed the + /// previous epoch's last checkpoint before invoking this for + /// `epoch_id > 0`; the missing-prev-checkpoint case panics rather than + /// returning `Ok(None)`, since for any epoch the executor has reached, + /// the previous epoch must have been finalized. + pub fn get_epoch_first_checkpoint( + &self, + epoch_id: EpochId, + ) -> IotaResult { + if epoch_id == 0 { + Ok(0) + } else { + let prev = self + .get_epoch_last_checkpoint(epoch_id - 1)? + .expect("Previous epoch must have a recorded last checkpoint"); + Ok(prev.sequence_number() + 1) + } + } + pub fn insert_epoch_last_checkpoint( &self, epoch_id: EpochId, @@ -819,6 +847,15 @@ impl CheckpointStore { Ok(()) } + pub fn get_epoch_info(&self, epoch_id: EpochId) -> IotaResult> { + Ok(self.tables.epoch_info.get(&epoch_id)?) + } + + pub fn insert_epoch_info(&self, epoch_id: EpochId, entry: &EpochInfoEntry) -> IotaResult { + self.tables.epoch_info.insert(&epoch_id, entry)?; + Ok(()) + } + pub fn get_epoch_state_commitments( &self, epoch: EpochId, diff --git a/crates/iota-core/src/execution_cache.rs b/crates/iota-core/src/execution_cache.rs index 3d974b66a69..efcdb3c8d37 100644 --- a/crates/iota-core/src/execution_cache.rs +++ b/crates/iota-core/src/execution_cache.rs @@ -277,7 +277,16 @@ pub type Batch = (Vec>, DBBatch); pub trait ExecutionCacheCommit: Send + Sync { /// Build a DBBatch containing the given transaction outputs. 
- fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch; + /// + /// `checkpoint_seq` is the sequence number of the checkpoint that contains + /// these transactions; it is stamped onto each newly written object's + /// `previous_transaction_checkpoint` field. + fn build_db_batch( + &self, + epoch: EpochId, + checkpoint_seq: CheckpointSequenceNumber, + digests: &[TransactionDigest], + ) -> Batch; /// Durably commit the outputs of the given transactions to the database. /// Will be called by CheckpointExecutor to ensure that transaction outputs diff --git a/crates/iota-core/src/execution_cache/passthrough_cache.rs b/crates/iota-core/src/execution_cache/passthrough_cache.rs index ffcd9ac0b98..83996290a88 100644 --- a/crates/iota-core/src/execution_cache/passthrough_cache.rs +++ b/crates/iota-core/src/execution_cache/passthrough_cache.rs @@ -36,6 +36,7 @@ use crate::{ AuthorityStore, authority_per_epoch_store::AuthorityPerEpochStore, authority_store::{ExecutionLockWriteGuard, IotaLockResult}, + authority_store_types::SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, epoch_start_configuration::{EpochFlag, EpochStartConfiguration}, }, global_state_hasher::GlobalStateHashStore, @@ -303,9 +304,17 @@ impl ExecutionCacheWrite for PassthroughCache { self.store .check_owned_objects_are_live(&tx_outputs.live_object_markers_to_delete)?; - let batch = self - .store - .build_db_batch(epoch_id, std::slice::from_ref(&tx_outputs))?; + // PassthroughCache writes to RocksDB at execution time, before the + // containing checkpoint's sequence number is known. Stamp the + // sentinel here; a backfill must rewrite these rows with real + // values before the snapshot V2 writer reads them. + // TODO(snapshot-v2-backfill): rewrite rows produced on this path + // with their real `previous_transaction_checkpoint`. 
+ let batch = self.store.build_db_batch( + epoch_id, + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, + std::slice::from_ref(&tx_outputs), + )?; batch.write()?; self.executed_effects_digests_notify_read @@ -368,7 +377,12 @@ impl GlobalStateHashStore for PassthroughCache { } impl ExecutionCacheCommit for PassthroughCache { - fn build_db_batch(&self, _epoch: EpochId, _digests: &[TransactionDigest]) -> Batch { + fn build_db_batch( + &self, + _epoch: EpochId, + _checkpoint_seq: CheckpointSequenceNumber, + _digests: &[TransactionDigest], + ) -> Batch { // Nothing needs to be done since they were already committed in // write_transaction_outputs (vec![], self.store.perpetual_tables.transactions.batch()) @@ -426,7 +440,8 @@ impl PassthroughCache { let object_ref = object.compute_object_reference(); let object_key = ObjectKey::from(object_ref); - let store_object = get_store_object(object.clone()); + let store_object = + get_store_object(object.clone(), SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT); self.store .perpetual_tables .objects diff --git a/crates/iota-core/src/execution_cache/proxy_cache.rs b/crates/iota-core/src/execution_cache/proxy_cache.rs index 5581dd2152e..8c419a51733 100644 --- a/crates/iota-core/src/execution_cache/proxy_cache.rs +++ b/crates/iota-core/src/execution_cache/proxy_cache.rs @@ -331,8 +331,13 @@ impl GlobalStateHashStore for ProxyCache { } impl ExecutionCacheCommit for ProxyCache { - fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> super::Batch { - delegate_method!(self.build_db_batch(epoch, digests)) + fn build_db_batch( + &self, + epoch: EpochId, + checkpoint_seq: CheckpointSequenceNumber, + digests: &[TransactionDigest], + ) -> super::Batch { + delegate_method!(self.build_db_batch(epoch, checkpoint_seq, digests)) } fn try_commit_transaction_outputs( diff --git a/crates/iota-core/src/execution_cache/unit_tests/writeback_cache_tests.rs b/crates/iota-core/src/execution_cache/unit_tests/writeback_cache_tests.rs index 
7ba08ab7b64..5eb09e81725 100644 --- a/crates/iota-core/src/execution_cache/unit_tests/writeback_cache_tests.rs +++ b/crates/iota-core/src/execution_cache/unit_tests/writeback_cache_tests.rs @@ -30,7 +30,11 @@ use tokio::sync::RwLock; use super::*; use crate::{ - authority::{AuthorityState, AuthorityStore, test_authority_builder::TestAuthorityBuilder}, + authority::{ + AuthorityState, AuthorityStore, + authority_store_types::SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, + test_authority_builder::TestAuthorityBuilder, + }, execution_cache::ExecutionCacheAPI, }; @@ -372,7 +376,9 @@ impl Scenario { // commit a transaction to the database pub async fn commit(&mut self, tx: TransactionDigest) { - let batch = self.cache().build_db_batch(1, &[tx]); + let batch = self + .cache() + .build_db_batch(1, SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, &[tx]); self.cache().commit_transaction_outputs(1, batch, &[tx]); self.count_action(); } @@ -400,9 +406,7 @@ impl Scenario { self.objects.clear(); self.store.iter_live_object_set().for_each(|o| { - let LiveObject::Normal(o) = o else { - panic!("expected normal object") - }; + let LiveObject::Normal(o) = o; let id = o.id(); // genesis objects are not managed by Scenario, ignore them if reverse_id_map.contains_key(&id) { @@ -569,7 +573,9 @@ async fn test_committed() { s.assert_live(&[1, 2]); s.assert_dirty(&[1, 2]); - let batch = s.cache().build_db_batch(1, &[tx]); + let batch = s + .cache() + .build_db_batch(1, SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, &[tx]); s.cache().commit_transaction_outputs(1, batch, &[tx]); s.assert_not_dirty(&[1, 2]); s.assert_cached(&[1, 2]); diff --git a/crates/iota-core/src/execution_cache/writeback_cache.rs b/crates/iota-core/src/execution_cache/writeback_cache.rs index 8b18345814d..f25b474edd9 100644 --- a/crates/iota-core/src/execution_cache/writeback_cache.rs +++ b/crates/iota-core/src/execution_cache/writeback_cache.rs @@ -975,7 +975,12 @@ impl WritebackCache { Ok(()) } - fn build_db_batch(&self, epoch: 
EpochId, digests: &[TransactionDigest]) -> Batch { + fn build_db_batch( + &self, + epoch: EpochId, + checkpoint_seq: CheckpointSequenceNumber, + digests: &[TransactionDigest], + ) -> Batch { let _metrics_guard = iota_metrics::monitored_scope("WritebackCache::build_db_batch"); let mut all_outputs = Vec::with_capacity(digests.len()); for tx in digests { @@ -1000,7 +1005,7 @@ impl WritebackCache { let batch = self .store - .build_db_batch(epoch, &all_outputs) + .build_db_batch(epoch, checkpoint_seq, &all_outputs) .expect("db error"); (all_outputs, batch) } @@ -1329,8 +1334,13 @@ impl WritebackCache { impl ExecutionCacheAPI for WritebackCache {} impl ExecutionCacheCommit for WritebackCache { - fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch { - self.build_db_batch(epoch, digests) + fn build_db_batch( + &self, + epoch: EpochId, + checkpoint_seq: CheckpointSequenceNumber, + digests: &[TransactionDigest], + ) -> Batch { + self.build_db_batch(epoch, checkpoint_seq, digests) } fn try_commit_transaction_outputs( diff --git a/crates/iota-core/src/global_state_hasher.rs b/crates/iota-core/src/global_state_hasher.rs index 4586261f047..1c3d66ff59d 100644 --- a/crates/iota-core/src/global_state_hasher.rs +++ b/crates/iota-core/src/global_state_hasher.rs @@ -8,7 +8,6 @@ use fastcrypto::hash::MultisetHash; use iota_common::fatal; use iota_metrics::monitored_scope; use iota_types::{ - base_types::{ObjectID, SequenceNumber}, committee::EpochId, digests::ObjectDigest, effects::{TransactionEffects, TransactionEffectsAPI}, @@ -19,7 +18,6 @@ use iota_types::{ storage::ObjectStore, }; use prometheus::{IntGauge, Registry, register_int_gauge_with_registry}; -use serde::Serialize; use tracing::debug; use crate::authority::{ @@ -101,26 +99,6 @@ impl GlobalStateHashStore for InMemoryStorage { } } -/// Serializable representation of the ObjectRef of an -/// object that has been wrapped -/// TODO: This can be replaced with ObjectKey. 
-#[derive(Serialize, Debug)] -pub struct WrappedObject { - id: ObjectID, - wrapped_at: SequenceNumber, - digest: ObjectDigest, -} - -impl WrappedObject { - pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self { - Self { - id, - wrapped_at, - digest: ObjectDigest::OBJECT_WRAPPED, - } - } -} - fn accumulate_effects(effects: &[TransactionEffects]) -> GlobalStateHash { let mut acc = GlobalStateHash::default(); @@ -214,17 +192,8 @@ impl GlobalStateHasher { } pub fn accumulate_live_object(acc: &mut GlobalStateHash, live_object: &LiveObject) { - match live_object { - LiveObject::Normal(object) => { - acc.insert(object.compute_object_reference().digest); - } - LiveObject::Wrapped(key) => { - acc.insert( - bcs::to_bytes(&WrappedObject::new(key.0, key.1)) - .expect("Failed to serialize WrappedObject"), - ); - } - } + let LiveObject::Normal(object) = live_object; + acc.insert(object.compute_object_reference().digest); } pub fn digest_live_object_set(&self) -> ECMHLiveObjectSetDigest { diff --git a/crates/iota-core/src/unit_tests/authority_tests.rs b/crates/iota-core/src/unit_tests/authority_tests.rs index d5c34b0652e..69ad0bb0892 100644 --- a/crates/iota-core/src/unit_tests/authority_tests.rs +++ b/crates/iota-core/src/unit_tests/authority_tests.rs @@ -61,6 +61,7 @@ pub use crate::authority::authority_test_utils::*; use crate::{ authority::{ authority_store_tables::AuthorityPerpetualTables, + authority_store_types::SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, move_integration_tests::build_and_publish_test_package_with_upgrade_cap, test_authority_builder::TestAuthorityBuilder, transaction_deferral::DeferralKey, }, @@ -3272,7 +3273,7 @@ fn build_and_commit( epoch: EpochId, txs: &[TransactionDigest], ) { - let batch = cache_commit.build_db_batch(epoch, txs); + let batch = cache_commit.build_db_batch(epoch, SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, txs); cache_commit.commit_transaction_outputs(epoch, batch, txs); } diff --git 
a/crates/iota-core/src/unit_tests/transaction_deny_tests.rs b/crates/iota-core/src/unit_tests/transaction_deny_tests.rs index d9f7422e71b..04710f2d8a1 100644 --- a/crates/iota-core/src/unit_tests/transaction_deny_tests.rs +++ b/crates/iota-core/src/unit_tests/transaction_deny_tests.rs @@ -36,6 +36,7 @@ use crate::{ auth_unit_test_utils::{ publish_package_on_single_authority, upgrade_package_on_single_authority, }, + authority_store_types::SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, test_authority_builder::TestAuthorityBuilder, }, test_utils::make_transfer_iota_transaction, @@ -333,6 +334,7 @@ async fn test_package_denied() { let batch = state.get_cache_commit().build_db_batch( state.epoch_store_for_testing().epoch(), + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, &[tx_c, tx_b, tx_a, tx_c_prime, tx_b_prime], ); diff --git a/crates/iota-core/src/verify_indexes.rs b/crates/iota-core/src/verify_indexes.rs index c42213e03f6..062c298ea6d 100644 --- a/crates/iota-core/src/verify_indexes.rs +++ b/crates/iota-core/src/verify_indexes.rs @@ -26,9 +26,7 @@ pub fn verify_indexes(store: &dyn GlobalStateHashStore, indexes: Arc tracing::info!("Reading live objects set"); for object in store.iter_live_object_set() { - let LiveObject::Normal(object) = object else { - continue; - }; + let LiveObject::Normal(object) = object; let Owner::Address(owner) = object.owner else { continue; }; diff --git a/crates/iota-single-node-benchmark/src/benchmark_context.rs b/crates/iota-single-node-benchmark/src/benchmark_context.rs index eae668912a4..361a1a27621 100644 --- a/crates/iota-single-node-benchmark/src/benchmark_context.rs +++ b/crates/iota-single-node-benchmark/src/benchmark_context.rs @@ -10,6 +10,7 @@ use std::{ use futures::{StreamExt, stream::FuturesUnordered}; use iota_config::node::RunWithRange; +use iota_core::authority::authority_store_types::SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT; use iota_test_transaction_builder::PublishData; use iota_types::{ base_types::{IotaAddress, ObjectID, 
ObjectRef, SequenceNumber}, @@ -118,8 +119,11 @@ impl BenchmarkContext { let mut new_gas_objects = HashMap::new(); let cache_commit = self.validator().get_validator().get_cache_commit().clone(); for effects in results { - let batch = cache_commit - .build_db_batch(effects.executed_epoch(), &[*effects.transaction_digest()]); + let batch = cache_commit.build_db_batch( + effects.executed_epoch(), + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, + &[*effects.transaction_digest()], + ); cache_commit.commit_transaction_outputs( effects.executed_epoch(), @@ -189,8 +193,11 @@ impl BenchmarkContext { // object store, hence requiring these objects committed to DB. // For checkpoint executor, in order to commit a checkpoint it is required // previous versions of objects are already committed. - let batch = cache_commit - .build_db_batch(effects.executed_epoch(), &[*effects.transaction_digest()]); + let batch = cache_commit.build_db_batch( + effects.executed_epoch(), + SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, + &[*effects.transaction_digest()], + ); cache_commit.commit_transaction_outputs( effects.executed_epoch(), batch, diff --git a/crates/iota-single-node-benchmark/src/single_node.rs b/crates/iota-single-node-benchmark/src/single_node.rs index 505ec9536ad..cfdb608c1aa 100644 --- a/crates/iota-single-node-benchmark/src/single_node.rs +++ b/crates/iota-single-node-benchmark/src/single_node.rs @@ -291,9 +291,9 @@ impl SingleValidator { .get_validator() .get_global_state_hash_store() .iter_cached_live_object_set_for_testing() - .map(|o| match o { - LiveObject::Normal(object) => (object.id(), object), - LiveObject::Wrapped(_) => unreachable!(), + .map(|o| { + let LiveObject::Normal(object) = o; + (object.id(), object) }) .collect(); InMemoryObjectStore::new(objects) diff --git a/crates/iota-snapshot/src/lib.rs b/crates/iota-snapshot/src/lib.rs index a90edb99cd5..9f13389d088 100644 --- a/crates/iota-snapshot/src/lib.rs +++ b/crates/iota-snapshot/src/lib.rs @@ -30,13 +30,14 @@ use 
iota_core::{ }, checkpoints::CheckpointStore, epoch::committee_store::CommitteeStore, - global_state_hasher::WrappedObject, + global_state_hasher::GlobalStateHasher, }; use iota_storage::{ FileCompression, SHA3_BYTES, compute_sha3_checksum, object_store::util::path_to_filesystem, }; use iota_types::{ base_types::ObjectID, + epoch_info::EpochInfoEntry, global_state_hash::GlobalStateHash, iota_system_state::{ IotaSystemStateTrait, epoch_start_iota_system_state::EpochStartSystemStateTrait, @@ -49,23 +50,27 @@ use object_store::path::Path; use serde::{Deserialize, Serialize}; use tokio::time::Instant; -/// The following describes the format of an object file (*.obj) used for -/// persisting live iota objects. The maximum size per .obj file is 128MB. State -/// snapshot will be taken at the end of every epoch. Live object set is split -/// into and stored across multiple hash buckets. The hashing function used -/// for bucketing objects is the same as the one used to build the accumulator -/// tree for computing state root hash. Buckets are further subdivided into -/// partitions. A partition is a smallest storage unit which holds a subset of -/// objects in one bucket. Each partition is a single *.obj file where -/// objects are appended to in an append-only fashion. A new partition is -/// created when the current one reaches its maximum size. i.e. 128MB. -/// Partitions allow a single hash bucket to be consumed in parallel. Partition -/// files are optionally compressed with the zstd compression format. Partition -/// filenames follows the format _.obj. Object -/// references for hash. There is one single ref file per hash bucket. Object -/// references are written in an append-only manner as well. Finally, the -/// MANIFEST file contains per file metadata of every file in the snapshot -/// directory. 
State Snapshot Directory Layout +/// The following describes the on-disk format of a snapshot, as written by +/// `StateSnapshotWriterV1` and consumed by `StateSnapshotReaderV1`. The +/// snapshot is taken at the end of every epoch and stores the live object +/// set bucketed by the same hash function used for the global state hash +/// accumulator, allowing a single bucket to be consumed in parallel. Each +/// bucket is split across one or more partitions; a partition is a single +/// `.obj` file with a maximum size of 128 MB and a matching `.ref` file +/// listing the object references in that partition. Partition files are +/// optionally zstd-compressed. +/// +/// V2 additions over V1: +/// - REFERENCE file magic is `0xCAFEBEEF` (V1 was `0xDEADBEEF`); a V2 reader +/// fails fast on a V1 magic and vice versa. +/// - REFERENCE records carry an extra 8-byte big-endian +/// `previous_transaction_checkpoint` trailer per record (80-byte records +/// instead of V1's 72). +/// - A per-snapshot `EPOCH_INFO` file is emitted alongside the bucket files, +/// carrying one entry per epoch in `[0, snapshot_epoch]` from +/// `CheckpointStore::epoch_info`. +/// +/// State Snapshot Directory Layout /// - snapshot/ /// - epoch_0/ /// - 1_1.obj @@ -73,11 +78,11 @@ use tokio::time::Instant; /// - 1_3.obj /// - 2_1.obj /// - ... -/// - 1000_1.obj -/// - REFERENCE-1 -/// - REFERENCE-2 +/// - 1_1.ref +/// - 1_2.ref +/// - 2_1.ref /// - ... 
-/// - REFERENCE-1000 +/// - EPOCH_INFO /// - MANIFEST /// - epoch_1/ /// - 1_1.obj @@ -100,22 +105,33 @@ use tokio::time::Instant; /// │ len │ encoding <1 byte> │ data │ /// └───────────────┴───────────────────┴──────────────┘ /// -/// REFERENCE File Disk Format +/// REFERENCE File Disk Format (V2) /// ┌──────────────────────────────┐ -/// │ magic(0x5EFE5E11) <4 byte> │ +/// │ magic(0xCAFEBEEF) <4 byte> │ /// ├──────────────────────────────┤ /// │ ┌──────────────────────────┐ │ -/// │ │ ObjectRef 1 │ │ +/// │ │ ObjectRefV2 1 │ │ /// │ ├──────────────────────────┤ │ /// │ │ ... │ │ /// │ ├──────────────────────────┤ │ -/// │ │ ObjectRef N │ │ +/// │ │ ObjectRefV2 N │ │ /// │ └──────────────────────────┘ │ /// └──────────────────────────────┘ -/// ObjectRef (ObjectID, SequenceNumber, ObjectDigest) -/// ┌───────────────┬───────────────────┬──────────────┐ -/// │ data (<(address_len + 8 + 32) bytes>) │ -/// └───────────────┴───────────────────┴──────────────┘ +/// ObjectRefV2: 80 bytes total, fields concatenated in declaration order: +/// - ObjectID : 32 bytes +/// - SequenceNumber : 8 bytes +/// - ObjectDigest : 32 bytes +/// - previous_transaction_checkpoint : 8 bytes (big-endian u64) +/// +/// EPOCH_INFO File Disk Format +/// ┌──────────────────────────────┐ +/// │ magic(0x9000C001) <4 byte> │ +/// ├──────────────────────────────┤ +/// │ bcs(EpochInfo) │ +/// └──────────────────────────────┘ +/// Integrity is anchored by `FileMetadata::sha3_digest` recorded in the +/// MANIFEST (matching how `.obj`/`.ref` files are validated); no in-file +/// sha3 trailer is written. /// /// MANIFEST File Disk Format /// ┌──────────────────────────────┐ @@ -126,7 +142,12 @@ use tokio::time::Instant; /// │ sha3 <32 bytes> │ /// └──────────────────────────────┘ const OBJECT_FILE_MAGIC: u32 = 0x00B7EC75; -const REFERENCE_FILE_MAGIC: u32 = 0xDEADBEEF; +/// Magic for V2 reference files. 
Distinct from the V1 magic (`0xDEADBEEF`) so +/// a V1 reader fails fast on the magic check rather than silently +/// mis-decoding a V2 ref record's extra `previous_transaction_checkpoint` +/// trailer. +const REFERENCE_FILE_MAGIC_V2: u32 = 0xCAFEBEEF; +const EPOCH_INFO_FILE_MAGIC: u32 = 0x9000C001; const MANIFEST_FILE_MAGIC: u32 = 0x00C0FFEE; const MAGIC_BYTES: usize = 4; const SNAPSHOT_VERSION_BYTES: usize = 1; @@ -138,7 +159,12 @@ const FILE_MAX_BYTES: usize = 128 * 1024 * 1024; const OBJECT_ID_BYTES: usize = ObjectID::LENGTH; const SEQUENCE_NUM_BYTES: usize = 8; const OBJECT_DIGEST_BYTES: usize = 32; -const OBJECT_REF_BYTES: usize = OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES + OBJECT_DIGEST_BYTES; +/// Size of a V2 reference record: 72-byte V1 ObjectRef (ObjectID + +/// SequenceNumber + ObjectDigest) plus an 8-byte big-endian +/// `previous_transaction_checkpoint` trailer. +const PREV_TX_CHECKPOINT_BYTES: usize = 8; +const OBJECT_REF_BYTES_V2: usize = + OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES + OBJECT_DIGEST_BYTES + PREV_TX_CHECKPOINT_BYTES; const FILE_TYPE_BYTES: usize = 1; const BUCKET_BYTES: usize = 4; const BUCKET_PARTITION_BYTES: usize = 4; @@ -152,7 +178,10 @@ const FILE_METADATA_BYTES: usize = #[repr(u8)] pub enum FileType { Object = 0, - Reference, + Reference = 1, + /// V2 only: per-epoch metadata file, populated from `CheckpointStore`'s + /// `epoch_info` table. + EpochInfo = 2, } #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] @@ -174,6 +203,9 @@ impl FileMetadata { FileType::Reference => { dir_path.child(&*format!("{}_{}.ref", self.bucket_num, self.part_num)) } + // EPOCH_INFO is a singleton per snapshot, so bucket/part numbers + // do not contribute to the filename. + FileType::EpochInfo => dir_path.child("EPOCH_INFO"), } } pub fn local_file_path(&self, root_path: &std::path::Path, dir_path: &Path) -> Result { @@ -181,38 +213,69 @@ impl FileMetadata { } } +/// Body of a manifest at any version. 
V1 and V2 are structurally identical — +/// the on-disk wire format is the same and the BCS variant tag on `Manifest` +/// distinguishes them. V2 differs only in semantic associations: the +/// `file_metadata` list includes the per-snapshot `EPOCH_INFO` file, and +/// `.ref` files carry 80-byte records (with a `previous_transaction_checkpoint` +/// trailer). `address_length` is preserved as a sanity check across versions. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct ManifestV1 { +pub struct ManifestBody { pub snapshot_version: u8, pub address_length: u64, pub file_metadata: Vec, pub epoch: u64, } +// `Manifest::V1` and `Manifest::V2` use the same `ManifestBody` payload — +// the BCS variant tag distinguishes them. The variants must stay (removing +// `V1` would shift `V2`'s tag) even though the body type is shared. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum Manifest { - V1(ManifestV1), + V1(ManifestBody), + V2(ManifestBody), } impl Manifest { - pub fn snapshot_version(&self) -> u8 { + fn body(&self) -> &ManifestBody { match self { - Self::V1(manifest) => manifest.snapshot_version, + Self::V1(manifest) | Self::V2(manifest) => manifest, } } + pub fn snapshot_version(&self) -> u8 { + self.body().snapshot_version + } pub fn address_length(&self) -> u64 { - match self { - Self::V1(manifest) => manifest.address_length, - } + self.body().address_length } pub fn file_metadata(&self) -> &Vec { - match self { - Self::V1(manifest) => &manifest.file_metadata, - } + &self.body().file_metadata } pub fn epoch(&self) -> u64 { + self.body().epoch + } +} + +/// On-disk schema for the per-snapshot `EPOCH_INFO` file. Versioned to allow +/// future schema evolution. `entries[i]` is the entry for epoch `i`; `None` +/// indicates the source `epoch_info` table had no row for that epoch. +/// Length is `snapshot_epoch + 1`. +// Note: no `Eq`/`PartialEq` derive here. 
`EpochInfoEntry` transitively +// contains `BLS12381AggregateSignature`, which does not implement `PartialEq`. +#[derive(Debug, Serialize, Deserialize)] +pub enum EpochInfo { + V1(EpochInfoV1), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct EpochInfoV1 { + pub entries: Vec>, +} + +impl EpochInfo { + pub fn entries(&self) -> &[Option] { match self { - Self::V1(manifest) => manifest.epoch, + Self::V1(info) => &info.entries, } } } @@ -328,17 +391,7 @@ pub async fn accumulate_live_object_iter( // Accumulate live objects let mut acc = GlobalStateHash::default(); for live_object in iter { - match live_object { - LiveObject::Normal(object) => { - acc.insert(object.compute_object_reference().digest); - } - LiveObject::Wrapped(key) => { - acc.insert( - bcs::to_bytes(&WrappedObject::new(key.0, key.1)) - .expect("Failed to serialize WrappedObject"), - ); - } - } + GlobalStateHasher::accumulate_live_object(&mut acc, &live_object); accum_counter.fetch_add(1, Ordering::Relaxed); } accum_progress_bar.finish_with_message("DB live object accumulation completed"); diff --git a/crates/iota-snapshot/src/reader.rs b/crates/iota-snapshot/src/reader.rs index 0d4effbf9db..59a25a841ec 100644 --- a/crates/iota-snapshot/src/reader.rs +++ b/crates/iota-snapshot/src/reader.rs @@ -52,12 +52,18 @@ use tokio::{ use tracing::{error, info}; use crate::{ - FileMetadata, FileType, MAGIC_BYTES, MANIFEST_FILE_MAGIC, Manifest, OBJECT_FILE_MAGIC, - OBJECT_ID_BYTES, OBJECT_REF_BYTES, REFERENCE_FILE_MAGIC, SEQUENCE_NUM_BYTES, SHA3_BYTES, + FileMetadata, FileType, MAGIC_BYTES, MANIFEST_FILE_MAGIC, Manifest, OBJECT_DIGEST_BYTES, + OBJECT_FILE_MAGIC, OBJECT_ID_BYTES, OBJECT_REF_BYTES_V2, REFERENCE_FILE_MAGIC_V2, + SEQUENCE_NUM_BYTES, SHA3_BYTES, }; pub type SnapshotChecksums = (DigestByBucketAndPartition, GlobalStateHash); pub type DigestByBucketAndPartition = BTreeMap>; +/// Orchestrates restoring a state snapshot from a remote object store. 
The +/// `V1` suffix refers to the orchestration-layer revision of this struct +/// (its public API surface), not the snapshot wire format. After the V2 +/// snapshot rollout this reader consumes only V2 manifests and refuses V1 +/// snapshots up-front. #[derive(Clone)] pub struct StateSnapshotReaderV1 { epoch: u64, @@ -116,10 +122,14 @@ impl StateSnapshotReaderV1 { local_staging_dir_root.clone(), &manifest_file_path, )?)?; - // Verifies MANIFEST + // Verifies MANIFEST. Only V2 snapshots are readable; operators + // regenerate any older V1 snapshots after PR 1 deploy. let snapshot_version = manifest.snapshot_version(); - if snapshot_version != 1u8 { - bail!("Unexpected snapshot version: {}", snapshot_version); + if snapshot_version != 2u8 { + bail!( + "Unsupported snapshot version: {snapshot_version}. \ + Only snapshot V2 is supported; regenerate the snapshot." + ); } if manifest.address_length() as usize > ObjectID::LENGTH { bail!("Max possible address length is: {}", ObjectID::LENGTH); @@ -131,28 +141,39 @@ impl StateSnapshotReaderV1 { // directory let mut object_files = BTreeMap::new(); let mut ref_files = BTreeMap::new(); + let mut epoch_info_seen = false; for file_metadata in manifest.file_metadata() { match file_metadata.file_type { FileType::Object => { - // Gets the object FileMetadata bucket with the bucket number, or inserts a new - // one if it doesn't exist. let entry = object_files .entry(file_metadata.bucket_num) .or_insert_with(BTreeMap::new); - // Inserts the object FileMetadata with the partition number to the bucket. entry.insert(file_metadata.part_num, file_metadata.clone()); } FileType::Reference => { - // Gets the reference FileMetadata bucket with the bucket number, or inserts a - // new one if it doesn't exist. let entry = ref_files .entry(file_metadata.bucket_num) .or_insert_with(BTreeMap::new); - // Inserts the reference FileMetadata with the partition number to the bucket. 
entry.insert(file_metadata.part_num, file_metadata.clone()); } + FileType::EpochInfo => { + if epoch_info_seen { + bail!("Manifest contains more than one EPOCH_INFO entry"); + } + epoch_info_seen = true; + } } } + // V2 manifests must list the per-snapshot EPOCH_INFO file. The + // manifest entry is required so a missing entry fails fast, but + // the EPOCH_INFO file itself is not downloaded or sha3-verified + // here: it is consumed out-of-band from the bucket rather than + // through the restore path. Out-of-band consumers (e.g. the + // indexer) are responsible for verifying the file against + // `FileMetadata::sha3_digest` recorded in the MANIFEST. + if !epoch_info_seen { + bail!("V2 manifest missing required EPOCH_INFO entry"); + } let epoch_dir_path = Path::from(epoch_dir); // Collects the path of all reference files let files: Vec = ref_files @@ -613,7 +634,13 @@ impl StateSnapshotReaderV1 { } } -/// An iterator over all object refs in a .ref file. +/// An iterator over all object refs in a V2 .ref file. +/// +/// V2 records carry an 8-byte big-endian `previous_transaction_checkpoint` +/// trailer after the V1 layout. The trailer is read past to keep the stream +/// aligned but is not surfaced via the `Iterator` impl: the restore path +/// only needs `ObjectRef` for digest checksumming and live-object insertion, +/// so yielding the V1 shape keeps that code path unchanged. 
pub struct ObjectRefIter { reader: Box, } @@ -623,21 +650,30 @@ impl ObjectRefIter { let file_path = file_metadata.local_file_path(&root_path, &dir_path)?; let mut reader = file_metadata.file_compression.decompress(&file_path)?; let magic = reader.read_u32::()?; - if magic != REFERENCE_FILE_MAGIC { - bail!("Unexpected magic string in REFERENCE file: {:?}", magic) + if magic != REFERENCE_FILE_MAGIC_V2 { + bail!( + "Unexpected magic string in V2 REFERENCE file: {:#x}, expected {:#x}", + magic, + REFERENCE_FILE_MAGIC_V2 + ) } else { Ok(ObjectRefIter { reader }) } } fn next_ref(&mut self) -> Result { - let mut buf = [0u8; OBJECT_REF_BYTES]; + let mut buf = [0u8; OBJECT_REF_BYTES_V2]; self.reader.read_exact(&mut buf)?; let object_id = &buf[0..OBJECT_ID_BYTES]; let sequence_number = &buf[OBJECT_ID_BYTES..OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES] .reader() .read_u64::()?; - let sha3_digest = &buf[OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES..OBJECT_REF_BYTES]; + let digest_end = OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES + OBJECT_DIGEST_BYTES; + let sha3_digest = &buf[OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES..digest_end]; + // The trailing 8 bytes carry `previous_transaction_checkpoint`. We + // read past them to keep the stream aligned but do not surface them + // here; the per-record value is not needed for state-hash + // verification or live-object restoration. 
let object_ref = ObjectRef::new( ObjectID::from_bytes(object_id)?, SequenceNumber::from_u64(*sequence_number), diff --git a/crates/iota-snapshot/src/tests.rs b/crates/iota-snapshot/src/tests.rs index f3d7a102449..72f917372ba 100644 --- a/crates/iota-snapshot/src/tests.rs +++ b/crates/iota-snapshot/src/tests.rs @@ -2,14 +2,15 @@ // Modifications Copyright (c) 2024 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashSet, num::NonZeroUsize, sync::Arc}; +use std::{collections::HashSet, fs, io::Write, num::NonZeroUsize, sync::Arc}; -use fastcrypto::hash::MultisetHash; +use byteorder::{BigEndian, ByteOrder}; +use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256}; use futures::future::AbortHandle; use indicatif::MultiProgress; use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType}; use iota_core::{ - authority::authority_store_tables::AuthorityPerpetualTables, + authority::authority_store_tables::AuthorityPerpetualTables, checkpoints::CheckpointStore, global_state_hasher::GlobalStateHasher, }; use iota_types::{ @@ -17,7 +18,11 @@ use iota_types::{ messages_checkpoint::ECMHLiveObjectSetDigest, object::Object, }; -use crate::{FileCompression, reader::StateSnapshotReaderV1, writer::StateSnapshotWriterV1}; +use crate::{ + EPOCH_INFO_FILE_MAGIC, EpochInfo, EpochInfoV1, FileCompression, FileMetadata, FileType, + MAGIC_BYTES, MANIFEST_FILE_MAGIC, Manifest, ManifestBody, OBJECT_REF_BYTES_V2, + reader::StateSnapshotReaderV1, writer::StateSnapshotWriterV1, +}; pub fn insert_keys( db: &AuthorityPerpetualTables, @@ -56,40 +61,104 @@ fn accumulate_live_object_set(perpetual_db: &AuthorityPerpetualTables) -> Global acc } -#[tokio::test] -async fn test_snapshot_basic() -> Result<(), anyhow::Error> { - let tmp_dir = iota_common::tempdir(); +/// Writes a snapshot with `num_objects` live objects to a temp remote store, +/// reads it back into a fresh perpetual DB, and asserts the live object set +/// round-trips. 
+// TODO(iota-snapshot tests): the two cases (populated, empty) are merged into +// `test_snapshot_round_trip` as a single `#[tokio::test]` to sidestep the +// `typed_store::DBMetrics` global Prometheus registry race documented at +// `crates/typed-store/src/metrics.rs` (`once_cell::sync::OnceCell`-based +// initialization that races between concurrent test threads). Do not split +// these back into separate tests until that registry is made re-entrant. +async fn snapshot_round_trip( + tmp_dir: &std::path::Path, + num_objects: u64, + file_compression: FileCompression, +) -> Result<(), anyhow::Error> { let local_store_config = ObjectStoreConfig { object_store: Some(ObjectStoreType::File), - directory: Some(tmp_dir.path().join("local_dir")), + directory: Some(tmp_dir.join("local_dir")), ..Default::default() }; let remote_store_config = ObjectStoreConfig { object_store: Some(ObjectStoreType::File), - directory: Some(tmp_dir.path().join("remote_dir")), + directory: Some(tmp_dir.join("remote_dir")), ..Default::default() }; let snapshot_writer = StateSnapshotWriterV1::new( &local_store_config, &remote_store_config, - FileCompression::Zstd, + file_compression, NonZeroUsize::new(1).unwrap(), ) .await?; - let perpetual_db = Arc::new(AuthorityPerpetualTables::open( - &tmp_dir.path().join("db"), - None, - )); - insert_keys(&perpetual_db, 1000)?; + let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&tmp_dir.join("db"), None)); + insert_keys(&perpetual_db, num_objects)?; let root_accumulator = ECMHLiveObjectSetDigest::from(accumulate_live_object_set(&perpetual_db).digest()); + let checkpoint_store = CheckpointStore::new(&tmp_dir.join("checkpoint_store")); snapshot_writer - .write_internal(0, perpetual_db.clone(), root_accumulator) + .write_internal(0, perpetual_db.clone(), checkpoint_store, root_accumulator) .await?; + + // On-wire size assertion: with no compression the uploaded `.ref` file + // is exactly `MAGIC_BYTES + num_objects * OBJECT_REF_BYTES_V2`. 
This + // locks the V2 trailer width — a bug where mis-sized records would + // still pass the round-trip if writer and reader agreed on the wrong + // size. Reads from the remote store, since `sync_file_to_remote` + // removes the local copy after upload. + if file_compression == FileCompression::None && num_objects > 0 { + let ref_file = tmp_dir.join("remote_dir").join("epoch_0").join("1_1.ref"); + let actual_size = fs::metadata(&ref_file)?.len() as usize; + let expected_size = MAGIC_BYTES + (num_objects as usize) * OBJECT_REF_BYTES_V2; + assert_eq!( + actual_size, expected_size, + "ref-file on-wire size mismatch: expected {expected_size}, got {actual_size}" + ); + } + + // Lock the EPOCH_INFO file's on-disk shape: 4-byte big-endian magic + // followed by `bcs(EpochInfo)`. The reader does not consume this file + // during restore (the indexer reads it out-of-band from the bucket), so + // without this assertion a writer bug — typo'd magic, wrong filename, + // wrong BCS encoding — would pass every test and only surface + // post-deploy when the indexer fails to decode. Gated on `None` + // compression so the raw on-wire bytes are readable directly. + if file_compression == FileCompression::None { + let epoch_info_file = tmp_dir + .join("remote_dir") + .join("epoch_0") + .join("EPOCH_INFO"); + let bytes = fs::read(&epoch_info_file)?; + assert!( + bytes.len() >= MAGIC_BYTES, + "EPOCH_INFO file is shorter than the magic header: {} bytes", + bytes.len() + ); + let magic = BigEndian::read_u32(&bytes[..MAGIC_BYTES]); + assert_eq!( + magic, EPOCH_INFO_FILE_MAGIC, + "EPOCH_INFO magic mismatch: got {magic:#x}, expected {EPOCH_INFO_FILE_MAGIC:#x}" + ); + let decoded: EpochInfo = bcs::from_bytes(&bytes[MAGIC_BYTES..])?; + let EpochInfo::V1(decoded_v1) = decoded; + // The round-trip test uses an empty `CheckpointStore`, so for + // epoch 0 the writer emits a single `None` entry. 
+ assert_eq!( + decoded_v1.entries.len(), + 1, + "expected `entries` of length 1 for snapshot at epoch 0" + ); + assert!( + decoded_v1.entries[0].is_none(), + "expected `entries[0]` to be `None` (CheckpointStore is empty)" + ); + } + let local_store_restore_config = ObjectStoreConfig { object_store: Some(ObjectStoreType::File), - directory: Some(tmp_dir.path().join("local_dir_restore")), + directory: Some(tmp_dir.join("local_dir_restore")), ..Default::default() }; let mut snapshot_reader = StateSnapshotReaderV1::new( @@ -101,8 +170,7 @@ async fn test_snapshot_basic() -> Result<(), anyhow::Error> { false, // skip_reset_local_store ) .await?; - let restored_perpetual_db = - AuthorityPerpetualTables::open(&tmp_dir.path().join("restored_db"), None); + let restored_perpetual_db = AuthorityPerpetualTables::open(&tmp_dir.join("restored_db"), None); let (_abort_handle, abort_registration) = AbortHandle::new_pair(); snapshot_reader .read(&restored_perpetual_db, abort_registration, None) @@ -112,54 +180,117 @@ async fn test_snapshot_basic() -> Result<(), anyhow::Error> { } #[tokio::test] -async fn test_snapshot_empty_db() -> Result<(), anyhow::Error> { +async fn test_snapshot_round_trip() -> Result<(), anyhow::Error> { + // Populated case, with compression — exercises the production path. + let basic_dir = iota_common::tempdir(); + snapshot_round_trip(basic_dir.path(), 1000, FileCompression::Zstd).await?; + // Empty database case. + let empty_dir = iota_common::tempdir(); + snapshot_round_trip(empty_dir.path(), 0, FileCompression::Zstd).await?; + // Uncompressed case so the ref-file on-wire size assertion can run + // directly against the staged file. + let uncompressed_dir = iota_common::tempdir(); + snapshot_round_trip(uncompressed_dir.path(), 100, FileCompression::None).await?; + Ok(()) +} + +/// Negative test: a V2 manifest without an `EPOCH_INFO` entry must be +/// rejected up-front by `StateSnapshotReaderV1::new`. 
Locks the contract that +/// the manifest's EPOCH_INFO entry is required. +#[tokio::test] +async fn test_v2_manifest_missing_epoch_info_is_rejected() { let tmp_dir = iota_common::tempdir(); - let local_store_config = ObjectStoreConfig { - object_store: Some(ObjectStoreType::File), - directory: Some(tmp_dir.path().join("local_dir")), - ..Default::default() - }; + let remote_root = tmp_dir.path().join("remote_dir"); + let epoch_dir = remote_root.join("epoch_0"); + fs::create_dir_all(&epoch_dir).unwrap(); + + // Manifest with file_metadata containing a bogus reference entry but no + // EPOCH_INFO. The reader should reject this up-front. + let manifest = Manifest::V2(ManifestBody { + snapshot_version: 2, + address_length: ObjectID::LENGTH as u64, + file_metadata: vec![FileMetadata { + file_type: FileType::Reference, + bucket_num: 1, + part_num: 1, + file_compression: FileCompression::None, + sha3_digest: [0u8; 32], + }], + epoch: 0, + }); + let manifest_path = epoch_dir.join("MANIFEST"); + write_manifest_file(&manifest_path, &manifest).unwrap(); + let remote_store_config = ObjectStoreConfig { object_store: Some(ObjectStoreType::File), - directory: Some(tmp_dir.path().join("remote_dir")), + directory: Some(remote_root), ..Default::default() }; - let snapshot_writer = StateSnapshotWriterV1::new( - &local_store_config, - &remote_store_config, - FileCompression::Zstd, - NonZeroUsize::new(1).unwrap(), - ) - .await?; - let perpetual_db = Arc::new(AuthorityPerpetualTables::open( - &tmp_dir.path().join("db"), - None, - )); - let root_accumulator = - ECMHLiveObjectSetDigest::from(accumulate_live_object_set(&perpetual_db).digest()); - snapshot_writer - .write_internal(0, perpetual_db.clone(), root_accumulator) - .await?; - let local_store_restore_config = ObjectStoreConfig { + let local_store_config = ObjectStoreConfig { object_store: Some(ObjectStoreType::File), directory: Some(tmp_dir.path().join("local_dir_restore")), ..Default::default() }; - let mut snapshot_reader = 
StateSnapshotReaderV1::new( + + let result = StateSnapshotReaderV1::new( 0, &remote_store_config, - &local_store_restore_config, + &local_store_config, NonZeroUsize::new(1).unwrap(), MultiProgress::new(), - false, // skip_reset_local_store + false, ) - .await?; - let restored_perpetual_db = - AuthorityPerpetualTables::open(&tmp_dir.path().join("restored_db"), None); - let (_abort_handle, abort_registration) = AbortHandle::new_pair(); - snapshot_reader - .read(&restored_perpetual_db, abort_registration, None) - .await?; - compare_live_objects(&perpetual_db, &restored_perpetual_db)?; + .await; + let err = result + .err() + .expect("missing EPOCH_INFO must be rejected by the reader"); + assert!( + err.to_string() + .contains("V2 manifest missing required EPOCH_INFO entry"), + "expected EPOCH_INFO-missing error, got: {err}" + ); +} + +/// Writes a freestanding MANIFEST file in the same on-disk format as +/// `StateSnapshotWriterV1::write_manifest`: 4-byte big-endian magic, BCS +/// payload, 32-byte sha3 trailer over (magic + bcs). +fn write_manifest_file(path: &std::path::Path, manifest: &Manifest) -> std::io::Result<()> { + let mut magic_buf = [0u8; MAGIC_BYTES]; + BigEndian::write_u32(&mut magic_buf, MANIFEST_FILE_MAGIC); + let body = bcs::to_bytes(manifest).expect("manifest serialization"); + let mut hasher = Sha3_256::default(); + hasher.update(magic_buf); + hasher.update(&body); + let sha3 = hasher.finalize().digest; + + let mut f = fs::File::create(path)?; + f.write_all(&magic_buf)?; + f.write_all(&body)?; + f.write_all(&sha3)?; + f.sync_data()?; Ok(()) } + +/// Locks the on-wire format of the `EPOCH_INFO` file body: BCS-encoding +/// `EpochInfo::V1` must use variant tag `0`, and `entries` must round-trip +/// with its length and `None`/`Some` shape preserved. `EpochInfoEntry` +/// does not implement `PartialEq`, so this covers the all-`None` shape +/// end-to-end. 
The `Some(..)` payload is not exercised here because +/// constructing a real `CertifiedCheckpointSummary` requires committee +/// fixtures from `iota-swarm-config`, which this crate does not depend on. +#[test] +fn epoch_info_v1_bcs_round_trip() { + let epoch_info = EpochInfo::V1(EpochInfoV1 { + entries: vec![None, None, None], + }); + let bytes = bcs::to_bytes(&epoch_info).unwrap(); + assert_eq!( + bytes[0], 0, + "EpochInfo::V1 must remain at BCS discriminant 0" + ); + + let decoded: EpochInfo = bcs::from_bytes(&bytes).unwrap(); + let EpochInfo::V1(decoded_v1) = decoded; + assert_eq!(decoded_v1.entries.len(), 3); + assert!(decoded_v1.entries.iter().all(Option::is_none)); +} diff --git a/crates/iota-snapshot/src/uploader.rs b/crates/iota-snapshot/src/uploader.rs index 1887b983946..2ca1bcd267e 100644 --- a/crates/iota-snapshot/src/uploader.rs +++ b/crates/iota-snapshot/src/uploader.rs @@ -153,7 +153,12 @@ impl StateSnapshotUploader { .expect("Expected at least one commitment") .clone(); state_snapshot_writer - .write(*epoch, db, state_hash_commitment) + .write( + *epoch, + db, + self.checkpoint_store.clone(), + state_hash_commitment, + ) .await?; info!("State snapshot creation successful for epoch: {}", *epoch); // Records the on-chain start timestamp of this epoch (= timestamp of the diff --git a/crates/iota-snapshot/src/writer.rs b/crates/iota-snapshot/src/writer.rs index 805d443298e..7759e0bd18b 100644 --- a/crates/iota-snapshot/src/writer.rs +++ b/crates/iota-snapshot/src/writer.rs @@ -21,7 +21,8 @@ use futures::StreamExt; use integer_encoding::VarInt; use iota_config::object_storage_config::ObjectStoreConfig; use iota_core::{ - authority::authority_store_tables::{AuthorityPerpetualTables, LiveObject}, + authority::authority_store_tables::{AuthorityPerpetualTables, LiveObject, LiveObjectV2}, + checkpoints::CheckpointStore, global_state_hasher::GlobalStateHasher, }; use iota_storage::{ @@ -31,7 +32,7 @@ use iota_storage::{ use iota_types::{ 
base_types::{ObjectID, ObjectRef}, global_state_hash::GlobalStateHash, - messages_checkpoint::ECMHLiveObjectSetDigest, + messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest}, }; use object_store::{DynObjectStore, path::Path}; use tokio::{ @@ -45,9 +46,10 @@ use tokio_stream::wrappers::ReceiverStream; use tracing::debug; use crate::{ - FILE_MAX_BYTES, FileCompression, FileMetadata, FileType, MAGIC_BYTES, MANIFEST_FILE_MAGIC, - Manifest, ManifestV1, OBJECT_FILE_MAGIC, OBJECT_REF_BYTES, REFERENCE_FILE_MAGIC, - SEQUENCE_NUM_BYTES, compute_sha3_checksum, create_file_metadata, + EPOCH_INFO_FILE_MAGIC, EpochInfo, EpochInfoV1, FILE_MAX_BYTES, FileCompression, FileMetadata, + FileType, MAGIC_BYTES, MANIFEST_FILE_MAGIC, Manifest, ManifestBody, OBJECT_DIGEST_BYTES, + OBJECT_FILE_MAGIC, OBJECT_REF_BYTES_V2, REFERENCE_FILE_MAGIC_V2, SEQUENCE_NUM_BYTES, + compute_sha3_checksum, create_file_metadata, }; /// LiveObjectSetWriterV1 writes live object set. It creates multiple *.obj @@ -87,12 +89,12 @@ impl LiveObjectSetWriterV1 { }) } - /// Writes a live object to the object file and the reference to the - /// reference file. - pub fn write(&mut self, object: &LiveObject) -> Result<()> { - let object_reference = object.object_reference(); - self.write_object(object)?; - self.write_object_ref(&object_reference)?; + /// Writes a live object to the object file and the reference (with its + /// `previous_transaction_checkpoint` trailer) to the V2 reference file. 
+ pub fn write(&mut self, object: &LiveObjectV2) -> Result<()> { + let object_reference = object.live.object_reference(); + self.write_object(&object.live)?; + self.write_object_ref(&object_reference, object.previous_transaction_checkpoint)?; Ok(()) } @@ -122,7 +124,7 @@ impl LiveObjectSetWriterV1 { Ok((n, f)) } - /// Creates a new reference file for the provided bucket number and part + /// Creates a new V2 reference file for the provided bucket number and part /// number, and returns the file and the number of bytes written to the /// file. fn ref_file(dir_path: PathBuf, bucket_num: u32, part_num: u32) -> Result { @@ -131,7 +133,7 @@ impl LiveObjectSetWriterV1 { let mut f = File::create(ref_tmp_path.clone())?; f.rewind()?; let mut metab = [0u8; MAGIC_BYTES]; - BigEndian::write_u32(&mut metab, REFERENCE_FILE_MAGIC); + BigEndian::write_u32(&mut metab, REFERENCE_FILE_MAGIC_V2); let n = f.write(&metab)?; drop(f); fs::rename(ref_tmp_path, ref_path.clone())?; @@ -234,23 +236,31 @@ impl LiveObjectSetWriterV1 { Ok(()) } - /// Writes an object reference to the reference file. - fn write_object_ref(&mut self, object_ref: &ObjectRef) -> Result<()> { - let mut buf = [0u8; OBJECT_REF_BYTES]; - buf[0..ObjectID::LENGTH].copy_from_slice(object_ref.object_id.as_ref()); - BigEndian::write_u64( - &mut buf[ObjectID::LENGTH..OBJECT_REF_BYTES], - object_ref.version.as_u64(), - ); - buf[ObjectID::LENGTH + SEQUENCE_NUM_BYTES..OBJECT_REF_BYTES] - .copy_from_slice(object_ref.digest.as_ref()); + /// Writes a V2 object reference record to the reference file: + /// `ObjectID(32) | SequenceNumber(8 BE) | ObjectDigest(32) | + /// PrevTxCheckpoint(8 BE)`. 
+ fn write_object_ref( + &mut self, + object_ref: &ObjectRef, + previous_transaction_checkpoint: CheckpointSequenceNumber, + ) -> Result<()> { + let mut buf = [0u8; OBJECT_REF_BYTES_V2]; + let id_end = ObjectID::LENGTH; + let seq_end = id_end + SEQUENCE_NUM_BYTES; + let digest_end = seq_end + OBJECT_DIGEST_BYTES; + buf[0..id_end].copy_from_slice(object_ref.object_id.as_ref()); + BigEndian::write_u64(&mut buf[id_end..seq_end], object_ref.version.as_u64()); + buf[seq_end..digest_end].copy_from_slice(object_ref.digest.as_ref()); + BigEndian::write_u64(&mut buf[digest_end..], previous_transaction_checkpoint); self.ref_wbuf.write_all(&buf)?; Ok(()) } } -/// StateSnapshotWriterV1 writes snapshot files to a local staging dir and -/// simultaneously uploads them to a remote object store +/// Writes snapshot files to a local staging dir and simultaneously uploads +/// them to a remote object store. The `V1` suffix refers to the +/// orchestration-layer revision of this struct (its public API surface), +/// not the snapshot wire format — this writer emits V2 snapshots. 
pub struct StateSnapshotWriterV1 { local_staging_dir: PathBuf, file_compression: FileCompression, @@ -305,9 +315,10 @@ impl StateSnapshotWriterV1 { self, epoch: u64, perpetual_db: Arc, + checkpoint_store: Arc, root_state_hash: ECMHLiveObjectSetDigest, ) -> Result<()> { - self.write_internal(epoch, perpetual_db, root_state_hash) + self.write_internal(epoch, perpetual_db, checkpoint_store, root_state_hash) .await } @@ -317,6 +328,7 @@ impl StateSnapshotWriterV1 { mut self, epoch: u64, perpetual_db: Arc, + checkpoint_store: Arc, root_state_hash: ECMHLiveObjectSetDigest, ) -> Result<()> { self.setup_epoch_dir(epoch).await?; @@ -333,6 +345,7 @@ impl StateSnapshotWriterV1 { self.write_live_object_set( epoch, perpetual_db, + checkpoint_store, sender, Self::bucket_func, root_state_hash, @@ -406,12 +419,13 @@ impl StateSnapshotWriterV1 { } /// Writes the provided live object set in the form of reference files, - /// object files, and MANIFEST. These files are stored in the local - /// staging directory and the FileMetadata is sent to the channel. + /// object files, EPOCH_INFO file, and MANIFEST. These files are staged + /// locally and their `FileMetadata` is sent to the upload channel. 
fn write_live_object_set( &mut self, epoch: u64, perpetual_db: Arc, + checkpoint_store: Arc, sender: Sender, bucket_func: F, root_state_hash: ECMHLiveObjectSetDigest, @@ -423,12 +437,12 @@ impl StateSnapshotWriterV1 { let local_staging_dir_path = path_to_filesystem(self.local_staging_dir.clone(), &self.epoch_dir(epoch))?; let mut acc = GlobalStateHash::default(); - for object in perpetual_db.iter_live_object_set() { - GlobalStateHasher::accumulate_live_object(&mut acc, &object); - let bucket_num = bucket_func(&object); + for entry in perpetual_db.iter_live_object_set_v2() { + GlobalStateHasher::accumulate_live_object(&mut acc, &entry.live); + let bucket_num = bucket_func(&entry.live); // Creates a new LiveObjectSetWriterV1 for the bucket if it does not exist - if let Vacant(entry) = object_writers.entry(bucket_num) { - entry.insert(LiveObjectSetWriterV1::new( + if let Vacant(slot) = object_writers.entry(bucket_num) { + slot.insert(LiveObjectSetWriterV1::new( local_staging_dir_path.clone(), bucket_num, self.file_compression, @@ -438,7 +452,7 @@ impl StateSnapshotWriterV1 { let writer = object_writers .get_mut(&bucket_num) .context("Unexpected missing bucket writer")?; - writer.write(&object)?; + writer.write(&entry)?; } assert_eq!( ECMHLiveObjectSetDigest::from(acc.digest()), @@ -451,18 +465,81 @@ impl StateSnapshotWriterV1 { for (_, writer) in object_writers.into_iter() { files.extend(writer.done()?); } + // Emit the EPOCH_INFO file alongside the bucket files. It must go through + // the same upload channel as `.obj`/`.ref` files so the existing + // upload-MANIFEST-last invariant continues to imply all referenced + // files are present. 
+ let epoch_info_metadata = + self.write_epoch_info(epoch, &local_staging_dir_path, &checkpoint_store, &sender)?; + files.push(epoch_info_metadata); // Write the manifest file for the epoch(bucket) self.write_manifest(epoch, files)?; Ok(()) } + /// Writes the per-snapshot `EPOCH_INFO` file, which carries one entry per + /// epoch in `[0, epoch]` from `CheckpointStore::epoch_info`. Epochs with + /// no row in the source table are emitted as `None` so consumers can + /// distinguish a missing entry from a present one. + /// + /// File layout: 4-byte magic | bcs(EpochInfo). Integrity is anchored by + /// `FileMetadata::sha3_digest` recorded in the MANIFEST (matching how + /// `.obj`/`.ref` files are validated); no in-file sha3 trailer is + /// written. + fn write_epoch_info( + &self, + epoch: u64, + local_staging_dir_path: &std::path::Path, + checkpoint_store: &CheckpointStore, + sender: &Sender, + ) -> Result { + let mut entries = Vec::with_capacity((epoch + 1) as usize); + // O(epochs) point lookups. Cheap relative to writing the live-object + // set (millions of rows) and to the snapshot upload, so the simple + // loop is fine; a range scan would be a micro-optimization. + for epoch_id in 0..=epoch { + let entry = checkpoint_store.get_epoch_info(epoch_id)?; + // Turn a silent miswrite (entry stored under the wrong epoch + // key) into a loud panic at snapshot time. 
+ if let Some(e) = entry.as_ref() { + assert_eq!( + e.last_checkpoint_summary.epoch(), + epoch_id, + "epoch_info[{epoch_id}] is populated with an entry for epoch {}; \ + the snapshot would silently misattribute checkpoints", + e.last_checkpoint_summary.epoch(), + ); + } + entries.push(entry); + } + let epoch_info = EpochInfo::V1(EpochInfoV1 { entries }); + let serialized = bcs::to_bytes(&epoch_info)?; + + let file_path = local_staging_dir_path.join("EPOCH_INFO"); + let mut metab = [0u8; MAGIC_BYTES]; + BigEndian::write_u32(&mut metab, EPOCH_INFO_FILE_MAGIC); + + let mut f = File::create(&file_path)?; + f.write_all(&metab)?; + f.write_all(&serialized)?; + f.sync_data()?; + drop(f); + + // Use bucket_num/part_num 0; EPOCH_INFO is a singleton per snapshot + // and the filename does not include them. + let file_metadata = + create_file_metadata(&file_path, self.file_compression, FileType::EpochInfo, 0, 0)?; + sender.blocking_send(file_metadata.clone())?; + Ok(file_metadata) + } + /// Writes the manifest file for the provided FileMetadata of an epoch and /// its sha3 checksum. 
fn write_manifest(&mut self, epoch: u64, file_metadata: Vec) -> Result<()> { let (f, manifest_file_path) = self.manifest_file(epoch)?; let mut wbuf = BufWriter::new(f); - let manifest: Manifest = Manifest::V1(ManifestV1 { - snapshot_version: 1, + let manifest: Manifest = Manifest::V2(ManifestBody { + snapshot_version: 2, address_length: ObjectID::LENGTH as u64, file_metadata, epoch, diff --git a/crates/iota-surfer/src/surfer_task.rs b/crates/iota-surfer/src/surfer_task.rs index 213a19693bd..29089222c66 100644 --- a/crates/iota-surfer/src/surfer_task.rs +++ b/crates/iota-surfer/src/surfer_task.rs @@ -55,49 +55,40 @@ impl SurferTask { .collect() }); for obj in all_live_objects { - match obj { - LiveObject::Normal(obj) => { - if let Some(struct_tag) = obj.struct_tag() { - let obj_ref = obj.compute_object_reference(); - match obj.owner { - Owner::Immutable => { - immutable_objects - .write() - .await - .entry(struct_tag) - .or_default() - .push(obj_ref); - } - Owner::Shared(initial_shared_version) => { - shared_objects - .write() - .await - .entry(struct_tag) - .or_default() - .push((obj_ref.object_id, initial_shared_version)); - } - Owner::Address(address) => { - if let Some((gas_object, owned_objects)) = - accounts.get_mut(&address) - { - if obj.is_gas_coin() && gas_object.is_none() { - gas_object.replace(obj_ref); - } else { - owned_objects - .entry(struct_tag) - .or_default() - .insert(obj_ref); - } - } + let LiveObject::Normal(obj) = obj; + if let Some(struct_tag) = obj.struct_tag() { + let obj_ref = obj.compute_object_reference(); + match obj.owner { + Owner::Immutable => { + immutable_objects + .write() + .await + .entry(struct_tag) + .or_default() + .push(obj_ref); + } + Owner::Shared(initial_shared_version) => { + shared_objects + .write() + .await + .entry(struct_tag) + .or_default() + .push((obj_ref.object_id, initial_shared_version)); + } + Owner::Address(address) => { + if let Some((gas_object, owned_objects)) = accounts.get_mut(&address) { + if 
obj.is_gas_coin() && gas_object.is_none() { + gas_object.replace(obj_ref); + } else { + owned_objects.entry(struct_tag).or_default().insert(obj_ref); } - Owner::Object(_) => (), - _ => unimplemented!( - "a new enum variant was added and needs to be handled" - ), } } + Owner::Object(_) => (), + _ => { + unimplemented!("a new enum variant was added and needs to be handled") + } } - LiveObject::Wrapped(_) => unreachable!("Explicitly skipped wrapped objects"), } } let entry_functions = Arc::new(RwLock::new(vec![])); diff --git a/crates/iota-types/src/epoch_info.rs b/crates/iota-types/src/epoch_info.rs new file mode 100644 index 00000000000..055c19e7ca6 --- /dev/null +++ b/crates/iota-types/src/epoch_info.rs @@ -0,0 +1,124 @@ +// Copyright (c) 2026 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use serde::{Deserialize, Serialize}; + +use crate::{ + effects::TransactionEvents, + messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSequenceNumber}, +}; + +/// Per-epoch metadata sufficient to rebuild a per-epoch summary table +/// without reading historical checkpoint contents. +/// +/// Stored as the value type of the `epoch_info` table on `CheckpointStore`, +/// alongside `epoch_last_checkpoint_map`. The table is populated incrementally +/// at each AdvanceEpoch transaction by the checkpoint executor and read by +/// the snapshot V2 writer to produce the snapshot's `EPOCH_INFO` file. +/// +/// Wire-format stability: this struct is BCS-encoded both in RocksDB (as the +/// value of `epoch_info`) and on the snapshot wire (embedded inside +/// `EpochInfo::V1`). Adding, removing, or reordering fields would corrupt +/// every existing on-disk row AND change the on-wire layout under +/// `EpochInfo::V1`. Any schema change therefore requires bumping +/// `EpochInfo::V2` AND providing a separate `EpochInfoEntryV2`, mirroring +/// the `StoreObjectV1`/`StoreObjectV2` migration pattern. 
The +/// `epoch_info_entry_field_order_is_locked` test below locks the BCS +/// field order against silent reordering. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct EpochInfoEntry { + /// The certified summary of the last checkpoint of this epoch. Carries + /// `end_of_epoch_data` (committee transition, next protocol version, epoch + /// commitments, supply change), the gas summary, the timestamp, and quorum + /// signatures so consumers can verify the entry against the prior epoch's + /// committee. + pub last_checkpoint_summary: CertifiedCheckpointSummary, + + /// First checkpoint sequence number of this epoch. For the genesis epoch + /// this is `0`; for later epochs it equals the previous entry's + /// `last_checkpoint_summary.sequence_number + 1`. + pub first_checkpoint: CheckpointSequenceNumber, + + /// Raw events emitted by the AdvanceEpoch transaction (the last + /// transaction of the epoch). Carries the `SystemEpochInfoEvent` from + /// which storage charges/rebates, fees, mint/burn amounts, and stake + /// rewards can be extracted. Stored as raw events so consumers can + /// pick the fields they need rather than committing to a fixed + /// projection here. 
+ pub end_of_epoch_tx_events: TransactionEvents, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + crypto::AuthorityStrongQuorumSignInfo, gas::GasCostSummary, message_envelope::Envelope, + messages_checkpoint::CheckpointSummary, + }; + + fn empty_checkpoint_summary() -> CheckpointSummary { + CheckpointSummary { + epoch: 0, + sequence_number: 0, + network_total_transactions: 0, + content_digest: Default::default(), + previous_digest: None, + epoch_rolling_gas_cost_summary: GasCostSummary::default(), + end_of_epoch_data: None, + timestamp_ms: 0, + version_specific_data: Vec::new(), + checkpoint_commitments: Vec::new(), + } + } + + fn empty_certified_summary() -> CertifiedCheckpointSummary { + let sig = AuthorityStrongQuorumSignInfo { + epoch: 0, + signature: Default::default(), + signers_map: Default::default(), + }; + Envelope::new_from_data_and_sig(empty_checkpoint_summary(), sig) + } + + /// Locks the BCS field order of `EpochInfoEntry` against silent + /// reordering. BCS encodes struct fields in declaration order, so + /// swapping any two fields would silently corrupt every on-disk row + /// in the `epoch_info` column family AND change the on-wire layout + /// under `EpochInfo::V1` in the snapshot. If a deliberate schema + /// change is required, follow the versioning recipe in the doc comment + /// on `EpochInfoEntry` (introduce `EpochInfoEntryV2` rather than + /// mutating this type). + /// + /// Asserts that `bcs(entry)` equals the concatenation + /// `bcs(last_checkpoint_summary) ++ first_checkpoint.to_le_bytes() + /// ++ bcs(end_of_epoch_tx_events)`. This both verifies the relative + /// order of the three fields and detects any encoding-shape change + /// in the inner types. + #[test] + fn epoch_info_entry_field_order_is_locked() { + let entry = EpochInfoEntry { + last_checkpoint_summary: empty_certified_summary(), + // Distinct, recognizable u64 — easy to spot in a hex dump if + // this assertion ever needs to be debugged. 
+ first_checkpoint: 0xDEAD_BEEF_CAFE_F00D, + end_of_epoch_tx_events: TransactionEvents::default(), + }; + + let entry_bytes = bcs::to_bytes(&entry).expect("entry serialization"); + let summary_bytes = + bcs::to_bytes(&entry.last_checkpoint_summary).expect("summary serialization"); + let events_bytes = + bcs::to_bytes(&entry.end_of_epoch_tx_events).expect("events serialization"); + + let mut expected = Vec::with_capacity(entry_bytes.len()); + expected.extend_from_slice(&summary_bytes); + expected.extend_from_slice(&entry.first_checkpoint.to_le_bytes()); + expected.extend_from_slice(&events_bytes); + + assert_eq!( + entry_bytes, expected, + "EpochInfoEntry BCS layout changed; introduce EpochInfoEntryV2 \ + rather than mutating this type" + ); + } +} diff --git a/crates/iota-types/src/lib.rs b/crates/iota-types/src/lib.rs index 5f8702daf88..7ee045d65cd 100644 --- a/crates/iota-types/src/lib.rs +++ b/crates/iota-types/src/lib.rs @@ -46,6 +46,7 @@ pub mod display; pub mod dynamic_field; pub mod effects; pub mod epoch_data; +pub mod epoch_info; pub mod event; pub mod executable_transaction; pub mod execution;