feat: adding cross_contract_calls integration test for the V1->V2 resharding (#9402)

The goal of this PR is to refactor the existing cross_contract_calls integration test so that it can be reused for testing the new resharding, and to do just that. I ran into numerous issues while adapting the test, because enabling the new resharding also enables all of the other protocol features introduced since Simple Nightshade, and the test wasn't quite ready for that. I managed to fix a few of the issues but not all of them. I'm leaving the test unfinished for now, since I suspect the remaining failure may be due to the lack of flat storage support, so it's better to wait for that; I marked the new test as #[ignore]. I would still like to merge all of the refactoring, fixes and improvements I made so far, and I will pick this up again once flat storage is supported in resharding.

Some of the new features are:
- I refactored the cross_contract_calls test so that it can handle either resharding type.
- Clients no longer have to process every transaction themselves; a client that does not track the relevant shard can route the transaction to one that does (sketched below). This was only needed before I set all clients to track all shards, but it is correct, handles both cases and is more future proof.
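
To illustrate the routing idea only - this is not the actual TestEnv API, and the Client, Transaction and shard-lookup types below are invented for the sketch - a client that does not track a transaction's shard simply hands it to a client that does:

```rust
// Hypothetical sketch: process a transaction locally when the sending client
// tracks the relevant shard, otherwise route it to a client that does. The
// types are stand-ins, not nearcore's test types.
type ShardId = u64;

struct Transaction {
    signer_shard: ShardId,
}

struct Client {
    tracked_shards: Vec<ShardId>,
}

impl Client {
    fn tracks(&self, shard: ShardId) -> bool {
        self.tracked_shards.contains(&shard)
    }

    fn process_tx(&mut self, _tx: Transaction) {
        // In the real test this would feed the transaction into the client.
    }
}

/// Works both when every client tracks every shard and when shard tracking is
/// split across clients.
fn process_or_route(clients: &mut [Client], sender: usize, tx: Transaction) {
    let shard = tx.signer_shard;
    if clients[sender].tracks(shard) {
        clients[sender].process_tx(tx);
    } else if let Some(target) = clients.iter_mut().find(|c| c.tracks(shard)) {
        target.process_tx(tx);
    }
}

fn main() {
    let mut clients =
        vec![Client { tracked_shards: vec![0] }, Client { tracked_shards: vec![1] }];
    // Client 0 does not track shard 1, so the transaction is routed to client 1.
    process_or_route(&mut clients, 0, Transaction { signer_shard: 1 });
}
```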
 
Some of the challenges were:
- The test was failing with a missing chunk error. This was because the "Partial Encoded Chunk Forward" messages were not handled in the test infra. I fixed this in TestEnv::process_partial_encoded_chunks. The fix has two parts:
  - handle the forward messages
  - keep looping until all messages are processed - this is needed because processing one message can trigger sending another (see the first sketch after this list)
- The protocol version was not getting upgraded, and because of that the resharding was not getting triggered (or at least not deterministically where I wanted it to happen). This was caused by an unlucky block producer assignment where one block producer (out of 4) never gets to produce a block in the (rather short) first epoch. When that happens, voting for the new protocol version is stuck at 3/4 = 75% of the stake, which is below the default threshold. I lowered the threshold in the test to work around that (see the worked example after this list).
- The state sync process was getting triggered and failing because some event loop or other async framework setup was missing. I fixed that by setting all nodes to track all shards. (This is not an issue in V0->V1, because when there is a single shard everyone tracks it, and apparently the nodes keep tracking the new shards as well.)
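
The looping part of the missing-chunk fix boils down to draining the per-client message queues to a fixed point, because handling one partial encoded chunk can enqueue a forward for another client. A distilled sketch of that pattern with made-up message types - the real change is in TestEnv::process_partial_encoded_chunks in the diff below:

```rust
use std::collections::VecDeque;

// Stand-in for the network requests the test adapters carry; the real code
// matches on PartialEncodedChunkMessage and PartialEncodedChunkForward.
enum Message {
    Chunk { target_client: usize },
    Forward,
}

/// Handling a chunk may produce a forward addressed to another client;
/// handling a forward produces nothing in this toy model.
fn handle(msg: Message) -> Vec<(usize, Message)> {
    match msg {
        Message::Chunk { target_client } => vec![(target_client, Message::Forward)],
        Message::Forward => vec![],
    }
}

/// Drain every per-client queue until no queue produces new work. A single
/// pass is not enough because handling one message can enqueue another
/// message on a different client's queue.
fn process_to_fixed_point(queues: &mut Vec<VecDeque<Message>>) {
    let mut keep_going = true;
    while keep_going {
        keep_going = false;
        for i in 0..queues.len() {
            while let Some(msg) = queues[i].pop_front() {
                keep_going = true;
                for (target, new_msg) in handle(msg) {
                    queues[target].push_back(new_msg);
                }
            }
        }
    }
}

fn main() {
    let mut queues = vec![
        VecDeque::from([Message::Chunk { target_client: 1 }]),
        VecDeque::new(),
    ];
    process_to_fixed_point(&mut queues);
    assert!(queues.iter().all(|q| q.is_empty()));
}
```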
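
For the protocol upgrade issue, the arithmetic is straightforward: with 4 equal-stake block producers and one of them never producing a block in the first epoch, only 3/4 = 75% of the block producer stake votes for the new version, which falls short of the upgrade threshold. Below is a minimal worked example mirroring the stake-threshold comparison refactored in chain/epoch-manager/src/lib.rs further down; the 4/5 threshold and the equal stakes are illustrative assumptions, not values taken from the test:

```rust
fn main() {
    // Four block producers with equal (illustrative) stakes.
    let stake_per_producer: u128 = 1_000_000;
    let total_block_producer_stake: u128 = 4 * stake_per_producer;

    // One producer never gets to produce a block in the short first epoch,
    // so it never casts its vote for the new protocol version.
    let voting_stake: u128 = 3 * stake_per_producer;

    // protocol_upgrade_stake_threshold as a rational number (numer / denom);
    // 4/5 is an assumed example value.
    let (numer, denom): (u128, u128) = (4, 5);
    let threshold = total_block_producer_stake * numer / denom;

    // 3_000_000 is not greater than 3_200_000, so the version never flips -
    // which is why the test lowers the threshold instead.
    assert!(voting_stake <= threshold);
    println!("voting stake: {voting_stake}, threshold: {threshold}");
}
```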
wacban authored and nikurt committed Aug 28, 2023
1 parent 60caf46 commit 26e6015
Showing 10 changed files with 198 additions and 137 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -61,4 +61,4 @@ rusty-tags.vi
# Estimator generated files
costs-*.txt
names-to-stats.txt
data_dump_*.bin
data_dump_*.bin
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 10 additions & 2 deletions chain/chain/src/chain.rs
@@ -2711,8 +2711,16 @@ impl Chain {
parent_hash: &CryptoHash,
shard_id: ShardId,
) -> bool {
let will_shard_layout_change =
epoch_manager.will_shard_layout_change(parent_hash).unwrap_or(false);
let result = epoch_manager.will_shard_layout_change(parent_hash);
let will_shard_layout_change = match result {
Ok(will_shard_layout_change) => will_shard_layout_change,
Err(err) => {
// TODO(resharding) This is a problem, if this happens the node
// will not perform resharding and fall behind the network.
tracing::error!(target: "chain", ?err, "failed to check if shard layout will change");
false
}
};
// if shard layout will change the next epoch, we should catch up the shard regardless
// whether we already have the shard's state this epoch, because we need to generate
// new states for shards split from the current shard for the next epoch
2 changes: 2 additions & 0 deletions chain/chunks/Cargo.toml
@@ -21,6 +21,8 @@ rand.workspace = true
reed-solomon-erasure.workspace = true
time.workspace = true
tracing.workspace = true
# itertools has collect_vec which is useful in quick debugging prints
itertools.workspace = true

near-async.workspace = true
near-chain-configs.workspace = true
57 changes: 29 additions & 28 deletions chain/chunks/src/lib.rs
@@ -606,17 +606,21 @@ impl ShardsManager {
},
);

if !mark_only {
let fetch_from_archival = chunk_needs_to_be_fetched_from_archival(
if mark_only {
debug!(target: "chunks", height, shard_id, ?chunk_hash, "Marked the chunk as being requested but did not send the request yet.");
return;
}

let fetch_from_archival = chunk_needs_to_be_fetched_from_archival(
&ancestor_hash, &self.chain_header_head.last_block_hash,
self.epoch_manager.as_ref()).unwrap_or_else(|err| {
error!(target: "chunks", "Error during requesting partial encoded chunk. Cannot determine whether to request from an archival node, defaulting to not: {}", err);
false
});
let old_block = self.chain_header_head.last_block_hash != prev_block_hash
&& self.chain_header_head.prev_block_hash != prev_block_hash;
let old_block = self.chain_header_head.last_block_hash != prev_block_hash
&& self.chain_header_head.prev_block_hash != prev_block_hash;

let should_wait_for_chunk_forwarding =
let should_wait_for_chunk_forwarding =
self.should_wait_for_chunk_forwarding(&ancestor_hash, chunk_header.shard_id(), chunk_header.height_created()+1).unwrap_or_else(|_| {
// ancestor_hash must be accepted because we don't request missing chunks through this
// this function for orphans
@@ -625,30 +629,27 @@ impl ShardsManager {
false
});

// If chunks forwarding is enabled,
// we purposely do not send chunk request messages right away for new blocks. Such requests
// will eventually be sent because of the `resend_chunk_requests` loop. However,
// we want to give some time for any `PartialEncodedChunkForward` messages to arrive
// before we send requests.
if !should_wait_for_chunk_forwarding || fetch_from_archival || old_block {
debug!(target: "chunks", height, shard_id, ?chunk_hash, "Requesting.");
let request_result = self.request_partial_encoded_chunk(
height,
&ancestor_hash,
shard_id,
&chunk_hash,
false,
old_block,
fetch_from_archival,
);
if let Err(err) = request_result {
error!(target: "chunks", "Error during requesting partial encoded chunk: {}", err);
}
} else {
debug!(target: "chunks",should_wait_for_chunk_forwarding, fetch_from_archival, old_block, "Delaying the chunk request.");
// If chunks forwarding is enabled,
// we purposely do not send chunk request messages right away for new blocks. Such requests
// will eventually be sent because of the `resend_chunk_requests` loop. However,
// we want to give some time for any `PartialEncodedChunkForward` messages to arrive
// before we send requests.
if !should_wait_for_chunk_forwarding || fetch_from_archival || old_block {
debug!(target: "chunks", height, shard_id, ?chunk_hash, "Requesting.");
let request_result = self.request_partial_encoded_chunk(
height,
&ancestor_hash,
shard_id,
&chunk_hash,
false,
old_block,
fetch_from_archival,
);
if let Err(err) = request_result {
error!(target: "chunks", "Error during requesting partial encoded chunk: {}", err);
}
} else {
debug!(target: "chunks", height, shard_id, ?chunk_hash, "Marked the chunk as being requested but did not send the request yet.");
debug!(target: "chunks",should_wait_for_chunk_forwarding, fetch_from_archival, old_block, "Delaying the chunk request.");
}
}

@@ -1434,7 +1435,7 @@ impl ShardsManager {
}
}

// 2. Consider it valid; mergeparts and receipts included in the partial encoded chunk
// 2. Consider it valid; merge parts and receipts included in the partial encoded chunk
// into chunk cache
let new_part_ords =
self.encoded_chunks.merge_in_partial_encoded_chunk(partial_encoded_chunk);
1 change: 0 additions & 1 deletion chain/chunks/src/logic.rs
@@ -155,7 +155,6 @@ pub fn decode_encoded_chunk(
})
{
debug!(target: "chunks", "Reconstructed and decoded chunk {}, encoded length was {}, num txs: {}, I'm {:?}", chunk_hash.0, encoded_chunk.encoded_length(), shard_chunk.transactions().len(), me);

let partial_chunk = create_partial_chunk(
encoded_chunk,
merkle_paths,
60 changes: 45 additions & 15 deletions chain/client/src/test_utils.rs
@@ -1874,21 +1874,48 @@ impl TestEnv {

pub fn process_partial_encoded_chunks(&mut self) {
let network_adapters = self.network_adapters.clone();
for network_adapter in network_adapters {
// process partial encoded chunks
while let Some(request) = network_adapter.pop() {
if let PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkMessage {
account_id,
partial_encoded_chunk,
},
) = request
{
self.shards_manager(&account_id).send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(
PartialEncodedChunk::from(partial_encoded_chunk),
),
);

let mut keep_going = true;
while keep_going {
for network_adapter in network_adapters.iter() {
keep_going = false;
// process partial encoded chunks
while let Some(request) = network_adapter.pop() {
// If there are any requests in any of the adapters, set keep_going
// back to true, as processing one message may trigger more messages
// to be processed in other clients. It's a bit sad and it would be
// much nicer if all messages were forwarded to a single queue.
// TODO: it would be nicer to first handle all PECs and then all PECFs.
keep_going = true;
match request {
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkMessage {
account_id,
partial_encoded_chunk,
},
) => {
let partial_encoded_chunk =
PartialEncodedChunk::from(partial_encoded_chunk);
let message =
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(
partial_encoded_chunk,
);
self.shards_manager(&account_id).send(message);
}
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkForward { account_id, forward },
) => {
let message =
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(
forward,
);
self.shards_manager(&account_id).send(message);
}
_ => {
tracing::debug!(target: "test", ?request, "skipping unsupported request type");
}
}
}
}
}
@@ -1983,6 +2010,9 @@ impl TestEnv {
}

pub fn process_shards_manager_responses_and_finish_processing_blocks(&mut self, idx: usize) {
let _span =
tracing::debug_span!(target: "test", "process_shards_manager", client=idx).entered();

loop {
self.process_shards_manager_responses(idx);
if self.clients[idx].finish_blocks_in_processing().is_empty() {
11 changes: 6 additions & 5 deletions chain/epoch-manager/Cargo.toml
@@ -18,6 +18,8 @@ rand_hc.workspace = true
serde_json.workspace = true
smart-default.workspace = true
tracing.workspace = true
# itertools has collect_vec which is useful in quick debugging prints
itertools.workspace = true

near-crypto.workspace = true
near-primitives.workspace = true
@@ -28,7 +30,9 @@ near-cache.workspace = true

[features]
expensive_tests = []
protocol_feature_fix_staking_threshold = ["near-primitives/protocol_feature_fix_staking_threshold"]
protocol_feature_fix_staking_threshold = [
"near-primitives/protocol_feature_fix_staking_threshold",
]
nightly = [
"nightly_protocol",
"protocol_feature_fix_staking_threshold",
@@ -43,7 +47,4 @@ nightly_protocol = [
"near-store/nightly_protocol",
]
no_cache = []
new_epoch_sync = [
"near-store/new_epoch_sync",
"near-primitives/new_epoch_sync"
]
new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync"]
10 changes: 4 additions & 6 deletions chain/epoch-manager/src/lib.rs
@@ -555,19 +555,17 @@ impl EpochManager {
let next_version = if let Some((version, stake)) =
versions.into_iter().max_by_key(|&(_version, stake)| stake)
{
if stake
> (total_block_producer_stake
* *config.protocol_upgrade_stake_threshold.numer() as u128)
/ *config.protocol_upgrade_stake_threshold.denom() as u128
{
let numer = *config.protocol_upgrade_stake_threshold.numer() as u128;
let denom = *config.protocol_upgrade_stake_threshold.denom() as u128;
let threshold = total_block_producer_stake * numer / denom;
if stake > threshold {
version
} else {
protocol_version
}
} else {
protocol_version
};

// Gather slashed validators and add them to kick out first.
let slashed_validators = last_block_info.slashed();
for (account_id, _) in slashed_validators.iter() {