Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 71 additions & 64 deletions crates/iota-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ use crate::{
AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
},
authority_store_tables::TotalIotaSupplyCheck,
authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object},
authority_store_types::{
SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, StoreObject, StoreObjectWrapper,
get_store_object,
},
epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
},
global_state_hasher::GlobalStateHashStore,
Expand Down Expand Up @@ -707,8 +710,10 @@ impl AuthorityStore {
fn insert_object_direct(&self, object_ref: ObjectRef, object: &Object) -> IotaResult {
let mut write_batch = self.perpetual_tables.objects.batch();

// Insert object
let store_object = get_store_object(object.clone());
// Genesis objects are produced by the genesis checkpoint (sequence 0),
// so 0 is the real `previous_transaction_checkpoint` value here, not a
// sentinel.
let store_object = get_store_object(object.clone(), 0);
write_batch.insert_batch(
&self.perpetual_tables.objects,
std::iter::once((ObjectKey::from(object_ref), store_object)),
Expand Down Expand Up @@ -737,11 +742,12 @@ impl AuthorityStore {
.map(|o| (o.compute_object_reference(), o))
.collect();

// Genesis objects are produced by the genesis checkpoint (sequence 0).
batch.insert_batch(
&self.perpetual_tables.objects,
ref_and_objects
.iter()
.map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone()))),
.map(|(oref, o)| (ObjectKey::from(oref), get_store_object((*o).clone(), 0))),
)?;

let non_child_object_refs: Vec<_> = ref_and_objects
Expand All @@ -766,33 +772,27 @@ impl AuthorityStore {
let mut batch = perpetual_db.objects.batch();
for object in live_objects {
hasher.update(object.object_reference().digest.inner());
match object {
LiveObject::Normal(object) => {
let store_object_wrapper = get_store_object(object.clone());
batch.insert_batch(
&perpetual_db.objects,
std::iter::once((
ObjectKey::from(object.compute_object_reference()),
store_object_wrapper,
)),
)?;
if !object.is_child_object() {
Self::initialize_live_object_markers(
&perpetual_db.live_owned_object_markers,
&mut batch,
&[object.compute_object_reference()],
)?;
}
}
LiveObject::Wrapped(object_key) => {
batch.insert_batch(
&perpetual_db.objects,
std::iter::once::<(ObjectKey, StoreObjectWrapper)>((
object_key,
StoreObject::Wrapped.into(),
)),
)?;
}
let LiveObject::Normal(object) = object;
// V2 ref records carry an 8-byte `previous_transaction_checkpoint`
// trailer, but `ObjectRefIter` reads past it without surfacing the
// value, so stamp the sentinel here.
// TODO(snapshot-v2-backfill): rewrite these rows once the iterator
// surfaces the trailer or a backfill is run.
let store_object_wrapper =
get_store_object(object.clone(), SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT);
batch.insert_batch(
&perpetual_db.objects,
std::iter::once((
ObjectKey::from(object.compute_object_reference()),
store_object_wrapper,
)),
)?;
if !object.is_child_object() {
Self::initialize_live_object_markers(
&perpetual_db.live_owned_object_markers,
&mut batch,
&[object.compute_object_reference()],
)?;
}
}
let sha3_digest = hasher.finalize().digest;
Expand Down Expand Up @@ -825,10 +825,18 @@ impl AuthorityStore {
/// Internally it checks that all locks for active inputs are at the correct
/// version, and then writes objects, certificates, parents and clean up
/// locks atomically.
///
/// `checkpoint_seq` is stamped onto each newly written object's
/// `previous_transaction_checkpoint` field. Callers that buffer execution
/// outputs until checkpoint commit time (e.g. `WritebackCache`) pass the
/// containing checkpoint's sequence number; callers that flush at
/// execution time before the containing checkpoint is known (e.g.
/// `PassthroughCache`) pass `SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT`.
#[instrument(level = "debug", skip_all)]
pub fn build_db_batch(
&self,
epoch_id: EpochId,
checkpoint_seq: CheckpointSequenceNumber,
tx_outputs: &[Arc<TransactionOutputs>],
) -> IotaResult<DBBatch> {
let mut written = Vec::with_capacity(tx_outputs.len());
Expand All @@ -838,7 +846,12 @@ impl AuthorityStore {

let mut write_batch = self.perpetual_tables.transactions.batch();
for outputs in tx_outputs {
self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
self.write_one_transaction_outputs(
&mut write_batch,
epoch_id,
checkpoint_seq,
outputs,
)?;
}
// test crashing before writing the batch
fail_point!("crash");
Expand All @@ -861,6 +874,7 @@ impl AuthorityStore {
&self,
write_batch: &mut DBBatch,
epoch_id: EpochId,
checkpoint_seq: CheckpointSequenceNumber,
tx_outputs: &TransactionOutputs,
) -> IotaResult {
let TransactionOutputs {
Expand Down Expand Up @@ -906,7 +920,7 @@ impl AuthorityStore {
let new_objects = written.iter().map(|(id, new_object)| {
let version = new_object.version();
trace!(?id, ?version, "writing object");
let store_object = get_store_object(new_object.clone());
let store_object = get_store_object(new_object.clone(), checkpoint_seq);
(ObjectKey(*id, version), store_object)
});

Expand Down Expand Up @@ -1465,38 +1479,31 @@ impl AuthorityStore {
let (mut total_iota, mut total_storage_rebate) = thread::scope(|s| {
let pending_tasks = FuturesUnordered::new();
for o in self.iter_live_object_set() {
match o {
LiveObject::Normal(object) => {
size += object.object_size_for_gas_metering();
count += 1;
pending_objects.push(object);
if count % 1_000_000 == 0 {
let mut task_objects = vec![];
mem::swap(&mut pending_objects, &mut task_objects);
pending_tasks.push(s.spawn(move || {
let mut layout_resolver =
executor.type_layout_resolver(Box::new(type_layout_store));
let mut total_storage_rebate = 0;
let mut total_iota = 0;
for object in task_objects {
total_storage_rebate += object.storage_rebate;
// get_total_iota includes storage rebate, however all storage
// rebate is also stored in
// the storage fund, so we need to subtract it here.
total_iota +=
object.get_total_iota(layout_resolver.as_mut()).unwrap()
- object.storage_rebate;
}
if count % 50_000_000 == 0 {
info!("Processed {} objects", count);
}
(total_iota, total_storage_rebate)
}));
let LiveObject::Normal(object) = o;
size += object.object_size_for_gas_metering();
count += 1;
pending_objects.push(object);
if count % 1_000_000 == 0 {
let mut task_objects = vec![];
mem::swap(&mut pending_objects, &mut task_objects);
pending_tasks.push(s.spawn(move || {
let mut layout_resolver =
executor.type_layout_resolver(Box::new(type_layout_store));
let mut total_storage_rebate = 0;
let mut total_iota = 0;
for object in task_objects {
total_storage_rebate += object.storage_rebate;
// get_total_iota includes storage rebate, however all storage
// rebate is also stored in
// the storage fund, so we need to subtract it here.
total_iota += object.get_total_iota(layout_resolver.as_mut()).unwrap()
- object.storage_rebate;
}
}
LiveObject::Wrapped(_) => {
unreachable!("Explicitly asked to not include wrapped tombstones")
}
if count % 50_000_000 == 0 {
info!("Processed {} objects", count);
}
(total_iota, total_storage_rebate)
}));
}
}
pending_tasks.into_iter().fold((0, 0), |init, result| {
Expand Down
17 changes: 13 additions & 4 deletions crates/iota-core/src/authority/authority_store_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,10 @@ mod tests {
use crate::authority::{
authority_store_pruner::AuthorityStorePruningMetrics,
authority_store_tables::AuthorityPerpetualTables,
authority_store_types::{StoreObject, StoreObjectWrapper, get_store_object},
authority_store_types::{
SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT, StoreObject, StoreObjectWrapper,
get_store_object,
},
};

fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
Expand Down Expand Up @@ -1008,7 +1011,10 @@ mod tests {
} else {
to_delete.push(object_key);
}
let obj = get_store_object(Object::immutable_with_id_for_testing(id));
let obj = get_store_object(
Object::immutable_with_id_for_testing(id),
SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT,
);
batch.insert_batch(
&db.objects,
[(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
Expand All @@ -1021,7 +1027,7 @@ mod tests {
println!("Adding tombstone object {tombstone_key:?}");
batch.insert_batch(
&db.objects,
[(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
[(tombstone_key, StoreObjectWrapper::V2(StoreObject::Deleted))],
)?;
tombstones.push(tombstone_key);
}
Expand Down Expand Up @@ -1124,7 +1130,10 @@ mod tests {
if i < num_versions_per_object - 2 {
to_delete.push((id, SequenceNumber::from(i)));
}
let obj = get_store_object(Object::immutable_with_id_for_testing(id));
let obj = get_store_object(
Object::immutable_with_id_for_testing(id),
SENTINEL_PREVIOUS_TRANSACTION_CHECKPOINT,
);
perpetual_db
.objects
.insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
Expand Down
Loading
Loading