Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

serialize incremental_snapshot_hash #26839

Merged
merged 2 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/accounts_hash_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl AccountsHashVerifier {
accounts_package.snapshot_links.path(),
accounts_package.slot,
&accounts_hash,
None,
);
datapoint_info!(
"accounts_hash_verifier",
Expand Down
3 changes: 3 additions & 0 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ fn run_bank_forks_snapshot_n<F>(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&last_bank.get_accounts_hash(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, last_bank.get_accounts_hash());
snapshot_utils::archive_snapshot_package(
Expand Down Expand Up @@ -491,6 +492,7 @@ fn test_concurrent_snapshot_packaging(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&Hash::default(),
None,
);
let snapshot_package = SnapshotPackage::new(accounts_package, Hash::default());
pending_snapshot_package
Expand Down Expand Up @@ -534,6 +536,7 @@ fn test_concurrent_snapshot_packaging(
saved_snapshots_dir.path(),
saved_slot,
&Hash::default(),
None,
);

snapshot_utils::verify_snapshot_archive(
Expand Down
26 changes: 26 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,25 @@ impl RentDebit {
}
}

/// Accounts hash and capitalization data persisted alongside an incremental snapshot.
///
/// Incremental snapshots only calculate their accounts hash based on the account changes WITHIN
/// the incremental slot range. So, we need to keep track of the full snapshot's expected accounts
/// hash results. We also need to keep track of the hash and capitalization specific to the
/// incremental snapshot slot range.
///
/// The capitalization we calculate for the incremental slot range will NOT be consistent with the
/// bank's capitalization. It is not feasible to calculate a capitalization delta that is correct
/// given just the incremental slots' account data and the full snapshot's capitalization.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize` and `AbiExample`, so field order and
/// types are presumably part of the serialized snapshot format — confirm before reordering or
/// changing any field.
#[derive(Serialize, Deserialize, AbiExample, Clone, Debug, Default, PartialEq, Eq)]
pub struct BankIncrementalSnapshotPersistence {
/// slot of the full snapshot
pub full_slot: Slot,
/// accounts hash from the full snapshot
pub full_hash: Hash,
/// capitalization from the full snapshot
pub full_capitalization: u64,
/// hash of the accounts in the incremental snapshot slot range, including zero-lamport accounts
pub incremental_hash: Hash,
/// capitalization of the accounts in the incremental snapshot slot range
pub incremental_capitalization: u64,
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RentDebits(HashMap<Pubkey, RentDebit>);
impl RentDebits {
Expand Down Expand Up @@ -976,6 +995,7 @@ pub struct BankFieldsToDeserialize {
pub(crate) epoch_stakes: HashMap<Epoch, EpochStakes>,
pub(crate) is_delta: bool,
pub(crate) accounts_data_len: u64,
pub(crate) incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
}

// Bank's common fields shared by all supported snapshot versions for serialization.
Expand Down Expand Up @@ -1083,6 +1103,7 @@ impl PartialEq for Bank {
accounts_data_size_delta_on_chain: _,
accounts_data_size_delta_off_chain: _,
fee_structure: _,
incremental_snapshot_persistence: _,
// Ignore new fields explicitly if they do not impact PartialEq.
// Adding ".." will remove compile-time checks that if a new field
// is added to the struct, this PartialEq is accordingly updated.
Expand Down Expand Up @@ -1336,6 +1357,8 @@ pub struct Bank {

/// Transaction fee structure
pub fee_structure: FeeStructure,

pub incremental_snapshot_persistence: Option<BankIncrementalSnapshotPersistence>,
}

struct VoteWithStakeDelegations {
Expand Down Expand Up @@ -1466,6 +1489,7 @@ impl Bank {

fn default_with_accounts(accounts: Accounts) -> Self {
let mut bank = Self {
incremental_snapshot_persistence: None,
rewrites_skipped_this_slot: Rewrites::default(),
rc: BankRc::new(accounts, Slot::default()),
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
Expand Down Expand Up @@ -1765,6 +1789,7 @@ impl Bank {

let accounts_data_size_initial = parent.load_accounts_data_size();
let mut new = Bank {
incremental_snapshot_persistence: None,
rewrites_skipped_this_slot: Rewrites::default(),
rc,
status_cache,
Expand Down Expand Up @@ -2126,6 +2151,7 @@ impl Bank {
}
let feature_set = new();
let mut bank = Self {
incremental_snapshot_persistence: fields.incremental_snapshot_persistence,
rewrites_skipped_this_slot: Rewrites::default(),
rc: bank_rc,
status_cache: new(),
Expand Down
14 changes: 12 additions & 2 deletions runtime/src/serde_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
append_vec::{AppendVec, StoredMetaWriteVersion},
bank::{Bank, BankFieldsToDeserialize, BankRc},
bank::{Bank, BankFieldsToDeserialize, BankIncrementalSnapshotPersistence, BankRc},
blockhash_queue::BlockhashQueue,
builtins::Builtins,
epoch_stakes::EpochStakes,
Expand Down Expand Up @@ -77,6 +77,7 @@ pub struct AccountsDbFields<T>(
/// slots that were roots within the last epoch for which we care about the hash value
#[serde(deserialize_with = "default_on_eof")]
Vec<(Slot, Hash)>,
// here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there's a better spot. Maybe adding a new struct for non-Bank and non-AccountsDb snapshot fields?

Would that break anything? Would the two vectors of roots need to move as well for that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That might be a better approach, having a struct for optional EOF fields. We also have to make sure we still keep the dependency when serializing, i.e. if we serialize optional field 4 we must also serialize optional fields 1-3 otherwise we'll have eof deserialization errors like before.

I think anything that is default_on_eof should be moved into there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AshwinSekar will investigate this afterwards.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issue to track #27202

);

/// Helper type to wrap BufReader streams when deserializing and reconstructing from either just a
Expand Down Expand Up @@ -193,6 +194,7 @@ trait TypeContext<'a>: PartialEq {
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
Expand Down Expand Up @@ -370,12 +372,18 @@ fn reserialize_bank_fields_with_new_hash<W, R>(
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> Result<(), Error>
where
W: Write,
R: Read,
{
newer::Context::reserialize_bank_fields_with_hash(stream_reader, stream_writer, accounts_hash)
newer::Context::reserialize_bank_fields_with_hash(
stream_reader,
stream_writer,
accounts_hash,
incremental_snapshot_persistence,
)
}

/// effectively updates the accounts hash in the serialized bank file on disk
Expand All @@ -387,6 +395,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
bank_snapshots_dir: impl AsRef<Path>,
slot: Slot,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> bool {
let bank_post = snapshot_utils::get_bank_snapshots_dir(bank_snapshots_dir, slot);
let bank_post = bank_post.join(snapshot_utils::get_snapshot_file_name(slot));
Expand All @@ -404,6 +413,7 @@ pub fn reserialize_bank_with_new_accounts_hash(
&mut BufReader::new(file),
&mut BufWriter::new(file_out),
accounts_hash,
incremental_snapshot_persistence,
)
.unwrap();
}
Expand Down
17 changes: 15 additions & 2 deletions runtime/src/serde_snapshot/newer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl From<DeserializableVersionedBank> for BankFieldsToDeserialize {
stakes: dvb.stakes,
epoch_stakes: dvb.epoch_stakes,
is_delta: dvb.is_delta,
incremental_snapshot_persistence: None,
}
}
}
Expand Down Expand Up @@ -209,6 +210,7 @@ impl<'a> TypeContext<'a> for Context {
// we can grab it on restart.
// TODO: if we do a snapshot version bump, consider moving this out.
lamports_per_signature,
None::<BankIncrementalSnapshotPersistence>,
)
.serialize(serializer)
}
Expand Down Expand Up @@ -314,6 +316,10 @@ impl<'a> TypeContext<'a> for Context {
bank_fields.fee_rate_governor = bank_fields
.fee_rate_governor
.clone_with_lamports_per_signature(lamports_per_signature);

let incremental_snapshot_persistence = ignore_eof_error(deserialize_from(stream))?;
bank_fields.incremental_snapshot_persistence = incremental_snapshot_persistence;

Ok((bank_fields, accounts_db_fields))
}

Expand All @@ -327,12 +333,13 @@ impl<'a> TypeContext<'a> for Context {
}

/// deserialize the bank from 'stream_reader'
/// modify the accounts_hash
/// modify the accounts_hash and incremental_snapshot_persistence
/// reserialize the bank to 'stream_writer'
fn reserialize_bank_fields_with_hash<R, W>(
stream_reader: &mut BufReader<R>,
stream_writer: &mut BufWriter<W>,
accounts_hash: &Hash,
incremental_snapshot_persistence: Option<&BankIncrementalSnapshotPersistence>,
) -> std::result::Result<(), Box<bincode::ErrorKind>>
where
R: Read,
Expand All @@ -345,6 +352,7 @@ impl<'a> TypeContext<'a> for Context {
let blockhash_queue = RwLock::new(rhs.blockhash_queue.clone());
let hard_forks = RwLock::new(rhs.hard_forks.clone());
let lamports_per_signature = rhs.fee_rate_governor.lamports_per_signature;

let bank = SerializableVersionedBank {
blockhash_queue: &blockhash_queue,
ancestors: &rhs.ancestors,
Expand Down Expand Up @@ -382,7 +390,12 @@ impl<'a> TypeContext<'a> for Context {

bincode::serialize_into(
stream_writer,
&(bank, accounts_db_fields, lamports_per_signature),
&(
bank,
accounts_db_fields,
lamports_per_signature,
incremental_snapshot_persistence,
),
)
}
}
54 changes: 42 additions & 12 deletions runtime/src/serde_snapshot/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ fn test_bank_serialize_style(
serde_style: SerdeStyle,
reserialize_accounts_hash: bool,
update_accounts_hash: bool,
incremental_snapshot_persistence: bool,
) {
solana_logger::setup();
let (genesis_config, _) = create_genesis_config(500);
Expand Down Expand Up @@ -236,8 +237,18 @@ fn test_bank_serialize_style(
} else {
bank2.get_accounts_hash()
};
if reserialize_accounts_hash {
let slot = bank2.slot();

let slot = bank2.slot();
let incremental =
incremental_snapshot_persistence.then(|| BankIncrementalSnapshotPersistence {
full_slot: slot + 1,
full_hash: Hash::new(&[1; 32]),
full_capitalization: 31,
incremental_hash: Hash::new(&[2; 32]),
incremental_capitalization: 32,
});

if reserialize_accounts_hash || incremental_snapshot_persistence {
let temp_dir = TempDir::new().unwrap();
let slot_dir = temp_dir.path().join(slot.to_string());
let post_path = slot_dir.join(slot.to_string());
Expand All @@ -248,21 +259,32 @@ fn test_bank_serialize_style(
let mut f = std::fs::File::create(&pre_path).unwrap();
f.write_all(&buf).unwrap();
}

assert!(reserialize_bank_with_new_accounts_hash(
temp_dir.path(),
slot,
&accounts_hash
&accounts_hash,
incremental.as_ref(),
));
let previous_len = buf.len();
// larger buffer than expected to make sure the file isn't larger than expected
let mut buf_reserialized = vec![0; previous_len + 1];
let sizeof_none = std::mem::size_of::<u64>();
let sizeof_incremental_snapshot_persistence =
std::mem::size_of::<Option<BankIncrementalSnapshotPersistence>>();
let mut buf_reserialized =
vec![0; previous_len + sizeof_incremental_snapshot_persistence + 1];
{
let mut f = std::fs::File::open(post_path).unwrap();
let size = f.read(&mut buf_reserialized).unwrap();
assert_eq!(size, previous_len);
let expected = if !incremental_snapshot_persistence {
previous_len
} else {
previous_len + sizeof_incremental_snapshot_persistence - sizeof_none
};
assert_eq!(size, expected);
buf_reserialized.truncate(size);
}
if update_accounts_hash {
if update_accounts_hash || incremental_snapshot_persistence {
// We cannot guarantee buffer contents are exactly the same if hash is the same.
// Things like hashsets/maps have randomness in their in-mem representations.
// This makes the serialized bytes non-deterministic.
Expand Down Expand Up @@ -311,6 +333,7 @@ fn test_bank_serialize_style(
assert_eq!(dbank.get_balance(&key3.pubkey()), 0);
assert_eq!(dbank.get_accounts_hash(), accounts_hash);
assert!(bank2 == dbank);
assert_eq!(dbank.incremental_snapshot_persistence, incremental);
}

pub(crate) fn reconstruct_accounts_db_via_serialization(
Expand Down Expand Up @@ -359,11 +382,18 @@ fn test_bank_serialize_newer() {
for (reserialize_accounts_hash, update_accounts_hash) in
[(false, false), (true, false), (true, true)]
{
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
)
for incremental_snapshot_persistence in if reserialize_accounts_hash {
[false, true].to_vec()
} else {
[false].to_vec()
} {
test_bank_serialize_style(
SerdeStyle::Newer,
reserialize_accounts_hash,
update_accounts_hash,
incremental_snapshot_persistence,
)
}
}
}

Expand Down Expand Up @@ -555,7 +585,7 @@ mod test_bank_serialize {

// This somewhat long test harness is required to freeze the ABI of
// Bank's serialization due to its versioned nature
#[frozen_abi(digest = "9vGBt7YfymKUTPWLHVVpQbDtPD7dFDwXRMFkCzwujNqJ")]
#[frozen_abi(digest = "5py4Wkuj5fV2sLyA1MrPg4pGNwMEaygQLnpLyY8MMLGC")]
#[derive(Serialize, AbiExample)]
pub struct BankAbiTestWrapperNewer {
#[serde(serialize_with = "wrapper_newer")]
Expand Down
2 changes: 2 additions & 0 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2043,6 +2043,7 @@ pub fn package_and_archive_full_snapshot(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&bank.get_accounts_hash(),
None,
);

let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());
Expand Down Expand Up @@ -2095,6 +2096,7 @@ pub fn package_and_archive_incremental_snapshot(
accounts_package.snapshot_links.path(),
accounts_package.slot,
&bank.get_accounts_hash(),
None,
);

let snapshot_package = SnapshotPackage::new(accounts_package, bank.get_accounts_hash());
Expand Down