Skip to content

Commit

Permalink
[checkpoint] local_fragments should be indexed on sequence number as …
Browse files Browse the repository at this point in the history
…well (#4502)
  • Loading branch information
lxfind authored Sep 7, 2022
1 parent 624aafa commit 674d133
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 29 deletions.
9 changes: 6 additions & 3 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,20 +777,23 @@ pub async fn create_fragments<A>(
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let next_cp_seq = checkpoint_db.lock().next_checkpoint();

let mut available_authorities = committee.shuffle_by_stake(None, None);
// Remove ourselves and all validators that we have already diffed with.
let already_fragmented = checkpoint_db.lock().validators_already_fragmented_with();
let already_fragmented = checkpoint_db
.lock()
.validators_already_fragmented_with(next_cp_seq);
// TODO: We can also use AuthorityHealth to pick healthy authorities first.
available_authorities
.retain(|name| name != &active_authority.state.name && !already_fragmented.contains(name));
debug!(
?next_cp_seq,
fragmented_count=?already_fragmented.len(),
to_be_fragmented_count=?available_authorities.len(),
"Going through remaining validators to generate fragments",
);

let next_cp_seq = checkpoint_db.lock().next_checkpoint();

let result = checkpoint_db
.lock()
.attempt_to_construct_checkpoint(committee);
Expand Down
71 changes: 45 additions & 26 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use narwhal_executor::ExecutionIndices;
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::{collections::HashSet, path::Path, sync::Arc};
use std::{path::Path, sync::Arc};
use sui_storage::default_db_options;
use sui_types::messages_checkpoint::{CheckpointProposal, CheckpointProposalContents};
use sui_types::{
Expand Down Expand Up @@ -110,7 +110,7 @@ pub struct CheckpointStoreTables {
// to. These are used for the local node to potentially reconstruct the full
// transaction set.
#[default_options_override_fn = "local_fragments_table_default_config"]
pub local_fragments: DBMap<AuthorityName, CheckpointFragment>,
pub local_fragments: DBMap<(CheckpointSequenceNumber, AuthorityName), CheckpointFragment>,

/// Store the fragments received in order, the counter is purely internal,
/// to allow us to provide a list in order they were received. We only store
Expand Down Expand Up @@ -419,7 +419,17 @@ impl CheckpointStore {
)?
.delete_batch(
&self.tables.local_fragments,
self.tables.local_fragments.keys(),
self.tables
.local_fragments
.iter()
.filter_map(|((seq, name), _)| {
// Delete all keys for checkpoints up to and including the one we are committing now.
if seq <= checkpoint_sequence_number {
Some((seq, name))
} else {
None
}
}),
)?;

// Update the transactions databases.
Expand Down Expand Up @@ -480,14 +490,19 @@ impl CheckpointStore {
} else {
fragment.proposer.authority()
};
if !self.tables.local_fragments.contains_key(other_name)? {
self.tables.local_fragments.insert(other_name, fragment)?;
} else {
// We already have this fragment, so we can ignore it.
if self
.tables
.local_fragments
.contains_key(&(next_checkpoint_seq, *other_name))?
{
// If we already have this fragment, we can ignore it.
return Err(SuiError::GenericAuthorityError {
error: format!("Already processed fragment with {:?}", other_name),
});
}
self.tables
.local_fragments
.insert(&(next_checkpoint_seq, *other_name), fragment)?;

// Send to consensus for sequencing.
if let Some(sender) = &self.sender {
Expand Down Expand Up @@ -535,18 +550,16 @@ impl CheckpointStore {
self.tables.fragments.insert(&seq, &fragment)?;

// If the fragment contains us, also save it in the list of local fragments
let next_sequence_number = self.next_checkpoint();
if fragment.proposer.summary.sequence_number == next_sequence_number {
if fragment.proposer.authority() == &self.name {
self.tables
.local_fragments
.insert(fragment.other.authority(), &fragment)?;
}
if fragment.other.authority() == &self.name {
self.tables
.local_fragments
.insert(fragment.proposer.authority(), &fragment)?;
}
let fragment_seq = fragment.proposer.summary.sequence_number;
if fragment.proposer.authority() == &self.name {
self.tables
.local_fragments
.insert(&(fragment_seq, *fragment.other.authority()), &fragment)?;
}
if fragment.other.authority() == &self.name {
self.tables
.local_fragments
.insert(&(fragment_seq, *fragment.proposer.authority()), &fragment)?;
}

Ok(())
Expand Down Expand Up @@ -644,17 +657,20 @@ impl CheckpointStore {
}

// Strategy 2 to reconstruct checkpoint -- There is a link between us and the checkpoint set

let local_links: HashSet<_> = self.tables.local_fragments.keys().collect();
let checkpoint_keys: HashSet<_> = reconstructed
let local_links = self.validators_already_fragmented_with(next_sequence_number);
let checkpoint_keys: BTreeSet<_> = reconstructed
.global
.authority_waypoints
.keys()
.cloned()
.collect();

if let Some(auth) = local_links.intersection(&checkpoint_keys).next() {
let fragment = self.tables.local_fragments.get(auth)?.unwrap();
let fragment = self
.tables
.local_fragments
.get(&(next_sequence_number, *auth))?
.unwrap();

// Extract the diff
let diff = if fragment.proposer.authority() == &self.name {
Expand Down Expand Up @@ -788,11 +804,14 @@ impl CheckpointStore {
next_seq % CHECKPOINT_COUNT_PER_EPOCH == 1 && next_seq != 1
}

pub fn validators_already_fragmented_with(&mut self) -> BTreeSet<AuthorityName> {
pub fn validators_already_fragmented_with(
&mut self,
next_seq: CheckpointSequenceNumber,
) -> BTreeSet<AuthorityName> {
self.tables
.local_fragments
.iter()
.map(|(name, _)| name)
.keys()
.filter_map(|(seq, name)| if seq == next_seq { Some(name) } else { None })
.collect()
}

Expand Down

0 comments on commit 674d133

Please sign in to comment.