diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs
index 572393021a3af..6a836bb13c4c2 100644
--- a/crates/sui-core/src/authority.rs
+++ b/crates/sui-core/src/authority.rs
@@ -465,7 +465,7 @@ impl AuthorityState {
         }

         let resp = self
-            .process_certificate(tx_guard, &certificate)
+            .process_certificate(tx_guard, &certificate, true)
             .await
             .tap_err(|e| debug!(?digest, "process_certificate failed: {}", e))?;

@@ -487,6 +487,23 @@ impl AuthorityState {
     pub async fn handle_certificate(
         &self,
         certificate: CertifiedTransaction,
+    ) -> SuiResult<TransactionInfoResponse> {
+        self.handle_certificate_impl(certificate, false).await
+    }
+
+    #[instrument(level = "trace", skip_all)]
+    pub async fn handle_certificate_bypass_validator_halt(
+        &self,
+        certificate: CertifiedTransaction,
+    ) -> SuiResult<TransactionInfoResponse> {
+        self.handle_certificate_impl(certificate, true).await
+    }
+
+    #[instrument(level = "trace", skip_all)]
+    async fn handle_certificate_impl(
+        &self,
+        certificate: CertifiedTransaction,
+        bypass_validator_halt: bool,
     ) -> SuiResult<TransactionInfoResponse> {
         self.metrics.total_cert_attempts.inc();
         if self.is_fullnode() {
@@ -510,7 +527,7 @@ impl AuthorityState {
         // for every tx.
         let tx_guard = self.database.acquire_tx_guard(&certificate).await?;

-        self.process_certificate(tx_guard, &certificate)
+        self.process_certificate(tx_guard, &certificate, bypass_validator_halt)
             .await
             .tap_err(|e| debug!(?digest, "process_certificate failed: {}", e))
     }
@@ -577,6 +594,7 @@ impl AuthorityState {
         &self,
         tx_guard: CertTxGuard<'_>,
         certificate: &CertifiedTransaction,
+        bypass_validator_halt: bool,
     ) -> SuiResult<TransactionInfoResponse> {
         let digest = *certificate.digest();
         // The cert could have been processed by a concurrent attempt of the same cert, so check if
@@ -586,7 +604,10 @@ impl AuthorityState {
             return Ok(info);
         }

-        if self.is_halted() && !certificate.signed_data.data.kind.is_system_tx() {
+        if self.is_halted()
+            && !bypass_validator_halt
+            && !certificate.signed_data.data.kind.is_system_tx()
+        {
             tx_guard.release();
             // TODO: Do we want to include the new validator set?
             return Err(SuiError::ValidatorHaltedAtEpochEnd);
@@ -1261,7 +1282,7 @@ impl AuthorityState {
                 continue;
             }

-            if let Err(e) = self.process_certificate(tx_guard, &cert).await {
+            if let Err(e) = self.process_certificate(tx_guard, &cert, false).await {
                 warn!(?digest, "Failed to process in-progress certificate: {}", e);
             }
         } else {
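Note on the authority.rs change: the whole mechanism reduces to the three-clause gate now in `process_certificate`. Below is a minimal, self-contained model of that gate for reference — `halt_gate` and `GateDecision` are illustrative names invented here, not Sui types; the real check reads these flags from `AuthorityState` and the certificate itself.

```rust
// Hypothetical condensed model of the halt check in `process_certificate`.
#[derive(Debug, PartialEq)]
enum GateDecision {
    Execute,
    RejectHalted, // corresponds to SuiError::ValidatorHaltedAtEpochEnd
}

fn halt_gate(is_halted: bool, bypass_validator_halt: bool, is_system_tx: bool) -> GateDecision {
    // A halted validator refuses ordinary certs unless the caller explicitly
    // opted out of the halt check (checkpoint sync) or the tx is a system tx.
    if is_halted && !bypass_validator_halt && !is_system_tx {
        GateDecision::RejectHalted
    } else {
        GateDecision::Execute
    }
}

fn main() {
    assert_eq!(halt_gate(false, false, false), GateDecision::Execute); // normal operation
    assert_eq!(halt_gate(true, false, false), GateDecision::RejectHalted); // halted, ordinary cert
    assert_eq!(halt_gate(true, true, false), GateDecision::Execute); // halted, checkpoint sync
    assert_eq!(halt_gate(true, false, true), GateDecision::Execute); // halted, system tx
}
```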
diff --git a/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs b/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
index 4dad2e2dc1246..ce59d2db22277 100644
--- a/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
+++ b/crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
@@ -636,14 +636,14 @@ where
         &available_authorities,
     )
     .await?;
-    state_checkpoints
-        .lock()
-        .process_new_checkpoint_certificate(
-            checkpoint,
-            &contents,
-            committee,
-            active_authority.state.database.clone(),
-        )?;
+    process_new_checkpoint_certificate(
+        active_authority,
+        state_checkpoints,
+        committee,
+        checkpoint,
+        &contents,
+    )
+    .await?;
     info!(
         cp_seq=?checkpoint.summary.sequence_number(),
         "Stored new checkpoint certificate",
@@ -697,37 +697,55 @@ where
         let (past, contents) =
             get_one_checkpoint_with_contents(net.clone(), seq, &available_authorities).await?;

-        let errors = active_authority
-            .node_sync_handle()
-            .sync_checkpoint_cert_transactions(&contents)
-            .await?
-            .zip(futures::stream::iter(contents.iter()))
-            .filter_map(|(r, digests)| async move {
-                r.map_err(|e| {
-                    info!(?digests, "failed to execute digest from checkpoint: {}", e);
-                    e
-                })
-                .err()
-            })
-            .collect::<Vec<_>>()
-            .await;
-
-        if !errors.is_empty() {
-            let error = "Failed to sync transactions in checkpoint".to_string();
-            error!(?seq, "{}", error);
-            return Err(SuiError::CheckpointingError { error });
-        }
-
-        checkpoint_db.lock().process_synced_checkpoint_certificate(
+        process_new_checkpoint_certificate(
+            active_authority,
+            &checkpoint_db,
+            &net.committee,
             &past,
             &contents,
-            &net.committee,
-        )?;
+        )
+        .await?;
     }

     Ok(())
 }

+async fn process_new_checkpoint_certificate<A>(
+    active_authority: &ActiveAuthority<A>,
+    checkpoint_db: &Arc<Mutex<CheckpointStore>>,
+    committee: &Committee,
+    checkpoint_cert: &CertifiedCheckpointSummary,
+    contents: &CheckpointContents,
+) -> SuiResult
+where
+    A: AuthorityAPI + Send + Sync + 'static + Clone,
+{
+    let errors = active_authority
+        .node_sync_handle()
+        .sync_checkpoint_cert_transactions(contents)
+        .await?
+        .zip(futures::stream::iter(contents.iter()))
+        .filter_map(|(r, digests)| async move {
+            r.map_err(|e| {
+                info!(?digests, "failed to execute digest from checkpoint: {}", e);
+                e
+            })
+            .err()
+        })
+        .collect::<Vec<_>>()
+        .await;
+
+    if !errors.is_empty() {
+        let error = "Failed to sync transactions in checkpoint".to_string();
+        error!(cp_seq=?checkpoint_cert.summary.sequence_number, "{}", error);
+        return Err(SuiError::CheckpointingError { error });
+    }
+
+    checkpoint_db
+        .lock()
+        .process_synced_checkpoint_certificate(checkpoint_cert, contents, committee)
+}
+
 pub async fn get_one_checkpoint_with_contents<A>(
     net: Arc<AuthorityAggregator<A>>,
     sequence_number: CheckpointSequenceNumber,
diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs
index 29eea907dcda5..f26e165c09f13 100644
--- a/crates/sui-core/src/checkpoints/mod.rs
+++ b/crates/sui-core/src/checkpoints/mod.rs
@@ -506,8 +506,10 @@ impl CheckpointStore {

         // Send to consensus for sequencing.
         if let Some(sender) = &self.sender {
-            debug!("Send fragment: {} -- {}", self.name, other_name);
+            let seq = fragment.proposer.summary.sequence_number;
+            debug!(cp_seq=?seq, "Sending fragment: {} -- {}", self.name, other_name);
             sender.send_to_consensus(fragment.clone())?;
+            debug!(cp_seq=?seq, "Fragment successfully sent: {} -- {}", self.name, other_name);
         } else {
             return Err(SuiError::from("No consensus sender configured"));
         }
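The refactored `process_new_checkpoint_certificate` keeps the existing collect-the-failures idiom: zip each per-transaction sync result with its digests, keep only the errors, and abort the whole checkpoint sync if any remain. A runnable sketch of that stream pattern, assuming the `futures` and `tokio` crates; `Digest` and the `String` error type are simplified stand-ins for the real types.

```rust
use futures::stream::{self, StreamExt};

type Digest = u64; // stand-in for ExecutionDigests

// Zip per-transaction results with their digests, log and keep the failures,
// and fail the whole sync if any transaction failed to execute.
async fn sync_all(results: Vec<Result<(), String>>, digests: Vec<Digest>) -> Result<(), String> {
    let errors: Vec<String> = stream::iter(results)
        .zip(stream::iter(digests))
        .filter_map(|(r, digest)| async move {
            r.map_err(|e| {
                eprintln!("failed to execute digest {digest}: {e}");
                e
            })
            .err()
        })
        .collect()
        .await;

    if errors.is_empty() {
        Ok(())
    } else {
        Err("Failed to sync transactions in checkpoint".to_string())
    }
}

#[tokio::main]
async fn main() {
    assert!(sync_all(vec![Ok(()), Ok(())], vec![1, 2]).await.is_ok());
    assert!(sync_all(vec![Ok(()), Err("missing parents".into())], vec![1, 2])
        .await
        .is_err());
}
```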
diff --git a/crates/sui-core/src/node_sync/node_state.rs b/crates/sui-core/src/node_sync/node_state.rs
index e651a5e9b8fe6..dad4eb88b6f8c 100644
--- a/crates/sui-core/src/node_sync/node_state.rs
+++ b/crates/sui-core/src/node_sync/node_state.rs
@@ -102,6 +102,13 @@ impl DigestsMessage {
         }
     }

+    fn new_for_pending_ckpt(digest: &TransactionDigest, tx: oneshot::Sender<SyncResult>) -> Self {
+        Self {
+            sync_arg: SyncArg::PendingCheckpoint(*digest),
+            tx: Some(tx),
+        }
+    }
+
     fn new_for_exec_driver(digest: &TransactionDigest, tx: oneshot::Sender<SyncResult>) -> Self {
         Self {
             sync_arg: SyncArg::ExecDriver(*digest),
@@ -140,6 +147,13 @@ pub enum SyncArg {
     /// In checkpoint mode, all txes are known to be final.
     Checkpoint(ExecutionDigests),

+    /// Transactions in the current checkpoint to be signed/stored.
+    /// The effects digest is omitted, since we may not have it when constructing the checkpoint.
+    /// The primary difference between PendingCheckpoint and ExecDriver is that PendingCheckpoint
+    /// sync can bypass validator halting. This is to ensure that the last checkpoint of the epoch
+    /// can always be formed even when some transactions are missing.
+    PendingCheckpoint(TransactionDigest),
+
     /// Used by the execution driver to execute pending certs. No effects digest is provided,
     /// because this mode is used on validators only, who must compute the effects digest
     /// themselves - they cannot trust some other validator's version of the effects because that
@@ -165,7 +179,9 @@ impl SyncArg {
                     effects,
                 },
             ) => (transaction, Some(effects)),
-            SyncArg::Parent(digest) | SyncArg::ExecDriver(digest) => (digest, None),
+            SyncArg::Parent(digest)
+            | SyncArg::ExecDriver(digest)
+            | SyncArg::PendingCheckpoint(digest) => (digest, None),
         }
     }
 }
@@ -429,11 +445,19 @@ where
         &self,
         permit: OwnedSemaphorePermit,
         digest: &TransactionDigest,
+        bypass_validator_halt: bool,
     ) -> SyncResult {
         trace!(?digest, "validator pending execution requested");

         let cert = self.get_cert(digest).await?;

-        match self.state.handle_certificate(cert.clone()).await {
+        let result = if bypass_validator_halt {
+            self.state
+                .handle_certificate_bypass_validator_halt(cert.clone())
+                .await
+        } else {
+            self.state.handle_certificate(cert.clone()).await
+        };
+        match result {
             Ok(_) => Ok(SyncStatus::CertExecuted),
             Err(SuiError::ObjectNotFound { .. }) | Err(SuiError::ObjectErrors { .. }) => {
                 debug!(?digest, "cert execution failed due to missing parents");
@@ -449,7 +473,13 @@ where

                 // Parents have been executed, so this should now succeed.
                 debug!(?digest, "parents executed, re-attempting cert");
-                self.state.handle_certificate(cert.clone()).await?;
+                if bypass_validator_halt {
+                    self.state
+                        .handle_certificate_bypass_validator_halt(cert.clone())
+                        .await
+                } else {
+                    self.state.handle_certificate(cert.clone()).await
+                }?;
                 Ok(SyncStatus::CertExecuted)
             }
             Err(e) => Err(e),
@@ -488,7 +518,12 @@ where
         // down.
         let (digests, authorities_with_cert) = match arg {
             SyncArg::ExecDriver(digest) => {
-                return self.process_exec_driver_digest(permit, &digest).await;
+                return self
+                    .process_exec_driver_digest(permit, &digest, false)
+                    .await;
+            }
+            SyncArg::PendingCheckpoint(digest) => {
+                return self.process_exec_driver_digest(permit, &digest, true).await;
             }
             SyncArg::Parent(digest) => {
                 // digest is known to be final because it appeared in the dependencies list of a
@@ -867,7 +902,7 @@ impl NodeSyncHandle {
         let mut futures = FuturesOrdered::new();
         for digests in transactions {
             let (tx, rx) = oneshot::channel();
-            let msg = DigestsMessage::new_for_exec_driver(&digests.transaction, tx);
+            let msg = DigestsMessage::new_for_pending_ckpt(&digests.transaction, tx);
             Self::send_msg_with_tx(self.sender.clone(), msg).await?;
             futures.push_back(Self::map_rx(rx));
         }
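The new `SyncArg::PendingCheckpoint` variant rides the same execution path as `ExecDriver`; the only behavioral difference is the bypass flag handed to certificate processing. A condensed, hypothetical view of that dispatch — `TransactionDigest` is faked as a byte array and only the two relevant variants are modeled:

```rust
type TransactionDigest = [u8; 32]; // stand-in for the real digest type

enum SyncArg {
    ExecDriver(TransactionDigest),
    PendingCheckpoint(TransactionDigest),
}

/// Returns the digest to execute plus the `bypass_validator_halt` flag that
/// should be threaded through to certificate processing.
fn exec_dispatch(arg: &SyncArg) -> (&TransactionDigest, bool) {
    match arg {
        SyncArg::ExecDriver(d) => (d, false),
        // Pending-checkpoint sync must keep working after the validator halts
        // for reconfiguration, so it opts out of the halt check.
        SyncArg::PendingCheckpoint(d) => (d, true),
    }
}

fn main() {
    let d = [0u8; 32];
    assert!(!exec_dispatch(&SyncArg::ExecDriver(d)).1);
    assert!(exec_dispatch(&SyncArg::PendingCheckpoint(d)).1);
}
```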
diff --git a/crates/sui/tests/reconfiguration_tests.rs b/crates/sui/tests/reconfiguration_tests.rs
index 7e23f2fb92fa7..29507e17b212f 100644
--- a/crates/sui/tests/reconfiguration_tests.rs
+++ b/crates/sui/tests/reconfiguration_tests.rs
@@ -3,28 +3,26 @@
 use futures::future::join_all;
 use multiaddr::Multiaddr;
+use std::time::Duration;
 use sui_config::ValidatorInfo;
 use sui_core::authority_active::checkpoint_driver::{
     checkpoint_process_step, CheckpointProcessControl,
 };
-use sui_core::authority_client::AuthorityAPI;
-use sui_core::safe_client::SafeClient;
 use sui_node::SuiNode;
-use sui_types::base_types::{ObjectID, ObjectRef};
+use sui_types::base_types::{ObjectRef, SequenceNumber, SuiAddress};
 use sui_types::crypto::{
     generate_proof_of_possession, get_key_pair, AccountKeyPair, AuthorityKeyPair,
     AuthoritySignature, KeypairTraits,
 };
 use sui_types::error::SuiResult;
-use sui_types::messages::ObjectInfoResponse;
-use sui_types::messages::{CallArg, ObjectArg, ObjectInfoRequest, TransactionEffects};
+use sui_types::messages::{CallArg, ObjectArg, TransactionEffects};
 use sui_types::object::Object;
 use sui_types::SUI_SYSTEM_STATE_OBJECT_ID;
-use test_utils::authority::test_authority_configs;
-use test_utils::messages::move_transaction;
+use test_utils::authority::{get_object, test_authority_configs};
+use test_utils::messages::{make_transfer_sui_transaction, move_transaction};
 use test_utils::objects::{generate_gas_object_with_balance, test_gas_objects};
 use test_utils::test_account_keys;
-use test_utils::transaction::submit_shared_object_transaction;
+use test_utils::transaction::{submit_shared_object_transaction, submit_single_owner_transaction};

 #[tokio::test(flavor = "current_thread")]
 async fn reconfig_end_to_end_tests() {
@@ -85,8 +83,99 @@ async fn reconfig_end_to_end_tests() {
     let new_committee_size = sui_system_state.validators.next_epoch_validators.len();
     assert_eq!(old_committee_size + 1, new_committee_size);

-    let mut checkpoint_processes = vec![];
+    fast_forward_to_ready_for_reconfig_start(&nodes).await;
+
+    // Start epoch change and halt all validators.
+    for node in &nodes {
+        node.active().start_epoch_change().await.unwrap();
+    }
+
+    fast_forward_to_ready_for_reconfig_finish(&nodes).await;
+
+    let results: Vec<_> = nodes
+        .iter()
+        .map(|node| async {
+            node.active().finish_epoch_change().await.unwrap();
+        })
+        .collect();
+
+    futures::future::join_all(results).await;
+
+    // Refresh the system state and network addresses.
+    let sui_system_state = states[0].get_sui_system_state_object().await.unwrap();
+    assert_eq!(sui_system_state.epoch, 1);
+    // We should now have one more active validator.
+    assert_eq!(sui_system_state.validators.active_validators.len(), 5);
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn reconfig_last_checkpoint_sync_missing_tx() {
+    telemetry_subscribers::init_for_testing();
+
+    let mut configs = test_authority_configs();
+    for c in configs.validator_configs.iter_mut() {
+        // Turn off the checkpoint process so that we have fine control over it in this test.
+        c.enable_checkpoint = false;
+    }
+    let validator_info = configs.validator_set();
+    let mut gas_objects = test_gas_objects();
+    let mut states = Vec::new();
+    let mut nodes = Vec::new();
+    for validator in configs.validator_configs() {
+        let node = SuiNode::start(validator).await.unwrap();
+        let state = node.state();
+
+        for gas in gas_objects.clone() {
+            state.insert_genesis_object(gas).await;
+        }
+        states.push(state);
+        nodes.push(node);
+    }
+
+    fast_forward_to_ready_for_reconfig_start(&nodes).await;
+
+    let (sender, key_pair) = test_account_keys().pop().unwrap();
+    let object_ref = gas_objects.pop().unwrap().compute_object_reference();
+    let transaction = make_transfer_sui_transaction(
+        object_ref,
+        SuiAddress::random_for_testing_only(),
+        None,
+        sender,
+        &key_pair,
+    );
+    // Only send the transaction to validator 0, not to the other validators.
+    // Since gossip is disabled by default, validators 1-3 will not see it.
+    submit_single_owner_transaction(transaction, &validator_info[0..1]).await;
+    tokio::time::sleep(Duration::from_secs(10)).await;
+    for (idx, validator) in validator_info.iter().enumerate() {
+        // Check that the object is mutated on validator 0 only.
+        assert_eq!(
+            get_object(validator, object_ref.0).await.version(),
+            SequenceNumber::from(if idx == 0 { 1 } else { 0 })
+        );
+    }
+
+    // Start epoch change and halt all validators.
     for node in &nodes {
+        node.active().start_epoch_change().await.unwrap();
+    }
+
+    // Create a proposal on validator 0, which ensures that the transaction above will be included
+    // in the checkpoint.
+    nodes[0]
+        .state()
+        .checkpoints
+        .as_ref()
+        .unwrap()
+        .lock()
+        .set_proposal(0)
+        .unwrap();
+    let mut checkpoint_processes = vec![];
+    // Only validators 1 and 2 will participate in the checkpoint process, which will use fragments
+    // involving validators 0, 1 and 2. Since validators 1 and 2 have not executed the transaction
+    // above, they will actively sync and execute it. This exercises the code path where we can
+    // execute a transaction from a pending checkpoint even when the validator is halted.
+    for node in &nodes[1..3] {
         let active = node.active().clone();
         let handle = tokio::spawn(async move {
             while !active
@@ -95,7 +184,7 @@ async fn reconfig_end_to_end_tests() {
                 .as_ref()
                 .unwrap()
                 .lock()
-                .is_ready_to_start_epoch_change()
+                .is_ready_to_finish_epoch_change()
             {
                 let _ =
                     checkpoint_process_step(&active, &CheckpointProcessControl::default()).await;
@@ -106,44 +195,24 @@ async fn reconfig_end_to_end_tests() {

     // Wait for all validators to be ready for epoch change.
     join_all(checkpoint_processes).await;

-    let results: Vec<_> = nodes
-        .iter()
-        .map(|node| async {
-            let active = node.active().clone();
-            active.start_epoch_change().await.unwrap();
-            while !active
-                .state
-                .checkpoints
-                .as_ref()
-                .unwrap()
-                .lock()
-                .is_ready_to_finish_epoch_change()
-            {
-                let _ =
-                    checkpoint_process_step(&active, &CheckpointProcessControl::default()).await;
-            }
-        })
-        .collect();
-
-    join_all(results).await;
-
-    let results: Vec<_> = nodes
-        .iter()
-        .map(|node| async {
-            node.active().finish_epoch_change().await.unwrap();
-        })
-        .collect();
-
-    futures::future::join_all(results).await;
-
-    // refresh the system state and network addresses
-    let sui_system_state = states[0].get_sui_system_state_object().await.unwrap();
-    assert_eq!(sui_system_state.epoch, 1);
-    // We should now have one more active validator.
-    assert_eq!(sui_system_state.validators.active_validators.len(), 5);
+    // Now that we have a new checkpoint cert formed for the last checkpoint, check that
+    // validator 3 is also able to sync and execute the above transaction and finish epoch change.
+    // This exercises the code path where a validator can execute transactions from a checkpoint
+    // cert even when the validator is halted.
+    while !nodes[3]
+        .state()
+        .checkpoints
+        .as_ref()
+        .unwrap()
+        .lock()
+        .is_ready_to_finish_epoch_change()
+    {
+        let _ =
+            checkpoint_process_step(nodes[3].active(), &CheckpointProcessControl::default()).await;
+    }
 }

-pub async fn create_and_register_new_validator(
+async fn create_and_register_new_validator(
     framework_pkg: ObjectRef,
     gas_objects: &mut Vec<Object>,
     validator_stake: ObjectRef,
@@ -198,21 +267,48 @@ pub fn get_new_validator() -> (ValidatorInfo, AuthoritySignature) {
     )
 }

-#[allow(dead_code)]
-pub async fn get_latest_ref<A>(authority: &SafeClient<A>, object_id: ObjectID) -> ObjectRef
-where
-    A: AuthorityAPI + Send + Sync + Clone + 'static,
-{
-    if let Ok(ObjectInfoResponse {
-        requested_object_reference: Some(object_ref),
-        ..
-    }) = authority
-        .handle_object_info_request(ObjectInfoRequest::latest_object_info_request(
-            object_id, None,
-        ))
-        .await
-    {
-        return object_ref;
+async fn fast_forward_to_ready_for_reconfig_start(nodes: &[SuiNode]) {
+    let mut checkpoint_processes = vec![];
+    for node in nodes {
+        let active = node.active().clone();
+        let handle = tokio::spawn(async move {
+            while !active
+                .state
+                .checkpoints
+                .as_ref()
+                .unwrap()
+                .lock()
+                .is_ready_to_start_epoch_change()
+            {
+                let _ =
+                    checkpoint_process_step(&active, &CheckpointProcessControl::default()).await;
+            }
+        });
+        checkpoint_processes.push(handle);
+    }
+    // Wait for all validators to be ready for epoch change.
+    join_all(checkpoint_processes).await;
+}
+
+async fn fast_forward_to_ready_for_reconfig_finish(nodes: &[SuiNode]) {
+    let mut checkpoint_processes = vec![];
+    for node in nodes {
+        let active = node.active().clone();
+        let handle = tokio::spawn(async move {
+            while !active
+                .state
+                .checkpoints
+                .as_ref()
+                .unwrap()
+                .lock()
+                .is_ready_to_finish_epoch_change()
+            {
+                let _ =
+                    checkpoint_process_step(&active, &CheckpointProcessControl::default()).await;
+            }
+        });
+        checkpoint_processes.push(handle);
     }
-    panic!("Object not found!");
+    // Wait for all validators to be ready for epoch change.
+    join_all(checkpoint_processes).await;
 }
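Both `fast_forward_*` helpers are instances of the same poll-and-step loop: drive one checkpoint step at a time until a readiness predicate flips. A toy, runnable model of that shape — `MockCheckpoints` is an invented stand-in for the real `CheckpointStore`, and readiness here is simply reaching three steps:

```rust
struct MockCheckpoints {
    steps_taken: u32,
}

impl MockCheckpoints {
    fn is_ready_to_finish_epoch_change(&self) -> bool {
        self.steps_taken >= 3
    }

    // The real step exchanges proposals, fragments, and certs with other validators.
    async fn checkpoint_process_step(&mut self) {
        self.steps_taken += 1;
    }
}

async fn fast_forward(cp: &mut MockCheckpoints) {
    while !cp.is_ready_to_finish_epoch_change() {
        cp.checkpoint_process_step().await;
    }
}

#[tokio::main]
async fn main() {
    let mut cp = MockCheckpoints { steps_taken: 0 };
    fast_forward(&mut cp).await;
    assert!(cp.is_ready_to_finish_epoch_change());
}
```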
diff --git a/crates/test-utils/src/authority.rs b/crates/test-utils/src/authority.rs
index bf71c8a9377fc..a51ea25e6dbb7 100644
--- a/crates/test-utils/src/authority.rs
+++ b/crates/test-utils/src/authority.rs
@@ -6,7 +6,7 @@ use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::time::Duration;
 use sui_config::{NetworkConfig, NodeConfig, ValidatorInfo};
-use sui_core::authority_client::NetworkAuthorityClientMetrics;
+use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClientMetrics};
 use sui_core::epoch::epoch_store::EpochStore;
 use sui_core::{
     authority_active::{
@@ -20,6 +20,8 @@ use sui_core::{
 use sui_types::{committee::Committee, object::Object};

 pub use sui_node::SuiNode;
+use sui_types::base_types::ObjectID;
+use sui_types::messages::{ObjectInfoRequest, ObjectInfoRequestKind};

 /// The default network buffer size of a test authority.
 pub const NETWORK_BUFFER_SIZE: usize = 65_000;
@@ -138,3 +140,16 @@ pub fn get_client(config: &ValidatorInfo) -> NetworkAuthorityClient {
     )
     .unwrap()
 }
+
+pub async fn get_object(config: &ValidatorInfo, object_id: ObjectID) -> Object {
+    get_client(config)
+        .handle_object_info_request(ObjectInfoRequest {
+            object_id,
+            request_kind: ObjectInfoRequestKind::LatestObjectInfo(None),
+        })
+        .await
+        .unwrap()
+        .object()
+        .unwrap()
+        .clone()
+}
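A hedged usage sketch for the new `get_object` helper, mirroring the per-validator version assertion in `reconfig_last_checkpoint_sync_missing_tx`. It assumes the `test-utils` and `sui-types` crates from this repo and that validators for `configs` are actually running — illustrative only, not a standalone test:

```rust
use sui_types::base_types::{ObjectID, SequenceNumber};
use test_utils::authority::{get_object, test_authority_configs};

/// Assert that every validator reports the same latest version for an object.
/// Assumes the validators described by the configs have been started.
async fn assert_object_version_everywhere(object_id: ObjectID, expected: u64) {
    let configs = test_authority_configs();
    for validator in configs.validator_set() {
        let object = get_object(validator, object_id).await;
        assert_eq!(object.version(), SequenceNumber::from(expected));
    }
}
```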