diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 21571e8eb5e..d3b639bd4e4 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -3981,13 +3981,15 @@ impl Chain {
         })?;
         // we can't use the hash from the current block here yet because the
         // incoming receipts for this block are not stored yet
-        let mut receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap());
-        let receipt_proof_response = &self.store().get_incoming_receipts_for_shard(
+        let new_receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap());
+        let old_receipts = &self.store().get_incoming_receipts_for_shard(
            shard_id,
            *prev_hash,
            prev_chunk_height_included,
        )?;
-        receipts.extend(collect_receipts_from_response(receipt_proof_response));
+        let old_receipts = collect_receipts_from_response(old_receipts);
+        let receipts = [new_receipts, old_receipts].concat();
+
        let chunk = self.get_chunk_clone_from_header(&chunk_header.clone())?;
        let transactions = chunk.transactions();
diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs
index 123e4a4a78a..9c63e1b11cd 100644
--- a/chain/chain/src/store.rs
+++ b/chain/chain/src/store.rs
@@ -233,12 +233,33 @@ pub trait ChainStoreAccess {
                break;
            }

-            let prev_hash = *header.prev_hash();
+            let receipts = self.get_incoming_receipts(&block_hash, shard_id);
+            match receipts {
+                Ok(receipt_proofs) => {
+                    tracing::debug!(
+                        ?shard_id,
+                        ?last_chunk_height_included,
+                        ?block_hash,
+                        "get_incoming_receipts_for_shard found receipts from block with missing chunk"
+                    );
+                    ret.push(ReceiptProofResponse(block_hash, receipt_proofs));
+                }
+                Err(err) => {
+                    tracing::debug!(
+                        ?shard_id,
+                        ?last_chunk_height_included,
+                        ?block_hash,
+                        ?err,
+                        "get_incoming_receipts_for_shard could not find receipts from block with missing chunk"
+                    );

-            if let Ok(receipt_proofs) = self.get_incoming_receipts(&block_hash, shard_id) {
-                ret.push(ReceiptProofResponse(block_hash, receipt_proofs));
-            } else {
-                ret.push(ReceiptProofResponse(block_hash, Arc::new(vec![])));
+                    // This can happen when all chunks are missing in a block.
+                    // In that case we can safely assume that there aren't any
+                    // incoming receipts. It would be nicer to explicitly check
+                    // that condition rather than relying on errors when
+                    // reading from the db.
+                    ret.push(ReceiptProofResponse(block_hash, Arc::new(vec![])));
+                }
            }
            // TODO(resharding)
@@ -246,7 +267,7 @@ pub trait ChainStoreAccess {
            // layout is different and handle that
            // one idea would be to do shard_id := parent(shard_id) but remember to
            // deduplicate the receipts as well
-            block_hash = prev_hash;
+            block_hash = *header.prev_hash();
        }

        Ok(ret)
@@ -1237,7 +1258,7 @@ impl ChainStoreAccess for ChainStore {
                &self.incoming_receipts,
                &get_block_shard_id(block_hash, shard_id),
            ),
-            format_args!("INCOMING RECEIPT: {}", block_hash),
+            format_args!("INCOMING RECEIPT: {} {}", block_hash, shard_id),
        )
    }
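A minimal standalone sketch (plain string receipts, no nearcore types) of why the `chain.rs` refactor above is behavior-preserving: `[new_receipts, old_receipts].concat()` yields the same sequence as the old `extend` call, with receipts from the current block's incoming set still ahead of the ones read back from the store.

```rust
fn main() {
    // Stand-ins for the results of `collect_receipts` (new) and
    // `collect_receipts_from_response` (old).
    let new_receipts = vec!["new-1", "new-2"];
    let old_receipts = vec!["old-1", "old-2"];

    // Before: a mutable vector extended in place.
    let mut extended = new_receipts.clone();
    extended.extend(old_receipts.iter().cloned());

    // After: an immutable binding built in one expression.
    let receipts = [new_receipts, old_receipts].concat();

    // Same contents, same order: new receipts first, then old.
    assert_eq!(extended, receipts);
    assert_eq!(receipts, ["new-1", "new-2", "old-1", "old-2"]);
}
```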
diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index c8a3dd6f5c1..f69ad35f9bc 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -2014,7 +2014,7 @@ impl Client {
            .validate_tx(gas_price, None, tx, true, &epoch_id, protocol_version)
            .expect("no storage errors")
        {
-            debug!(target: "client", "Invalid tx during basic validation: {:?}", err);
+            debug!(target: "client", tx=?tx.get_hash(), "Invalid tx during basic validation: {:?}", err);
            return Ok(ProcessTxResponse::InvalidTx(err));
        }

@@ -2024,6 +2024,8 @@ impl Client {
            self.shard_tracker.care_about_shard(me, &head.last_block_hash, shard_id, true);
        let will_care_about_shard =
            self.shard_tracker.will_care_about_shard(me, &head.last_block_hash, shard_id, true);
+        // TODO(resharding) will_care_about_shard should be called with the
+        // account shard id from the next epoch, in case the shard layout changes
        if care_about_shard || will_care_about_shard {
            let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
            let state_root = match self.chain.get_chunk_extra(&head.last_block_hash, &shard_uid) {
@@ -2053,17 +2055,17 @@ impl Client {
        if me.is_some() {
            match self.sharded_tx_pool.insert_transaction(shard_id, tx.clone()) {
                InsertTransactionResult::Success => {
-                    trace!(target: "client", shard_id, "Recorded a transaction.");
+                    trace!(target: "client", shard_id, tx=?tx.get_hash(), "Recorded a transaction.");
                }
                InsertTransactionResult::Duplicate => {
-                    trace!(target: "client", shard_id, "Duplicate transaction, not forwarding it.");
+                    trace!(target: "client", shard_id, tx=?tx.get_hash(), "Duplicate transaction, not forwarding it.");
                    return Ok(ProcessTxResponse::ValidTx);
                }
                InsertTransactionResult::NoSpaceLeft => {
                    if is_forwarded {
-                        trace!(target: "client", shard_id, "Transaction pool is full, dropping the transaction.");
+                        trace!(target: "client", shard_id, tx=?tx.get_hash(), "Transaction pool is full, dropping the transaction.");
                    } else {
-                        trace!(target: "client", shard_id, "Transaction pool is full, trying to forward the transaction.");
+                        trace!(target: "client", shard_id, tx=?tx.get_hash(), "Transaction pool is full, trying to forward the transaction.");
                    }
                }
            }
@@ -2075,7 +2077,7 @@ impl Client {
            // forward to current epoch validators,
            // possibly forward to next epoch validators
            if self.active_validator(shard_id)? {
-                trace!(target: "client", account = ?me, shard_id, is_forwarded, "Recording a transaction.");
+                trace!(target: "client", account = ?me, shard_id, tx=?tx.get_hash(), is_forwarded, "Recording a transaction.");
                metrics::TRANSACTION_RECEIVED_VALIDATOR.inc();

                if !is_forwarded {
@@ -2083,12 +2085,12 @@ impl Client {
                }
                Ok(ProcessTxResponse::ValidTx)
            } else if !is_forwarded {
-                trace!(target: "client", shard_id, "Forwarding a transaction.");
+                trace!(target: "client", shard_id, tx=?tx.get_hash(), "Forwarding a transaction.");
                metrics::TRANSACTION_RECEIVED_NON_VALIDATOR.inc();
                self.forward_tx(&epoch_id, tx)?;
                Ok(ProcessTxResponse::RequestRouted)
            } else {
-                trace!(target: "client", shard_id, "Non-validator received a forwarded transaction, dropping it.");
+                trace!(target: "client", shard_id, tx=?tx.get_hash(), "Non-validator received a forwarded transaction, dropping it.");
                metrics::TRANSACTION_RECEIVED_NON_VALIDATOR_FORWARDED.inc();
                Ok(ProcessTxResponse::NoResponse)
            }
diff --git a/core/primitives/src/epoch_manager.rs b/core/primitives/src/epoch_manager.rs
index f12672f7cd6..5f7175a0d89 100644
--- a/core/primitives/src/epoch_manager.rs
+++ b/core/primitives/src/epoch_manager.rs
@@ -906,26 +906,10 @@ pub mod epoch_info {
            }
            Self::V3(v3) => {
                let protocol_version = self.protocol_version();
-                let seed =
-                    if checked_feature!(
-                        "stable",
-                        SynchronizeBlockChunkProduction,
-                        protocol_version
-                    ) && !checked_feature!("stable", ChunkOnlyProducers, protocol_version)
-                    {
-                        // This is same seed that used for determining block producer
-                        Self::block_produce_seed(height, &v3.rng_seed)
-                    } else {
-                        // 32 bytes from epoch_seed, 8 bytes from height, 8 bytes from shard_id
-                        let mut buffer = [0u8; 48];
-                        buffer[0..32].copy_from_slice(&v3.rng_seed);
-                        buffer[32..40].copy_from_slice(&height.to_le_bytes());
-                        buffer[40..48].copy_from_slice(&shard_id.to_le_bytes());
-                        hash(&buffer).0
-                    };
+                let seed = Self::chunk_produce_seed(protocol_version, v3, height, shard_id);
                let shard_id = shard_id as usize;
-                v3.chunk_producers_settlement[shard_id]
-                    [v3.chunk_producers_sampler[shard_id].sample(seed)]
+                let sample = v3.chunk_producers_sampler[shard_id].sample(seed);
+                v3.chunk_producers_settlement[shard_id][sample]
            }
        }
    }
@@ -937,6 +921,30 @@ pub mod epoch_info {
        buffer[32..40].copy_from_slice(&height.to_le_bytes());
        hash(&buffer).0
    }
+
+    fn chunk_produce_seed(
+        protocol_version: ProtocolVersion,
+        epoch_info_v3: &EpochInfoV3,
+        height: BlockHeight,
+        shard_id: ShardId,
+    ) -> [u8; 32] {
+        if checked_feature!("stable", SynchronizeBlockChunkProduction, protocol_version)
+            && !checked_feature!("stable", ChunkOnlyProducers, protocol_version)
+        {
+            // This is the same seed that is used for determining the block
+            // producer. The seed does not contain the shard id, so the
+            // chunks for all shards will be produced by the same validator.
+            Self::block_produce_seed(height, &epoch_info_v3.rng_seed)
+        } else {
+            // 32 bytes from epoch_seed, 8 bytes from height, 8 bytes from shard_id
+            let mut buffer = [0u8; 48];
+            buffer[0..32].copy_from_slice(&epoch_info_v3.rng_seed);
+            buffer[32..40].copy_from_slice(&height.to_le_bytes());
+            buffer[40..48].copy_from_slice(&shard_id.to_le_bytes());
+            hash(&buffer).0
+        }
+    }
}

#[derive(BorshSerialize, BorshDeserialize)]
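To make the two branches of `chunk_produce_seed` concrete, here is a standalone sketch of the 48-byte buffer layout used by the per-shard branch. In nearcore the buffer is then fed to the sha256-based `hash`; the sketch stops at the buffer, since distinct buffers are what make the derived seeds distinct.

```rust
type BlockHeight = u64;
type ShardId = u64;

// Mirrors the fallback branch: 32 bytes of epoch RNG seed, then the height
// and the shard id as little-endian u64s. Hashing this buffer gives a seed
// that differs per shard, so different shards can sample different producers.
fn seed_buffer(rng_seed: &[u8; 32], height: BlockHeight, shard_id: ShardId) -> [u8; 48] {
    let mut buffer = [0u8; 48];
    buffer[0..32].copy_from_slice(rng_seed);
    buffer[32..40].copy_from_slice(&height.to_le_bytes());
    buffer[40..48].copy_from_slice(&shard_id.to_le_bytes());
    buffer
}

fn main() {
    let rng_seed = [7u8; 32];
    // Same height, different shards: different buffers, hence different seeds.
    assert_ne!(seed_buffer(&rng_seed, 42, 0), seed_buffer(&rng_seed, 42, 1));
    // Only the last 8 bytes differ; the `block_produce_seed` branch hashes
    // just the seed and the height, which is why it assigns all shards at a
    // given height to the same validator.
    assert_eq!(seed_buffer(&rng_seed, 42, 0)[..40], seed_buffer(&rng_seed, 42, 1)[..40]);
}
```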
diff --git a/integration-tests/src/tests/client/sharding_upgrade.rs b/integration-tests/src/tests/client/sharding_upgrade.rs
index 0ecb406466c..27f31fd994a 100644
--- a/integration-tests/src/tests/client/sharding_upgrade.rs
+++ b/integration-tests/src/tests/client/sharding_upgrade.rs
@@ -13,7 +13,7 @@ use near_client::test_utils::{run_catchup, TestEnv};
 use near_crypto::{InMemorySigner, KeyType, Signer};
 use near_o11y::testonly::init_test_logger;
 use near_primitives::account::id::AccountId;
-use near_primitives::block::Block;
+use near_primitives::block::{Block, Tip};
 use near_primitives::hash::CryptoHash;
 use near_primitives::serialize::to_base64;
 use near_primitives::shard_layout::{account_id_to_shard_id, account_id_to_shard_uid};
@@ -57,6 +57,20 @@ enum ReshardingType {
    V2,
}

+fn get_target_protocol_version(resharding_type: &ReshardingType) -> ProtocolVersion {
+    match resharding_type {
+        ReshardingType::V1 => SIMPLE_NIGHTSHADE_PROTOCOL_VERSION,
+        ReshardingType::V2 => SIMPLE_NIGHTSHADE_V2_PROTOCOL_VERSION,
+    }
+}
+
+fn get_genesis_protocol_version(resharding_type: &ReshardingType) -> ProtocolVersion {
+    match resharding_type {
+        ReshardingType::V1 => SIMPLE_NIGHTSHADE_PROTOCOL_VERSION - 1,
+        ReshardingType::V2 => SIMPLE_NIGHTSHADE_V2_PROTOCOL_VERSION - 1,
+    }
+}
+
 // Return the expected number of shards.
 fn get_expected_shards_num(
    epoch_length: u64,
    height: BlockHeight,
    resharding_type: &ReshardingType,
 ) -> u64 {
    match resharding_type {
        ReshardingType::V1 => {
-            if height < 2 * epoch_length {
+            if height <= 2 * epoch_length {
                return 1;
            } else {
                return 4;
            }
        }
        ReshardingType::V2 => {
-            if height < 2 * epoch_length {
+            if height <= 2 * epoch_length {
                return 4;
            } else {
                return 5;
            }
        }
    }
}
@@ -98,6 +112,62 @@ struct TestShardUpgradeEnv {
    num_clients: usize,
}

+/// The condition that determines if a chunk should be produced or dropped.
+/// Can be probabilistic, predefined based on height and shard id, or both.
+struct DropChunkCondition {
+    probability: f64,
+    by_height_shard_id: HashSet<(BlockHeight, ShardId)>,
+}
+
+impl DropChunkCondition {
+    fn should_drop_chunk(
+        &self,
+        rng: &mut rand::rngs::ThreadRng,
+        height: BlockHeight,
+        shard_ids: &Vec<ShardId>,
+    ) -> bool {
+        if rng.gen_bool(self.probability) {
+            return true;
+        }
+
+        // One chunk producer may be responsible for producing multiple chunks.
+        // Ensure that by_height_shard_id is consistent in that it doesn't try
+        // to produce one chunk and drop another chunk produced by the same
+        // chunk producer.
+        // This shouldn't happen in setups where num_validators >= num_shards.
+        let mut all_true = true;
+        let mut all_false = true;
+        for shard_id in shard_ids {
+            match self.by_height_shard_id.contains(&(height, *shard_id)) {
+                true => all_false = false,
+                false => all_true = false,
+            }
+        }
+        if all_false {
+            return false;
+        }
+        if all_true {
+            return true;
+        }
+
+        tracing::warn!("Inconsistent test setup. Chunk producer configured to produce one of its chunks and to skip another. This is not supported, skipping all. height {} shard_ids {:?}", height, shard_ids);
+        return true;
+    }
+
+    /// Returns a DropChunkCondition that doesn't drop any chunks.
+    fn new() -> Self {
+        Self { probability: 0.0, by_height_shard_id: HashSet::new() }
+    }
+
+    fn with_probability(probability: f64) -> Self {
+        Self { probability, by_height_shard_id: HashSet::new() }
+    }
+
+    fn with_by_height_shard_id(by_height_shard_id: HashSet<(u64, ShardId)>) -> Self {
+        Self { probability: 0.0, by_height_shard_id }
+    }
+}
+
 impl TestShardUpgradeEnv {
    fn new(
        epoch_length: u64,
@@ -154,8 +224,12 @@ impl TestShardUpgradeEnv {
    /// also checks that all accounts in initial_accounts are intact
    ///
    /// please also see step_impl for changing the protocol version
-    fn step(&mut self, p_drop_chunk: f64) {
-        self.step_impl(p_drop_chunk, SIMPLE_NIGHTSHADE_PROTOCOL_VERSION, &ReshardingType::V1);
+    fn step(&mut self, drop_chunk_condition: &DropChunkCondition) {
+        self.step_impl(
+            drop_chunk_condition,
+            SIMPLE_NIGHTSHADE_PROTOCOL_VERSION,
+            &ReshardingType::V1,
+        );
    }

    /// produces and processes the next block; also checks that all accounts
    /// in initial_accounts are intact
    ///
    /// allows for changing the protocol version in the middle of the test,
    /// which in turn allows the shard layout V2 to be used once the
    /// appropriate protocol version is reached
    fn step_impl(
        &mut self,
-        p_drop_chunk: f64,
+        drop_chunk_condition: &DropChunkCondition,
        protocol_version: ProtocolVersion,
        resharding_type: &ReshardingType,
    ) {
        let env = &mut self.env;
        let mut rng = thread_rng();
        let head = env.clients[0].chain.head().unwrap();
-        let height = head.height + 1;
+        let next_height = head.height + 1;
        let expected_num_shards =
-            get_expected_shards_num(self.epoch_length, height, resharding_type);
+            get_expected_shards_num(self.epoch_length, next_height, resharding_type);

-        tracing::debug!(target: "test", height, expected_num_shards, "step");
+        tracing::debug!(target: "test", next_height, expected_num_shards, "step");

        // add transactions for the next block
-        if height == 1 {
+        if next_height == 1 {
            for tx in self.init_txs.iter() {
                Self::process_tx(env, tx);
            }
        }
@@ -190,7 +264,7 @@
        // (inside env.process_block)
        // Therefore, if we want a transaction to be included in the block at
        // `height+1`, we must add it when we are producing the block at `height`
-        if let Some(txs) = self.txs_by_height.get(&(height + 1)) {
+        if let Some(txs) = self.txs_by_height.get(&(next_height + 1)) {
            for tx in txs {
                Self::process_tx(env, tx);
            }
        }
@@ -198,44 +272,54 @@
        // produce block
        let block = {
-            let client = &env.clients[0];
-            let epoch_id =
-                client.epoch_manager.get_epoch_id_from_prev_block(&head.last_block_hash).unwrap();
-            let block_producer =
-                client.epoch_manager.get_block_producer(&epoch_id, height).unwrap();
+            let block_producer = get_block_producer(env, &head);
            let _span = tracing::debug_span!(target: "test", "", client=?block_producer).entered();
            let block_producer_client = env.client(&block_producer);
-            let mut block = block_producer_client.produce_block(height).unwrap().unwrap();
+            let mut block = block_producer_client.produce_block(next_height).unwrap().unwrap();
            set_block_protocol_version(&mut block, block_producer.clone(), protocol_version);

            block
        };

+        // Mapping from the chunk producer account id to the list of chunks
+        // that it should produce. When processing the block at height
+        // `next_height`, the chunk producers will produce chunks at height
+        // `next_height + 1`.
+        let mut chunk_producer_to_shard_id: HashMap<AccountId, Vec<ShardId>> = HashMap::new();
+        for shard_id in 0..block.chunks().len() {
+            let shard_id = shard_id as ShardId;
+            let validator_id = get_chunk_producer(env, &block, shard_id);
+            chunk_producer_to_shard_id.entry(validator_id).or_default().push(shard_id);
+        }
+
        // Make sure that catchup is done before the end of each epoch, but
        // exactly when it is done is left to chance. This simulates catchup
        // taking a long time to complete.
        // Note: if the catchup happens only at the last block of an epoch
        // then the client will fail to produce the chunks in the first block
        // of the next epoch.
-        let should_catchup = rng.gen_bool(P_CATCHUP) || height % self.epoch_length == 0;
+        let should_catchup = rng.gen_bool(P_CATCHUP) || next_height % self.epoch_length == 0;
        // process block, this also triggers chunk producers for the next block to produce chunks
        for j in 0..self.num_clients {
            let client = &mut env.clients[j];
            let _span = tracing::debug_span!(target: "test", "process block", client=j).entered();
-            let produce_chunks = !rng.gen_bool(p_drop_chunk);
+
+            let shard_ids = chunk_producer_to_shard_id
+                .get(client.validator_signer.as_ref().unwrap().validator_id())
+                .cloned()
+                .unwrap_or_default();
+            let should_produce_chunk =
+                !drop_chunk_condition.should_drop_chunk(&mut rng, next_height + 1, &shard_ids);
+            tracing::info!(target: "test", ?next_height, ?should_produce_chunk, ?shard_ids, "should produce chunk");
+
            // Here we don't just call self.clients[i].process_block_sync_with_produce_chunk_options
            // because we want to call run_catchup before finishing processing this block. This
            // simulates catchup and block processing running in parallel.
-            client
-                .start_process_block(
-                    MaybeValidated::from(block.clone()),
-                    Provenance::NONE,
-                    Arc::new(|_| {}),
-                )
-                .unwrap();
+            let block = MaybeValidated::from(block.clone());
+            client.start_process_block(block, Provenance::NONE, Arc::new(|_| {})).unwrap();
            if should_catchup {
                run_catchup(client, &[]).unwrap();
            }
            while wait_for_all_blocks_in_processing(&mut client.chain) {
-                let (_, errors) = client.postprocess_ready_blocks(Arc::new(|_| {}), produce_chunks);
+                let (_, errors) =
+                    client.postprocess_ready_blocks(Arc::new(|_| {}), should_produce_chunk);
                assert!(errors.is_empty(), "unexpected errors: {:?}", errors);
            }
            if should_catchup {
@@ -246,7 +330,7 @@
        {
            let num_shards = env.clients[0]
                .epoch_manager
-                .get_shard_layout_from_prev_block(block.hash())
+                .get_shard_layout_from_prev_block(block.header().prev_hash())
                .unwrap()
                .num_shards();
            assert_eq!(num_shards, expected_num_shards);
@@ -369,7 +453,7 @@
    fn check_tx_outcomes(
        &mut self,
        allow_not_started: bool,
-        skip_heights: Vec<u64>,
+        skip_heights: Vec<BlockHeight>,
    ) -> Vec<CryptoHash> {
        tracing::debug!(target: "test", "checking tx outcomes");
        let env = &mut self.env;
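A condensed model of the `chunk_producer_to_shard_id` map built in `step_impl` above. The validator assignment here is a made-up closure (the test queries the epoch manager instead); it shows why one producer can own several shards, which is the reason the drop decision is taken per producer over its whole shard list.

```rust
use std::collections::HashMap;

fn main() {
    let num_shards: u64 = 4;
    // Hypothetical assignment: two validators alternate over four shards.
    let get_chunk_producer = |shard_id: u64| format!("validator{}", shard_id % 2);

    let mut chunk_producer_to_shard_id: HashMap<String, Vec<u64>> = HashMap::new();
    for shard_id in 0..num_shards {
        let validator_id = get_chunk_producer(shard_id);
        chunk_producer_to_shard_id.entry(validator_id).or_default().push(shard_id);
    }

    // Each producer either produces all of its chunks at a height or none of
    // them, matching the consistency check in `should_drop_chunk`.
    assert_eq!(chunk_producer_to_shard_id["validator0"], vec![0, 2]);
    assert_eq!(chunk_producer_to_shard_id["validator1"], vec![1, 3]);
}
```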
@@ -524,10 +608,32 @@ impl TestShardUpgradeEnv {
    }
}

+// Returns the block producer for the next block after the current head.
+fn get_block_producer(env: &TestEnv, head: &Tip) -> AccountId {
+    let client = &env.clients[0];
+    let epoch_manager = &client.epoch_manager;
+    let parent_hash = &head.last_block_hash;
+    let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash).unwrap();
+    let height = head.height + 1;
+    let block_producer = epoch_manager.get_block_producer(&epoch_id, height).unwrap();
+    block_producer
+}
+
+// Returns the chunk producer for the given shard in the next block after the given block.
+fn get_chunk_producer(env: &TestEnv, block: &Block, shard_id: ShardId) -> AccountId {
+    let client = &env.clients[0];
+    let epoch_manager = &client.epoch_manager;
+    let parent_hash = block.header().prev_hash();
+    let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash).unwrap();
+    let height = block.header().height() + 1;
+    let chunk_producer = epoch_manager.get_chunk_producer(&epoch_id, height, shard_id).unwrap();
+    chunk_producer
+}
+
 fn check_outgoing_receipts_reassigned_impl(
    client: &Client,
-    shard_id: u64,
-    last_height_included: u64,
+    shard_id: ShardId,
+    last_height_included: BlockHeight,
    resharding_type: &ReshardingType,
 ) {
    let chain = &client.chain;
@@ -654,6 +760,52 @@ fn setup_genesis(
    genesis
}

+fn generate_create_accounts_txs(
+    mut rng: &mut rand::rngs::ThreadRng,
+    genesis_hash: CryptoHash,
+    initial_accounts: &Vec<AccountId>,
+    accounts_to_check: &mut Vec<AccountId>,
+    all_accounts: &mut HashSet<AccountId>,
+    nonce: &mut u64,
+    max_size: usize,
+    check_accounts: bool,
+) -> Vec<SignedTransaction> {
+    let size = rng.gen_range(0..max_size) + 1;
+    std::iter::repeat_with(|| loop {
+        let signer_account = initial_accounts.choose(&mut rng).unwrap();
+        let signer0 = InMemorySigner::from_seed(
+            signer_account.clone(),
+            KeyType::ED25519,
+            &signer_account.to_string(),
+        );
+        let account_id = gen_account(&mut rng, b"abcdefghijkmn");
+        if all_accounts.insert(account_id.clone()) {
+            let signer = InMemorySigner::from_seed(
+                account_id.clone(),
+                KeyType::ED25519,
+                account_id.as_ref(),
+            );
+            let tx = SignedTransaction::create_account(
+                *nonce,
+                signer_account.clone(),
+                account_id.clone(),
+                NEAR_BASE,
+                signer.public_key(),
+                &signer0,
+                genesis_hash,
+            );
+            if check_accounts {
+                accounts_to_check.push(account_id.clone());
+            }
+            *nonce += 1;
+            tracing::trace!(target: "test", ?account_id, tx=?tx.get_hash(), "adding create account tx");
+            return tx;
+        }
+    })
+    .take(size)
+    .collect()
+}
+
 fn test_shard_layout_upgrade_simple_impl(resharding_type: ReshardingType) {
    init_test_logger();
    tracing::info!(target: "test", "test_shard_layout_upgrade_simple_impl starting");
@@ -702,8 +854,9 @@ fn test_shard_layout_upgrade_simple_impl(resharding_type: ReshardingType) {
        test_env.set_tx_at_height(height, txs);
    }

+    let drop_chunk_condition = DropChunkCondition::new();
    for _ in 1..4 * epoch_length {
-        test_env.step_impl(0., target_protocol_version, &resharding_type);
+        test_env.step_impl(&drop_chunk_condition, target_protocol_version, &resharding_type);
        test_env.check_receipt_id_to_shard_id();
    }

@@ -714,20 +867,6 @@ fn test_shard_layout_upgrade_simple_impl(resharding_type: ReshardingType) {
    tracing::info!(target: "test", "test_shard_layout_upgrade_simple_impl finished");
}

-fn get_target_protocol_version(resharding_type: &ReshardingType) -> u32 {
-    match resharding_type {
-        ReshardingType::V1 => SIMPLE_NIGHTSHADE_PROTOCOL_VERSION,
-        ReshardingType::V2 => SIMPLE_NIGHTSHADE_V2_PROTOCOL_VERSION,
-    }
-}
-
-fn get_genesis_protocol_version(resharding_type: &ReshardingType) -> u32 {
-    match resharding_type {
-        ReshardingType::V1 => SIMPLE_NIGHTSHADE_PROTOCOL_VERSION - 1,
-        ReshardingType::V2 => SIMPLE_NIGHTSHADE_V2_PROTOCOL_VERSION - 1,
-    }
-}
-
 #[test]
 fn test_shard_layout_upgrade_simple_v1() {
    test_shard_layout_upgrade_simple_impl(ReshardingType::V1);
@@ -739,52 +878,6 @@
 fn test_shard_layout_upgrade_simple_v2() {
    test_shard_layout_upgrade_simple_impl(ReshardingType::V2);
}

-fn generate_create_accounts_txs(
-    mut rng: &mut rand::rngs::ThreadRng,
-    genesis_hash: CryptoHash,
-    initial_accounts: &Vec<AccountId>,
-    accounts_to_check: &mut Vec<AccountId>,
-    all_accounts: &mut HashSet<AccountId>,
-    nonce: &mut u64,
-    max_size: usize,
-    check_accounts: bool,
-) -> Vec<SignedTransaction> {
-    let size = rng.gen_range(0..max_size) + 1;
-    std::iter::repeat_with(|| loop {
-        let signer_account = initial_accounts.choose(&mut rng).unwrap();
-        let signer0 = InMemorySigner::from_seed(
-            signer_account.clone(),
-            KeyType::ED25519,
-            &signer_account.to_string(),
-        );
-        let account_id = gen_account(&mut rng, b"abcdefghijkmn");
-        if all_accounts.insert(account_id.clone()) {
-            let signer = InMemorySigner::from_seed(
-                account_id.clone(),
-                KeyType::ED25519,
-                account_id.as_ref(),
-            );
-            let tx = SignedTransaction::create_account(
-                *nonce,
-                signer_account.clone(),
-                account_id.clone(),
-                NEAR_BASE,
-                signer.public_key(),
-                &signer0,
-                genesis_hash,
-            );
-            if check_accounts {
-                accounts_to_check.push(account_id.clone());
-            }
-            *nonce += 1;
-            tracing::trace!(target: "test", ?account_id, tx=?tx.get_hash(), "adding create account tx");
-            return tx;
-        }
-    })
-    .take(size)
-    .collect()
-}
-
 const GAS_1: u64 = 300_000_000_000_000;
 const GAS_2: u64 = GAS_1 / 3;
@@ -924,6 +1017,7 @@ fn setup_test_env_with_cross_contract_txs(
                &genesis_hash,
            );
            new_accounts.insert(tx.get_hash(), account_id);
+            tracing::trace!(target: "test", ?tx, "generated tx");
            return tx;
        }
    })
@@ -968,8 +1062,9 @@
    let (mut test_env, new_accounts) =
        setup_test_env_with_cross_contract_txs(epoch_length, genesis_protocol_version);

+    let drop_chunk_condition = DropChunkCondition::new();
    for _ in 1..5 * epoch_length {
-        test_env.step_impl(0., target_protocol_version, &resharding_type);
+        test_env.step_impl(&drop_chunk_condition, target_protocol_version, &resharding_type);
        test_env.check_receipt_id_to_shard_id();
    }
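The incoming-receipts test added below drives `DropChunkCondition` purely through `by_height_shard_id`. A self-contained model of that deterministic path (the probability branch and the `rand` dependency are omitted here) shows the intended behavior at the targeted height and shard:

```rust
use std::collections::HashSet;

struct DropChunkCondition {
    by_height_shard_id: HashSet<(u64, u64)>,
}

impl DropChunkCondition {
    fn with_by_height_shard_id(by_height_shard_id: HashSet<(u64, u64)>) -> Self {
        Self { by_height_shard_id }
    }

    // Deterministic core of `should_drop_chunk`: drop only when every shard
    // this producer is responsible for at `height` is marked for dropping.
    fn should_drop_chunk(&self, height: u64, shard_ids: &[u64]) -> bool {
        !shard_ids.is_empty()
            && shard_ids.iter().all(|id| self.by_height_shard_id.contains(&(height, *id)))
    }
}

fn main() {
    // Drop the chunk of shard 3 in the block at height 10, much like the test
    // does for the last block of the old shard layout.
    let cond = DropChunkCondition::with_by_height_shard_id(HashSet::from([(10, 3)]));
    assert!(cond.should_drop_chunk(10, &[3])); // the targeted chunk is dropped
    assert!(!cond.should_drop_chunk(10, &[2])); // other shards are unaffected
    assert!(!cond.should_drop_chunk(11, &[3])); // other heights are unaffected
}
```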
@@ -997,6 +1092,69 @@ fn test_shard_layout_upgrade_cross_contract_calls_v2() {
    test_shard_layout_upgrade_cross_contract_calls_impl(ReshardingType::V2);
}

+fn test_shard_layout_upgrade_incoming_receipts_impl(resharding_type: ReshardingType) {
+    init_test_logger();
+
+    // setup
+    let epoch_length = 5;
+    let genesis_protocol_version = get_genesis_protocol_version(&resharding_type);
+    let target_protocol_version = get_target_protocol_version(&resharding_type);
+
+    let (mut test_env, new_accounts) =
+        setup_test_env_with_cross_contract_txs(epoch_length, genesis_protocol_version);
+
+    // Drop one of the chunks in the last block before switching to the new
+    // shard layout.
+    let drop_height = 2 * epoch_length;
+    let old_shard_num = get_expected_shards_num(epoch_length, drop_height, &resharding_type);
+    let new_shard_num = get_expected_shards_num(epoch_length, drop_height + 1, &resharding_type);
+    assert_ne!(old_shard_num, new_shard_num);
+
+    // Drop the chunk from the shard with the highest shard id since it's the
+    // one that is split in both the V1 and V2 reshardings.
+    let drop_shard_id = old_shard_num - 1;
+
+    let by_height_shard_id = HashSet::from([(drop_height, drop_shard_id)]);
+    let drop_chunk_condition = DropChunkCondition::with_by_height_shard_id(by_height_shard_id);
+    for _ in 1..5 * epoch_length {
+        test_env.step_impl(&drop_chunk_condition, target_protocol_version, &resharding_type);
+        test_env.check_receipt_id_to_shard_id();
+    }
+
+    // TODO(resharding) get rid of skip_heights
+    // - 2 * epoch_length is skipped because we miss a chunk in that block
+    //   and we lose the transaction pool during resharding. Fix that.
+    let skip_heights = vec![2 * epoch_length, 2 * epoch_length + 1];
+    let successful_txs = test_env.check_tx_outcomes(false, skip_heights);
+    let new_accounts =
+        successful_txs.iter().flat_map(|tx_hash| new_accounts.get(tx_hash)).collect();
+
+    test_env.check_accounts(new_accounts);
+    test_env.check_split_states_artifacts();
+}
+
+// This test doesn't make much sense for the V1 resharding because in the V1
+// resharding there is only one shard before the split. Even if that chunk is
+// missing there aren't any other chunks, so there are no incoming receipts at
+// all.
+#[test]
+fn test_shard_layout_upgrade_incoming_receipts_v1() {
+    test_shard_layout_upgrade_incoming_receipts_impl(ReshardingType::V1);
+}
+
+// TODO(resharding) This test is currently broken because handling of incoming
+// receipts doesn't work for the V2 resharding. It should be fixed and then
+// the test can be enabled.
+// TODO(resharding) Add another test like this but drop more chunks and at
+// random. The _missing_chunks tests below test only the case when all chunks
+// in a block are missing but can likely be adjusted for this case.
+#[cfg(feature = "protocol_feature_simple_nightshade_v2")]
+#[ignore]
+#[test]
+fn test_shard_layout_upgrade_incoming_receipts_v2() {
+    test_shard_layout_upgrade_incoming_receipts_impl(ReshardingType::V2);
+}
+
 // Test cross contract calls
 // This test case tests when there are missing chunks in the produced blocks
 // This is to test that all the chunk management logic in the sharding split is correct
@@ -1013,12 +1171,14 @@ fn test_shard_layout_upgrade_missing_chunks(p_missing: f64) {

    // randomly dropping chunks at the first few epochs when the sharding split happens
    // make sure initial txs (deploy smart contracts) are processed successfully
+    let drop_chunk_condition = DropChunkCondition::new();
    for _ in 1..3 {
-        test_env.step(0.);
+        test_env.step(&drop_chunk_condition);
    }

+    let drop_chunk_condition = DropChunkCondition::with_probability(p_missing);
    for _ in 3..3 * epoch_length {
-        test_env.step(p_missing);
+        test_env.step(&drop_chunk_condition);
        let last_height = test_env.env.clients[0].chain.head().unwrap().height;
        for height in last_height - 3..=last_height {
            test_env.check_next_block_with_new_chunk(height);
        }
    }

    // make sure all included transactions finished processing
+    let drop_chunk_condition = DropChunkCondition::new();
@@ -1027,8 +1187,9 @@
    for _ in 3 * epoch_length..5 * epoch_length {
-        test_env.step(0.);
+        test_env.step(&drop_chunk_condition);
        let last_height = test_env.env.clients[0].chain.head().unwrap().height;
        for height in last_height - 3..=last_height {
            test_env.check_next_block_with_new_chunk(height);
        }
    }

 #[test]
 fn test_shard_layout_upgrade_missing_chunks_mid_missing_prob() {
    test_shard_layout_upgrade_missing_chunks(0.5);
}

@@ -1058,3 +1219,5 @@
 #[test]
 fn test_shard_layout_upgrade_missing_chunks_high_missing_prob() {
    test_shard_layout_upgrade_missing_chunks(0.9);
}
+
+// TODO(resharding) add a test with missing blocks
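Finally, a toy model (plain integers stand in for block hashes, string slices for receipts) of the store traversal these tests exercise: `get_incoming_receipts_for_shard` walks `prev_hash` links from the requested block down to the last height where the shard had a chunk, emitting stored receipts where they exist and an explicitly empty response for blocks whose chunks were all missing. The loop termination is simplified here to a direct hash comparison.

```rust
use std::collections::HashMap;

/// (block "hash", receipt payloads) — stands in for `ReceiptProofResponse`.
type Response = (u64, Vec<&'static str>);

fn get_incoming_receipts_for_shard(
    mut block_hash: u64,
    prev: &HashMap<u64, u64>,                  // block -> prev block
    stored: &HashMap<u64, Vec<&'static str>>,  // blocks with stored receipts
    last_chunk_block: u64,                     // block at last_chunk_height_included
) -> Vec<Response> {
    let mut ret = vec![];
    while block_hash != last_chunk_block {
        // A missing entry models the db read error for a block where all
        // chunks were missing; it becomes an explicitly empty response.
        ret.push((block_hash, stored.get(&block_hash).cloned().unwrap_or_default()));
        block_hash = prev[&block_hash];
    }
    ret
}

fn main() {
    // Chain 3 -> 2 -> 1, where block 2 had all of its chunks missing.
    let prev = HashMap::from([(3, 2), (2, 1)]);
    let stored = HashMap::from([(3, vec!["r1", "r2"])]);
    let responses = get_incoming_receipts_for_shard(3, &prev, &stored, 1);
    assert_eq!(responses, vec![(3, vec!["r1", "r2"]), (2, vec![])]);
}
```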