Skip to content

Commit

Permalink
[reconfig] Allow sync last checkpoint tx when validator is halted
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Sep 7, 2022
1 parent bca16cd commit 483f736
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 105 deletions.
29 changes: 25 additions & 4 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ impl AuthorityState {
}

let resp = self
.process_certificate(tx_guard, &certificate)
.process_certificate(tx_guard, &certificate, true)
.await
.tap_err(|e| debug!(?digest, "process_certificate failed: {}", e))?;

Expand All @@ -487,6 +487,23 @@ impl AuthorityState {
pub async fn handle_certificate(
    &self,
    certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
    // Regular entry point: delegate to the shared implementation with the
    // halt bypass switched off.
    let bypass_validator_halt = false;
    self.handle_certificate_impl(certificate, bypass_validator_halt)
        .await
}

/// Variant of `handle_certificate` that delegates to `handle_certificate_impl`
/// with `bypass_validator_halt` set to `true`.
#[instrument(level = "trace", skip_all)]
pub async fn handle_certificate_bypass_validator_halt(
    &self,
    certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
    let bypass_validator_halt = true;
    self.handle_certificate_impl(certificate, bypass_validator_halt)
        .await
}

#[instrument(level = "trace", skip_all)]
async fn handle_certificate_impl(
&self,
certificate: CertifiedTransaction,
bypass_validator_halt: bool,
) -> SuiResult<TransactionInfoResponse> {
self.metrics.total_cert_attempts.inc();
if self.is_fullnode() {
Expand All @@ -510,7 +527,7 @@ impl AuthorityState {
// for every tx.
let tx_guard = self.database.acquire_tx_guard(&certificate).await?;

self.process_certificate(tx_guard, &certificate)
self.process_certificate(tx_guard, &certificate, bypass_validator_halt)
.await
.tap_err(|e| debug!(?digest, "process_certificate failed: {}", e))
}
Expand Down Expand Up @@ -577,6 +594,7 @@ impl AuthorityState {
&self,
tx_guard: CertTxGuard<'_>,
certificate: &CertifiedTransaction,
bypass_validator_halt: bool,
) -> SuiResult<TransactionInfoResponse> {
let digest = *certificate.digest();
// The cert could have been processed by a concurrent attempt of the same cert, so check if
Expand All @@ -586,7 +604,10 @@ impl AuthorityState {
return Ok(info);
}

if self.is_halted() && !certificate.signed_data.data.kind.is_system_tx() {
if self.is_halted()
&& !bypass_validator_halt
&& !certificate.signed_data.data.kind.is_system_tx()
{
tx_guard.release();
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
Expand Down Expand Up @@ -1261,7 +1282,7 @@ impl AuthorityState {
continue;
}

if let Err(e) = self.process_certificate(tx_guard, &cert).await {
if let Err(e) = self.process_certificate(tx_guard, &cert, false).await {
warn!(?digest, "Failed to process in-progress certificate: {}", e);
}
} else {
Expand Down
82 changes: 50 additions & 32 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,14 +636,14 @@ where
&available_authorities,
)
.await?;
state_checkpoints
.lock()
.process_new_checkpoint_certificate(
checkpoint,
&contents,
committee,
active_authority.state.database.clone(),
)?;
process_new_checkpoint_certificate(
active_authority,
state_checkpoints,
committee,
checkpoint,
&contents,
)
.await?;
info!(
cp_seq=?checkpoint.summary.sequence_number(),
"Stored new checkpoint certificate",
Expand Down Expand Up @@ -697,37 +697,55 @@ where
let (past, contents) =
get_one_checkpoint_with_contents(net.clone(), seq, &available_authorities).await?;

let errors = active_authority
.node_sync_handle()
.sync_checkpoint_cert_transactions(&contents)
.await?
.zip(futures::stream::iter(contents.iter()))
.filter_map(|(r, digests)| async move {
r.map_err(|e| {
info!(?digests, "failed to execute digest from checkpoint: {}", e);
e
})
.err()
})
.collect::<Vec<SuiError>>()
.await;

if !errors.is_empty() {
let error = "Failed to sync transactions in checkpoint".to_string();
error!(?seq, "{}", error);
return Err(SuiError::CheckpointingError { error });
}

checkpoint_db.lock().process_synced_checkpoint_certificate(
process_new_checkpoint_certificate(
active_authority,
&checkpoint_db,
&net.committee,
&past,
&contents,
&net.committee,
)?;
)
.await?;
}

Ok(())
}

/// Syncs every transaction listed in `contents` through the node sync handle and,
/// only if all of them succeed, hands the checkpoint certificate to the checkpoint
/// store via `process_synced_checkpoint_certificate`.
///
/// # Errors
///
/// Propagates any error from starting the sync or from the checkpoint store.
/// Returns `SuiError::CheckpointingError` if at least one transaction failed to
/// sync; the individual failures are logged at `info` level.
async fn process_new_checkpoint_certificate<A>(
    active_authority: &ActiveAuthority<A>,
    checkpoint_db: &Arc<Mutex<CheckpointStore>>,
    committee: &Committee,
    checkpoint_cert: &CertifiedCheckpointSummary,
    contents: &CheckpointContents,
) -> SuiResult
where
    A: AuthorityAPI + Send + Sync + 'static + Clone,
{
    // Kick off the sync; the returned stream yields one result per transaction.
    let sync_results = active_authority
        .node_sync_handle()
        .sync_checkpoint_cert_transactions(contents)
        .await?;

    // Pair each result with its digests so failures can be logged with context,
    // and keep only the errors.
    let failures = sync_results
        .zip(futures::stream::iter(contents.iter()))
        .filter_map(|(outcome, digests)| async move {
            match outcome {
                Ok(_) => None,
                Err(e) => {
                    info!(?digests, "failed to execute digest from checkpoint: {}", e);
                    Some(e)
                }
            }
        })
        .collect::<Vec<SuiError>>()
        .await;

    if !failures.is_empty() {
        let error = "Failed to sync transactions in checkpoint".to_string();
        error!(cp_seq=?checkpoint_cert.summary.sequence_number, "{}", error);
        return Err(SuiError::CheckpointingError { error });
    }

    // The lock is only taken here, after all awaits have completed.
    checkpoint_db
        .lock()
        .process_synced_checkpoint_certificate(checkpoint_cert, contents, committee)
}

pub async fn get_one_checkpoint_with_contents<A>(
net: Arc<AuthorityAggregator<A>>,
sequence_number: CheckpointSequenceNumber,
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,10 @@ impl CheckpointStore {

// Send to consensus for sequencing.
if let Some(sender) = &self.sender {
debug!("Send fragment: {} -- {}", self.name, other_name);
let seq = fragment.proposer.summary.sequence_number;
debug!(cp_seq=?seq, "Sending fragment: {} -- {}", self.name, other_name);
sender.send_to_consensus(fragment.clone())?;
debug!(cp_seq=?seq, "Fragment successfully sent: {} -- {}", self.name, other_name);
} else {
return Err(SuiError::from("No consensus sender configured"));
}
Expand Down
45 changes: 40 additions & 5 deletions crates/sui-core/src/node_sync/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ impl DigestsMessage {
}
}

/// Builds a message whose sync argument is `SyncArg::PendingCheckpoint` for
/// `digest`, with `tx` attached as the channel on which the result is reported.
fn new_for_pending_ckpt(digest: &TransactionDigest, tx: oneshot::Sender<SyncResult>) -> Self {
    let sync_arg = SyncArg::PendingCheckpoint(*digest);
    let tx = Some(tx);
    Self { sync_arg, tx }
}

fn new_for_exec_driver(digest: &TransactionDigest, tx: oneshot::Sender<SyncResult>) -> Self {
Self {
sync_arg: SyncArg::ExecDriver(*digest),
Expand Down Expand Up @@ -140,6 +147,13 @@ pub enum SyncArg {
/// In checkpoint mode, all txes are known to be final.
Checkpoint(ExecutionDigests),

/// Transactions in the current checkpoint to be signed/stored.
/// No effects digest is carried, since it may not be known yet when the checkpoint is constructed.
/// The primary difference between PendingCheckpoint and ExecDriver is that a PendingCheckpoint
/// sync can bypass validator halting. This ensures that the last checkpoint of the epoch
/// can always be formed even when there are missing transactions.
PendingCheckpoint(TransactionDigest),

/// Used by the execution driver to execute pending certs. No effects digest is provided,
/// because this mode is used on validators only, who must compute the effects digest
/// themselves - they cannot trust some other validator's version of the effects because that
Expand All @@ -165,7 +179,9 @@ impl SyncArg {
effects,
},
) => (transaction, Some(effects)),
SyncArg::Parent(digest) | SyncArg::ExecDriver(digest) => (digest, None),
SyncArg::Parent(digest)
| SyncArg::ExecDriver(digest)
| SyncArg::PendingCheckpoint(digest) => (digest, None),
}
}
}
Expand Down Expand Up @@ -429,11 +445,19 @@ where
&self,
permit: OwnedSemaphorePermit,
digest: &TransactionDigest,
bypass_validator_halt: bool,
) -> SyncResult {
trace!(?digest, "validator pending execution requested");
let cert = self.get_cert(digest).await?;

match self.state.handle_certificate(cert.clone()).await {
let result = if bypass_validator_halt {
self.state
.handle_certificate_bypass_validator_halt(cert.clone())
.await
} else {
self.state.handle_certificate(cert.clone()).await
};
match result {
Ok(_) => Ok(SyncStatus::CertExecuted),
Err(SuiError::ObjectNotFound { .. }) | Err(SuiError::ObjectErrors { .. }) => {
debug!(?digest, "cert execution failed due to missing parents");
Expand All @@ -449,7 +473,13 @@ where

// Parents have been executed, so this should now succeed.
debug!(?digest, "parents executed, re-attempting cert");
self.state.handle_certificate(cert.clone()).await?;
if bypass_validator_halt {
self.state
.handle_certificate_bypass_validator_halt(cert.clone())
.await
} else {
self.state.handle_certificate(cert.clone()).await
}?;
Ok(SyncStatus::CertExecuted)
}
Err(e) => Err(e),
Expand Down Expand Up @@ -488,7 +518,12 @@ where
// down.
let (digests, authorities_with_cert) = match arg {
SyncArg::ExecDriver(digest) => {
return self.process_exec_driver_digest(permit, &digest).await;
return self
.process_exec_driver_digest(permit, &digest, false)
.await;
}
SyncArg::PendingCheckpoint(digest) => {
return self.process_exec_driver_digest(permit, &digest, true).await;
}
SyncArg::Parent(digest) => {
// digest is known to be final because it appeared in the dependencies list of a
Expand Down Expand Up @@ -867,7 +902,7 @@ impl NodeSyncHandle {
let mut futures = FuturesOrdered::new();
for digests in transactions {
let (tx, rx) = oneshot::channel();
let msg = DigestsMessage::new_for_exec_driver(&digests.transaction, tx);
let msg = DigestsMessage::new_for_pending_ckpt(&digests.transaction, tx);
Self::send_msg_with_tx(self.sender.clone(), msg).await?;
futures.push_back(Self::map_rx(rx));
}
Expand Down
Loading

0 comments on commit 483f736

Please sign in to comment.