From 79100a5687881eef489f347629bf5b19e417cc8e Mon Sep 17 00:00:00 2001 From: MW Tian Date: Wed, 30 Nov 2022 11:05:38 -0800 Subject: [PATCH] Revert "Check for lock existence before attempting tx execution. (#6129)" This reverts commit 616d2284a793f78d09672f7f83713f9bd8e1f731. --- crates/sui-benchmark/tests/simtest.rs | 13 +-- crates/sui-core/src/authority.rs | 8 -- .../sui-core/src/authority/authority_store.rs | 97 ++----------------- .../authority_active/execution_driver/mod.rs | 2 +- .../src/authority_active/gossip/tests.rs | 2 - crates/sui-core/src/transaction_manager.rs | 9 +- .../src/unit_tests/authority_tests.rs | 10 +- crates/sui-storage/src/lock_service.rs | 39 -------- crates/test-utils/src/network.rs | 24 ++--- 9 files changed, 27 insertions(+), 177 deletions(-) diff --git a/crates/sui-benchmark/tests/simtest.rs b/crates/sui-benchmark/tests/simtest.rs index a55822510b439..dcf505ebb8b65 100644 --- a/crates/sui-benchmark/tests/simtest.rs +++ b/crates/sui-benchmark/tests/simtest.rs @@ -51,11 +51,7 @@ mod test { #[sim_test(config = "test_config()")] async fn test_simulated_load() { - let test_cluster = init_cluster_builder_env_aware() - .with_num_validators(get_var("SIM_STRESS_TEST_NUM_VALIDATORS", 4)) - .build() - .await - .unwrap(); + let test_cluster = init_cluster_builder_env_aware().build().await.unwrap(); let swarm = &test_cluster.swarm; let context = &test_cluster.wallet; let sender = test_cluster.get_address_0(); @@ -89,12 +85,7 @@ mod test { ); for w in workloads.iter_mut() { - w.workload - .init( - get_var("SIM_STRESS_TEST_NUM_SHARED_OBJECTS", 5), - proxy.clone(), - ) - .await; + w.workload.init(5, proxy.clone()).await; } let driver = BenchDriver::new(5); diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 8b95bf9d396b2..fbb308ef7d62f 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -770,11 +770,6 @@ impl AuthorityState { .tap_err(|e| debug!(?tx_digest, "process_certificate failed: {e}")) } - #[instrument(level = "trace", skip_all)] - async fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult { - self.database.check_owned_locks(owned_object_refs).await - } - #[instrument(level = "trace", skip_all)] async fn check_shared_locks( &self, @@ -1046,9 +1041,6 @@ impl AuthorityState { let (gas_status, input_objects) = transaction_input_checker::check_certificate_input(&self.database, certificate).await?; - let owned_object_refs = input_objects.filter_owned_objects(); - self.check_owned_locks(&owned_object_refs).await?; - // At this point we need to check if any shared objects need locks, // and whether they have them. let shared_object_refs = input_objects.filter_shared_objects(); diff --git a/crates/sui-core/src/authority/authority_store.rs b/crates/sui-core/src/authority/authority_store.rs index ae4ade40245ce..0b16cb22fffeb 100644 --- a/crates/sui-core/src/authority/authority_store.rs +++ b/crates/sui-core/src/authority/authority_store.rs @@ -302,7 +302,7 @@ impl Deserialize<'de>> SuiDataStore { .get(&ObjectKey(*object_id, version))?) } - pub fn object_version_exists( + pub fn object_exists( &self, object_id: &ObjectID, version: VersionNumber, @@ -356,7 +356,7 @@ impl Deserialize<'de>> SuiDataStore { /// When making changes, please see if check_sequenced_input_objects() below needs /// similar changes as well. - pub async fn get_missing_input_objects( + pub fn get_missing_input_objects( &self, digest: &TransactionDigest, objects: &[InputObjectKind], @@ -364,7 +364,6 @@ impl Deserialize<'de>> SuiDataStore { let shared_locks_cell: OnceCell> = OnceCell::new(); let mut missing = Vec::new(); - let mut probe_lock_exists = Vec::new(); for kind in objects { match kind { InputObjectKind::SharedMoveObject { id, .. } => { @@ -375,7 +374,7 @@ impl Deserialize<'de>> SuiDataStore { })?; match shared_locks.get(id) { Some(version) => { - if !self.object_version_exists(id, *version)? { + if !self.object_exists(id, *version)? { // When this happens, other transactions that use smaller versions of // this shared object haven't finished execution. missing.push(ObjectKey(*id, *version)); @@ -388,37 +387,19 @@ impl Deserialize<'de>> SuiDataStore { }; } InputObjectKind::MovePackage(id) => { - if !self.object_version_exists(id, PACKAGE_VERSION)? { + if !self.object_exists(id, PACKAGE_VERSION)? { // The cert cannot have been formed if immutable inputs were missing. missing.push(ObjectKey(*id, PACKAGE_VERSION)); } } InputObjectKind::ImmOrOwnedMoveObject(objref) => { - if let Some(obj) = self.get_object_by_key(&objref.0, objref.1)? { - if !obj.is_immutable() { - probe_lock_exists.push(*objref); - } - } else { + if !self.object_exists(&objref.0, objref.1)? { missing.push(ObjectKey::from(objref)); } } }; } - if !probe_lock_exists.is_empty() { - // It is possible that we probed the objects after they are written, but before the - // locks are created. In that case, if we attempt to execute the transaction, it will - // fail. Because the objects_committed() call is made only after the locks are written, - // the tx manager will be awoken after the locks are written. - missing.extend( - self.lock_service - .get_missing_locks(probe_lock_exists) - .await? - .into_iter() - .map(ObjectKey::from), - ); - } - Ok(missing) } @@ -606,12 +587,6 @@ impl Deserialize<'de>> SuiDataStore { .map_err(SuiError::from) } - pub async fn check_owned_locks(&self, owned_object_refs: &[ObjectRef]) -> SuiResult { - self.lock_service - .locks_exist(owned_object_refs.into()) - .await - } - /// Read a lock for a specific (transaction, shared object) pair. pub fn all_shared_locks( &self, @@ -789,11 +764,6 @@ impl Deserialize<'de>> SuiDataStore { self.effects_notify_read .notify(transaction_digest, effects.data()); - // Cleanup the lock of the shared objects. This must be done after we write effects, as - // effects_exists is used as the guard to avoid re-locking objects for a previously - // executed tx. remove_shared_objects_locks. - self.remove_shared_objects_locks(transaction_digest, certificate)?; - Ok(seq) } @@ -1123,7 +1093,7 @@ impl Deserialize<'de>> SuiDataStore { /// 2. Latest parent_sync entries for each mutated object are deleted. /// 3. All new object states are deleted. /// 4. owner_index table change is reverted. - pub async fn revert_state_update(&self, tx_digest: &TransactionDigest) -> SuiResult { + pub fn revert_state_update(&self, tx_digest: &TransactionDigest) -> SuiResult { let effects = self.get_effects(tx_digest)?; let mut write_batch = self.perpetual_tables.certificates.batch(); write_batch = @@ -1180,33 +1150,21 @@ impl Deserialize<'de>> SuiDataStore { .expect("version revert should never fail"), ) }); - let (old_objects, old_locks): (Vec<_>, Vec<_>) = self + let old_objects = self .perpetual_tables .objects .multi_get(mutated_objects)? .into_iter() .map(|obj_opt| { let obj = obj_opt.expect("Older object version not found"); - let obj_ref = obj.compute_object_reference(); - let lock = if obj.is_address_owned() { - Some(obj_ref) - } else { - None - }; ( - ((obj.owner, obj.id()), ObjectInfo::new(&obj_ref, &obj)), - lock, + (obj.owner, obj.id()), + ObjectInfo::new(&obj.compute_object_reference(), &obj), ) - }) - .unzip(); - - let old_locks: Vec<_> = old_locks.into_iter().flatten().collect(); - + }); write_batch = write_batch.insert_batch(&self.perpetual_tables.owner_index, old_objects)?; write_batch.write()?; - - self.lock_service.initialize_locks(&old_locks, true).await?; Ok(()) } @@ -1228,41 +1186,6 @@ impl Deserialize<'de>> SuiDataStore { self.perpetual_tables.get_latest_parent_entry(object_id) } - pub fn object_exists(&self, object_id: ObjectID) -> SuiResult { - match self.get_latest_parent_entry(object_id)? { - None => Ok(false), - Some(entry) => Ok(entry.0 .2.is_alive()), - } - } - - /// Remove the shared objects locks. - pub fn remove_shared_objects_locks( - &self, - transaction_digest: &TransactionDigest, - transaction: &VerifiedCertificate, - ) -> SuiResult { - let mut sequenced_to_delete = Vec::new(); - let mut schedule_to_delete = Vec::new(); - for (object_id, _) in transaction.shared_input_objects() { - sequenced_to_delete.push((*transaction_digest, *object_id)); - - if !self.object_exists(*object_id)? { - schedule_to_delete.push(*object_id); - } - } - let mut write_batch = self.epoch_tables().assigned_object_versions.batch(); - write_batch = write_batch.delete_batch( - &self.epoch_tables().assigned_object_versions, - sequenced_to_delete, - )?; - write_batch = write_batch.delete_batch( - &self.epoch_tables().next_object_versions, - schedule_to_delete, - )?; - write_batch.write()?; - Ok(()) - } - /// Lock a sequence number for the shared objects of the input transaction based on the effects /// of that transaction. Used by the nodes, which don't listen to consensus. pub fn acquire_shared_locks_from_effects( diff --git a/crates/sui-core/src/authority_active/execution_driver/mod.rs b/crates/sui-core/src/authority_active/execution_driver/mod.rs index 37fc485e65d7f..f97e09cb2fa78 100644 --- a/crates/sui-core/src/authority_active/execution_driver/mod.rs +++ b/crates/sui-core/src/authority_active/execution_driver/mod.rs @@ -124,7 +124,7 @@ where } // Assume only transient failure can happen. Permanent failure is probably // a bug. There would be nothing that can be done for permanent failures. - error!(tx_digest=?digest, "Failed to execute certified transaction! attempt {attempts}, {e}"); + warn!(tx_digest=?digest, "Failed to execute certified transaction! attempt {attempts}, {e}"); sleep(EXECUTION_FAILURE_RETRY_INTERVAL).await; } else { break; diff --git a/crates/sui-core/src/authority_active/gossip/tests.rs b/crates/sui-core/src/authority_active/gossip/tests.rs index bf76235ebcbfd..3303d2581d420 100644 --- a/crates/sui-core/src/authority_active/gossip/tests.rs +++ b/crates/sui-core/src/authority_active/gossip/tests.rs @@ -90,7 +90,6 @@ pub async fn test_gossip_after_revert() { state .database .revert_state_update(&digests[0].transaction) - .await .unwrap(); break; } @@ -100,7 +99,6 @@ pub async fn test_gossip_after_revert() { state .database .revert_state_update(&digests[1].transaction) - .await .unwrap(); } } diff --git a/crates/sui-core/src/transaction_manager.rs b/crates/sui-core/src/transaction_manager.rs index 0fe1fabfae50d..8f40a6c18c81e 100644 --- a/crates/sui-core/src/transaction_manager.rs +++ b/crates/sui-core/src/transaction_manager.rs @@ -8,7 +8,7 @@ use std::{ use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::VerifiedCertificate}; use tokio::sync::mpsc::UnboundedSender; -use tracing::{debug, error}; +use tracing::error; use crate::authority::{authority_store::ObjectKey, AuthorityMetrics, AuthorityStore}; @@ -80,15 +80,11 @@ impl TransactionManager { let missing = self .authority_store .get_missing_input_objects(&digest, &cert.data().data.input_objects()?) - .await .expect("Are shared object locks set prior to enqueueing certificates?"); if missing.is_empty() { - debug!(tx_digest = ?digest, "certificate ready"); self.certificate_ready(cert); continue; - } else { - debug!(tx_digest = ?digest, ?missing, "certificate waiting on missing objects"); } for obj_key in missing { @@ -146,10 +142,7 @@ impl TransactionManager { continue; } }; - debug!(tx_digest = ?digest, "certificate ready"); self.certificate_ready(cert); - } else { - debug!(tx_digest = ?digest, missing = ?set, "certificate waiting on missing"); } } self.metrics diff --git a/crates/sui-core/src/unit_tests/authority_tests.rs b/crates/sui-core/src/unit_tests/authority_tests.rs index 0c8658723640d..1e7b8a31d1dee 100644 --- a/crates/sui-core/src/unit_tests/authority_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_tests.rs @@ -1896,7 +1896,7 @@ async fn test_store_revert_transfer_sui() { .unwrap(); let db = &authority_state.database; - db.revert_state_update(&tx_digest).await.unwrap(); + db.revert_state_update(&tx_digest).unwrap(); assert_eq!( db.get_object(&gas_object_id).unwrap().unwrap().owner, @@ -1975,7 +1975,7 @@ async fn test_store_revert_wrap_move_call() { let wrapper_v0 = wrap_effects.created[0].0; let db = &authority_state.database; - db.revert_state_update(&wrap_digest).await.unwrap(); + db.revert_state_update(&wrap_digest).unwrap(); // The wrapped object is unwrapped once again (accessible from storage). let object = db.get_object(&object_v0.0).unwrap().unwrap(); @@ -2062,7 +2062,7 @@ async fn test_store_revert_unwrap_move_call() { let db = &authority_state.database; - db.revert_state_update(&unwrap_digest).await.unwrap(); + db.revert_state_update(&unwrap_digest).unwrap(); // The unwrapped object is wrapped once again assert!(db.get_object(&object_v0.0).unwrap().is_none()); @@ -2159,7 +2159,7 @@ async fn test_store_revert_add_ofield() { assert_eq!(inner.version(), inner_v1.1); assert_eq!(inner.owner, Owner::ObjectOwner(field_v0.0.into())); - db.revert_state_update(&add_digest).await.unwrap(); + db.revert_state_update(&add_digest).unwrap(); let outer = db.get_object(&outer_v0.0).unwrap().unwrap(); assert_eq!(outer.version(), outer_v0.1); @@ -2265,7 +2265,7 @@ async fn test_store_revert_remove_ofield() { assert_eq!(inner.owner, Owner::AddressOwner(sender)); assert_eq!(inner.version(), inner_v2.1); - db.revert_state_update(&remove_ofield_digest).await.unwrap(); + db.revert_state_update(&remove_ofield_digest).unwrap(); let outer = db.get_object(&outer_v0.0).unwrap().unwrap(); assert_eq!(outer.version(), outer_v1.1); diff --git a/crates/sui-storage/src/lock_service.rs b/crates/sui-storage/src/lock_service.rs index 5fbde5bf9b8a0..8ae2187eee049 100644 --- a/crates/sui-storage/src/lock_service.rs +++ b/crates/sui-storage/src/lock_service.rs @@ -79,10 +79,6 @@ enum LockServiceQueries { objects: Vec, resp: oneshot::Sender, }, - GetMissingLocks { - objects: Vec, - resp: oneshot::Sender>>, - }, GetTxSequence { tx: TransactionDigest, resp: oneshot::Sender, SuiError>>, @@ -219,17 +215,6 @@ impl LockServiceImpl { ) } - fn get_missing_locks(&self, objects: &[ObjectRef]) -> SuiResult> { - let locks = self.transaction_lock.multi_get(objects)?; - let mut missing = Vec::new(); - for (lock, obj_ref) in locks.into_iter().zip(objects) { - if lock.is_none() { - missing.push(*obj_ref); - } - } - Ok(missing) - } - /// Checks multiple object locks exist. /// Returns SuiError::ObjectNotFound if cannot find lock record for at least one of the objects. /// Returns SuiError::ObjectVersionUnavailableForConsumption if at least one object lock is not initialized @@ -563,11 +548,6 @@ impl LockServiceImpl { warn!("Could not respond to sender, sender dropped!"); } } - LockServiceQueries::GetMissingLocks { objects, resp } => { - if let Err(_e) = resp.send(self.get_missing_locks(&objects)) { - warn!("Could not respond to sender, sender dropped!"); - } - } LockServiceQueries::GetTxSequence { tx, resp } => { if let Err(_e) = resp.send(self.get_tx_sequence(tx)) { warn!("Could not respond to sender, sender dropped!"); @@ -836,25 +816,6 @@ impl LockService { }) .await } - - /// Returns any locks from the input list that are still missing. - pub async fn get_missing_locks(&self, objects: Vec) -> SuiResult> { - block_on_future_in_sim(async move { - let (os_sender, os_receiver) = oneshot::channel::>>(); - self.inner - .query_sender() - .send(LockServiceQueries::GetMissingLocks { - objects, - resp: os_sender, - }) - .await - .expect("Could not send message to inner LockService"); - os_receiver - .await - .expect("Response from lockservice was cancelled, should not happen!") - }) - .await - } } #[cfg(test)] diff --git a/crates/test-utils/src/network.rs b/crates/test-utils/src/network.rs index 589e5ed7dca41..b09f2364dea77 100644 --- a/crates/test-utils/src/network.rs +++ b/crates/test-utils/src/network.rs @@ -97,7 +97,6 @@ pub struct TestClusterBuilder { fullnode_rpc_port: Option, fullnode_ws_port: Option, do_not_build_fullnode: bool, - num_validators: Option, } impl TestClusterBuilder { @@ -107,7 +106,6 @@ impl TestClusterBuilder { fullnode_rpc_port: None, fullnode_ws_port: None, do_not_build_fullnode: false, - num_validators: None, } } @@ -131,11 +129,6 @@ impl TestClusterBuilder { self } - pub fn with_num_validators(mut self, num: usize) -> Self { - self.num_validators = Some(num); - self - } - pub async fn build(self) -> anyhow::Result { let cluster = self.start_test_network_with_customized_ports().await?; #[cfg(msim)] @@ -148,9 +141,7 @@ impl TestClusterBuilder { Ok(cluster) } - async fn start_test_network_with_customized_ports( - mut self, - ) -> Result { + async fn start_test_network_with_customized_ports(self) -> Result { // Where does wallet client connect to? // 1. `start_test_swarm_with_fullnodes` init the wallet to use an embedded // Gateway. If `use_embedded_gateway` is true, the config remains intact. @@ -159,7 +150,7 @@ impl TestClusterBuilder { // 3. Otherwise, the wallet connects to Fullnode rpc server, unless // `do_not_build_fullnode` is false, in which case the wallet is connected // with the initial embedded Gateway. - let swarm = self.start_test_swarm_with_fullnodes().await?; + let swarm = Self::start_test_swarm_with_fullnodes(self.genesis_config).await?; let working_dir = swarm.dir(); let mut wallet_conf: SuiClientConfig = @@ -204,12 +195,13 @@ impl TestClusterBuilder { } /// Start a Swarm and set up WalletConfig with an embedded Gateway - async fn start_test_swarm_with_fullnodes(&mut self) -> Result { - let mut builder: SwarmBuilder = Swarm::builder().committee_size( - NonZeroUsize::new(self.num_validators.unwrap_or(NUM_VALIDAOTR)).unwrap(), - ); + async fn start_test_swarm_with_fullnodes( + genesis_config: Option, + ) -> Result { + let mut builder: SwarmBuilder = + Swarm::builder().committee_size(NonZeroUsize::new(NUM_VALIDAOTR).unwrap()); - if let Some(genesis_config) = self.genesis_config.take() { + if let Some(genesis_config) = genesis_config { builder = builder.initial_accounts_config(genesis_config); }