
feat: adding cross_contract_calls integration test for the V1->V2 resharding #9402

Merged · 15 commits · Aug 17, 2023
2 changes: 1 addition & 1 deletion .gitignore
@@ -61,4 +61,4 @@ rusty-tags.vi
# Estimator generated files
costs-*.txt
names-to-stats.txt
data_dump_*.bin
data_dump_*.bin
Contributor:
nit: we should not be changing this file

2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 10 additions & 2 deletions chain/chain/src/chain.rs
@@ -2711,8 +2711,16 @@ impl Chain {
parent_hash: &CryptoHash,
shard_id: ShardId,
) -> bool {
let will_shard_layout_change =
epoch_manager.will_shard_layout_change(parent_hash).unwrap_or(false);
let result = epoch_manager.will_shard_layout_change(parent_hash);
let will_shard_layout_change = match result {
Ok(will_shard_layout_change) => will_shard_layout_change,
Err(err) => {
// TODO(resharding) This is a problem, if this happens the node
// will not perform resharding and fall behind the network.
tracing::error!(target: "chain", ?err, "failed to check if shard layout will change");
false
}
};
// if shard layout will change the next epoch, we should catch up the shard regardless
// whether we already have the shard's state this epoch, because we need to generate
// new states for shards split from the current shard for the next epoch
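
The same log-and-default behavior can also be written with unwrap_or_else. A minimal sketch, not part of this diff; the check argument stands in for the epoch_manager.will_shard_layout_change(parent_hash) call, which the diff above matches on as a Result:

fn shard_layout_will_change_or_default<E: std::fmt::Debug>(check: Result<bool, E>) -> bool {
    check.unwrap_or_else(|err| {
        // Same caveat as the TODO(resharding) above: on error the node skips
        // resharding and can fall behind the network.
        tracing::error!(target: "chain", ?err, "failed to check if shard layout will change");
        false
    })
}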
2 changes: 2 additions & 0 deletions chain/chunks/Cargo.toml
@@ -21,6 +21,8 @@ rand.workspace = true
reed-solomon-erasure.workspace = true
time.workspace = true
tracing.workspace = true
# itertools has collect_vec which is useful in quick debugging prints
itertools.workspace = true

near-async.workspace = true
near-chain-configs.workspace = true
57 changes: 29 additions & 28 deletions chain/chunks/src/lib.rs
@@ -606,17 +606,21 @@ impl ShardsManager {
},
);

if !mark_only {
let fetch_from_archival = chunk_needs_to_be_fetched_from_archival(
if mark_only {
debug!(target: "chunks", height, shard_id, ?chunk_hash, "Marked the chunk as being requested but did not send the request yet.");
return;
}

let fetch_from_archival = chunk_needs_to_be_fetched_from_archival(
&ancestor_hash, &self.chain_header_head.last_block_hash,
self.epoch_manager.as_ref()).unwrap_or_else(|err| {
error!(target: "chunks", "Error during requesting partial encoded chunk. Cannot determine whether to request from an archival node, defaulting to not: {}", err);
false
});
let old_block = self.chain_header_head.last_block_hash != prev_block_hash
&& self.chain_header_head.prev_block_hash != prev_block_hash;
let old_block = self.chain_header_head.last_block_hash != prev_block_hash
&& self.chain_header_head.prev_block_hash != prev_block_hash;

let should_wait_for_chunk_forwarding =
let should_wait_for_chunk_forwarding =
self.should_wait_for_chunk_forwarding(&ancestor_hash, chunk_header.shard_id(), chunk_header.height_created()+1).unwrap_or_else(|_| {
// ancestor_hash must be accepted because we don't request missing chunks through this
// this function for orphans
@@ -625,30 +629,27 @@
false
});

// If chunks forwarding is enabled,
// we purposely do not send chunk request messages right away for new blocks. Such requests
// will eventually be sent because of the `resend_chunk_requests` loop. However,
// we want to give some time for any `PartialEncodedChunkForward` messages to arrive
// before we send requests.
if !should_wait_for_chunk_forwarding || fetch_from_archival || old_block {
debug!(target: "chunks", height, shard_id, ?chunk_hash, "Requesting.");
let request_result = self.request_partial_encoded_chunk(
height,
&ancestor_hash,
shard_id,
&chunk_hash,
false,
old_block,
fetch_from_archival,
);
if let Err(err) = request_result {
error!(target: "chunks", "Error during requesting partial encoded chunk: {}", err);
}
} else {
debug!(target: "chunks",should_wait_for_chunk_forwarding, fetch_from_archival, old_block, "Delaying the chunk request.");
// If chunks forwarding is enabled,
// we purposely do not send chunk request messages right away for new blocks. Such requests
// will eventually be sent because of the `resend_chunk_requests` loop. However,
// we want to give some time for any `PartialEncodedChunkForward` messages to arrive
// before we send requests.
if !should_wait_for_chunk_forwarding || fetch_from_archival || old_block {
debug!(target: "chunks", height, shard_id, ?chunk_hash, "Requesting.");
let request_result = self.request_partial_encoded_chunk(
height,
&ancestor_hash,
shard_id,
&chunk_hash,
false,
old_block,
fetch_from_archival,
);
if let Err(err) = request_result {
error!(target: "chunks", "Error during requesting partial encoded chunk: {}", err);
}
} else {
debug!(target: "chunks", height, shard_id, ?chunk_hash, "Marked the chunk as being requested but did not send the request yet.");
debug!(target: "chunks",should_wait_for_chunk_forwarding, fetch_from_archival, old_block, "Delaying the chunk request.");
}
}

@@ -1434,7 +1435,7 @@ impl ShardsManager {
}
}

// 2. Consider it valid; mergeparts and receipts included in the partial encoded chunk
// 2. Consider it valid; merge parts and receipts included in the partial encoded chunk
// into chunk cache
let new_part_ords =
self.encoded_chunks.merge_in_partial_encoded_chunk(partial_encoded_chunk);
1 change: 0 additions & 1 deletion chain/chunks/src/logic.rs
@@ -155,7 +155,6 @@ pub fn decode_encoded_chunk(
})
{
debug!(target: "chunks", "Reconstructed and decoded chunk {}, encoded length was {}, num txs: {}, I'm {:?}", chunk_hash.0, encoded_chunk.encoded_length(), shard_chunk.transactions().len(), me);

let partial_chunk = create_partial_chunk(
encoded_chunk,
merkle_paths,
60 changes: 45 additions & 15 deletions chain/client/src/test_utils.rs
@@ -1874,21 +1874,48 @@ impl TestEnv {

pub fn process_partial_encoded_chunks(&mut self) {
let network_adapters = self.network_adapters.clone();
for network_adapter in network_adapters {
// process partial encoded chunks
while let Some(request) = network_adapter.pop() {
if let PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkMessage {
account_id,
partial_encoded_chunk,
},
) = request
{
self.shards_manager(&account_id).send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(
PartialEncodedChunk::from(partial_encoded_chunk),
),
);

let mut keep_going = true;
while keep_going {
for network_adapter in network_adapters.iter() {
keep_going = false;
// process partial encoded chunks
while let Some(request) = network_adapter.pop() {
// if there are any requests in any of the adapters reset
// keep going to true as processing of any message may
// trigger more messages to be processed in other clients
// it's a bit sad and it would be much nicer if all messages
// were forwarded to a single queue
// TODO would be nicer to first handle all PECs and then all PECFs
keep_going = true;
match request {
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkMessage {
account_id,
partial_encoded_chunk,
},
) => {
let partial_encoded_chunk =
PartialEncodedChunk::from(partial_encoded_chunk);
let message =
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(
partial_encoded_chunk,
);
self.shards_manager(&account_id).send(message);
}
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkForward { account_id, forward },
) => {
let message =
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(
forward,
);
self.shards_manager(&account_id).send(message);
}
_ => {
tracing::debug!(target: "test", ?request, "skipping unsupported request type");
}
}
}
}
}
@@ -1983,6 +2010,9 @@ impl TestEnv {
}

pub fn process_shards_manager_responses_and_finish_processing_blocks(&mut self, idx: usize) {
let _span =
tracing::debug_span!(target: "test", "process_shards_manager", client=idx).entered();

loop {
self.process_shards_manager_responses(idx);
if self.clients[idx].finish_blocks_in_processing().is_empty() {
11 changes: 6 additions & 5 deletions chain/epoch-manager/Cargo.toml
@@ -18,6 +18,8 @@ rand_hc.workspace = true
serde_json.workspace = true
smart-default.workspace = true
tracing.workspace = true
# itertools has collect_vec which is useful in quick debugging prints
itertools.workspace = true
Contributor:
same here: are you actually using itertools in the code?

Contributor (Author):
Yeah I usually use collect_vec when printing debug logs. How would you feel about leaving that in?

Contributor:
Generally speaking, I don't really like adding dependencies that aren't used in committed code. It's the kind of thing I personally might create a PR just to remove a dead dependency. So at least a comment explaining why it's in would be useful, to prevent engineers canceling out each other's "improvements".

But that said, itertools wouldn't be the worst to have pulled in for no reason, so I won't stop you from adding it if you hate writing collect::<Vec<_>> that much :D

Contributor (Author):
Ok, thanks, I'll leave that in and add a comment.
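
For reference, a minimal sketch of the kind of ad-hoc debug print being discussed — not code from this PR, and the function and values are made up — comparing itertools' collect_vec with the std collect::<Vec<_>>() turbofish:

use itertools::Itertools;

fn log_tracked_shards(shard_ids: &[u64]) {
    // With itertools: collect_vec() avoids spelling out the turbofish.
    let tracked = shard_ids.iter().map(|id| format!("shard {id}")).collect_vec();
    tracing::debug!(target: "chunks", ?tracked, "tracked shards (collect_vec)");

    // std-only equivalent that the reviewer mentions.
    let tracked = shard_ids.iter().map(|id| format!("shard {id}")).collect::<Vec<_>>();
    tracing::debug!(target: "chunks", ?tracked, "tracked shards (collect)");
}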


near-crypto.workspace = true
near-primitives.workspace = true
@@ -28,7 +30,9 @@ near-cache.workspace = true

[features]
expensive_tests = []
protocol_feature_fix_staking_threshold = ["near-primitives/protocol_feature_fix_staking_threshold"]
protocol_feature_fix_staking_threshold = [
"near-primitives/protocol_feature_fix_staking_threshold",
]
nightly = [
"nightly_protocol",
"protocol_feature_fix_staking_threshold",
@@ -43,7 +47,4 @@ nightly_protocol = [
"near-store/nightly_protocol",
]
no_cache = []
new_epoch_sync = [
"near-store/new_epoch_sync",
"near-primitives/new_epoch_sync"
]
new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync"]
10 changes: 4 additions & 6 deletions chain/epoch-manager/src/lib.rs
@@ -555,19 +555,17 @@ impl EpochManager {
let next_version = if let Some((version, stake)) =
versions.into_iter().max_by_key(|&(_version, stake)| stake)
{
if stake
> (total_block_producer_stake
* *config.protocol_upgrade_stake_threshold.numer() as u128)
/ *config.protocol_upgrade_stake_threshold.denom() as u128
{
let numer = *config.protocol_upgrade_stake_threshold.numer() as u128;
let denom = *config.protocol_upgrade_stake_threshold.denom() as u128;
let threshold = total_block_producer_stake * numer / denom;
if stake > threshold {
version
} else {
protocol_version
}
} else {
protocol_version
};

// Gather slashed validators and add them to kick out first.
let slashed_validators = last_block_info.slashed();
for (account_id, _) in slashed_validators.iter() {
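
For intuition, a tiny worked example of the refactored threshold check; the numbers are illustrative and not taken from any real config:

fn main() {
    // Suppose protocol_upgrade_stake_threshold = 4/5 and total stake = 1_000.
    let (numer, denom): (u128, u128) = (4, 5);
    let total_block_producer_stake: u128 = 1_000;
    let threshold = total_block_producer_stake * numer / denom; // 800
    // The comparison is strict: exactly 800 stake behind the new version is
    // not enough to adopt it, 801 is.
    assert!(!(800u128 > threshold));
    assert!(801u128 > threshold);
}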