Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: implemented a test for handling incoming receipts during resharding #9467

Merged
merged 8 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3981,13 +3981,15 @@ impl Chain {
})?;
// we can't use hash from the current block here yet because the incoming receipts
// for this block is not stored yet
let mut receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap());
let receipt_proof_response = &self.store().get_incoming_receipts_for_shard(
let new_receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap());
let old_receipts = &self.store().get_incoming_receipts_for_shard(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why reassigning old_receipts is needed here, maybe let's just do

let old_receipts = collect_receipts_from_response(&self.store().get_incoming_receipts_for_shard(...));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed, just personal preference for not having too long lines

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so far I haven't seen that pattern in our code, so I suggest to keep it consistent and not introduce that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is idiomatic rust and personally I'm using it a lot so there is at least some of it :)
I don't think it has any disavantages and it does make code more readable. Some other usages are to change the type of a variable e.g. when parsing or removing mutability after it's no longer needed.

let number = "123";
let number = parse(number); // changes the type of number to return type of parse e.g. u32

let mut x = 0;
for ... { x += 1; }
let x = x; 

shard_id,
*prev_hash,
prev_chunk_height_included,
)?;
receipts.extend(collect_receipts_from_response(receipt_proof_response));
let old_receipts = collect_receipts_from_response(old_receipts);
let receipts = [new_receipts, old_receipts].concat();

let chunk = self.get_chunk_clone_from_header(&chunk_header.clone())?;

let transactions = chunk.transactions();
Expand Down
35 changes: 28 additions & 7 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,41 @@ pub trait ChainStoreAccess {
break;
}

let prev_hash = *header.prev_hash();
let receipts = self.get_incoming_receipts(&block_hash, shard_id);
match receipts {
Ok(receipt_proofs) => {
tracing::debug!(
?shard_id,
?last_chunk_height_included,
?block_hash,
"get_incoming_receipts_for_shard found receipts from block with missing chunk"
);
ret.push(ReceiptProofResponse(block_hash, receipt_proofs));
}
Err(err) => {
tracing::debug!(
?shard_id,
?last_chunk_height_included,
?block_hash,
?err,
"get_incoming_receipts_for_shard could not find receipts from block with missing chunk"
);

if let Ok(receipt_proofs) = self.get_incoming_receipts(&block_hash, shard_id) {
ret.push(ReceiptProofResponse(block_hash, receipt_proofs));
} else {
ret.push(ReceiptProofResponse(block_hash, Arc::new(vec![])));
// This can happen when all chunks are missing in a block
// and then we can safely assume that there aren't any
// incoming receipts. It would be nicer to explicitly check
// that condition rather than relying on errors when reading
// from the db.
ret.push(ReceiptProofResponse(block_hash, Arc::new(vec![])));
}
}

// TODO(resharding)
// when crossing the epoch boundary we should check if the shard
// layout is different and handle that
// one idea would be to do shard_id := parent(shard_id) but remember to
// deduplicate the receipts as well
block_hash = prev_hash;
block_hash = *header.prev_hash();
}

Ok(ret)
Expand Down Expand Up @@ -1237,7 +1258,7 @@ impl ChainStoreAccess for ChainStore {
&self.incoming_receipts,
&get_block_shard_id(block_hash, shard_id),
),
format_args!("INCOMING RECEIPT: {}", block_hash),
format_args!("INCOMING RECEIPT: {} {}", block_hash, shard_id),
)
}

Expand Down
18 changes: 10 additions & 8 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2014,7 +2014,7 @@ impl Client {
.validate_tx(gas_price, None, tx, true, &epoch_id, protocol_version)
.expect("no storage errors")
{
debug!(target: "client", "Invalid tx during basic validation: {:?}", err);
debug!(target: "client", tx=?tx.get_hash(), "Invalid tx during basic validation: {:?}", err);
return Ok(ProcessTxResponse::InvalidTx(err));
}

Expand All @@ -2024,6 +2024,8 @@ impl Client {
self.shard_tracker.care_about_shard(me, &head.last_block_hash, shard_id, true);
let will_care_about_shard =
self.shard_tracker.will_care_about_shard(me, &head.last_block_hash, shard_id, true);
// TODO(resharding) will_care_about_shard should be called with the
// account shard id from the next epoch, in case shard layout changes
if care_about_shard || will_care_about_shard {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
let state_root = match self.chain.get_chunk_extra(&head.last_block_hash, &shard_uid) {
Expand Down Expand Up @@ -2053,17 +2055,17 @@ impl Client {
if me.is_some() {
match self.sharded_tx_pool.insert_transaction(shard_id, tx.clone()) {
InsertTransactionResult::Success => {
trace!(target: "client", shard_id, "Recorded a transaction.");
trace!(target: "client", shard_id, tx=?tx.get_hash(), "Recorded a transaction.");
}
InsertTransactionResult::Duplicate => {
trace!(target: "client", shard_id, "Duplicate transaction, not forwarding it.");
trace!(target: "client", shard_id, tx=?tx.get_hash(), "Duplicate transaction, not forwarding it.");
return Ok(ProcessTxResponse::ValidTx);
}
InsertTransactionResult::NoSpaceLeft => {
if is_forwarded {
trace!(target: "client", shard_id, "Transaction pool is full, dropping the transaction.");
trace!(target: "client", shard_id, tx=?tx.get_hash(), "Transaction pool is full, dropping the transaction.");
} else {
trace!(target: "client", shard_id, "Transaction pool is full, trying to forward the transaction.");
trace!(target: "client", shard_id, tx=?tx.get_hash(), "Transaction pool is full, trying to forward the transaction.");
}
}
}
Expand All @@ -2075,20 +2077,20 @@ impl Client {
// forward to current epoch validators,
// possibly forward to next epoch validators
if self.active_validator(shard_id)? {
trace!(target: "client", account = ?me, shard_id, is_forwarded, "Recording a transaction.");
trace!(target: "client", account = ?me, shard_id, tx=?tx.get_hash(), is_forwarded, "Recording a transaction.");
metrics::TRANSACTION_RECEIVED_VALIDATOR.inc();

if !is_forwarded {
self.possibly_forward_tx_to_next_epoch(tx)?;
}
Ok(ProcessTxResponse::ValidTx)
} else if !is_forwarded {
trace!(target: "client", shard_id, "Forwarding a transaction.");
trace!(target: "client", shard_id, tx=?tx.get_hash(), "Forwarding a transaction.");
metrics::TRANSACTION_RECEIVED_NON_VALIDATOR.inc();
self.forward_tx(&epoch_id, tx)?;
Ok(ProcessTxResponse::RequestRouted)
} else {
trace!(target: "client", shard_id, "Non-validator received a forwarded transaction, dropping it.");
trace!(target: "client", shard_id, tx=?tx.get_hash(), "Non-validator received a forwarded transaction, dropping it.");
metrics::TRANSACTION_RECEIVED_NON_VALIDATOR_FORWARDED.inc();
Ok(ProcessTxResponse::NoResponse)
}
Expand Down
46 changes: 27 additions & 19 deletions core/primitives/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,26 +906,10 @@ pub mod epoch_info {
}
Self::V3(v3) => {
let protocol_version = self.protocol_version();
let seed =
if checked_feature!(
"stable",
SynchronizeBlockChunkProduction,
protocol_version
) && !checked_feature!("stable", ChunkOnlyProducers, protocol_version)
{
// This is same seed that used for determining block producer
Self::block_produce_seed(height, &v3.rng_seed)
} else {
// 32 bytes from epoch_seed, 8 bytes from height, 8 bytes from shard_id
let mut buffer = [0u8; 48];
buffer[0..32].copy_from_slice(&v3.rng_seed);
buffer[32..40].copy_from_slice(&height.to_le_bytes());
buffer[40..48].copy_from_slice(&shard_id.to_le_bytes());
hash(&buffer).0
};
let seed = Self::chunk_produce_seed(protocol_version, height, v3, shard_id);
let shard_id = shard_id as usize;
v3.chunk_producers_settlement[shard_id]
[v3.chunk_producers_sampler[shard_id].sample(seed)]
let sample = v3.chunk_producers_sampler[shard_id].sample(seed);
v3.chunk_producers_settlement[shard_id][sample]
}
}
}
Expand All @@ -937,6 +921,30 @@ pub mod epoch_info {
buffer[32..40].copy_from_slice(&height.to_le_bytes());
hash(&buffer).0
}

fn chunk_produce_seed(
protocol_version: u32,
wacban marked this conversation as resolved.
Show resolved Hide resolved
height: u64,
wacban marked this conversation as resolved.
Show resolved Hide resolved
epoch_info_v3: &EpochInfoV3,
shard_id: u64,
wacban marked this conversation as resolved.
Show resolved Hide resolved
) -> [u8; 32] {
if checked_feature!("stable", SynchronizeBlockChunkProduction, protocol_version)
&& !checked_feature!("stable", ChunkOnlyProducers, protocol_version)
{
// This is same seed that used for determining block
// producer. This seed does not contain the shard id
// so all shards will be produced by the same
// validator.
Self::block_produce_seed(height, &epoch_info_v3.rng_seed)
} else {
// 32 bytes from epoch_seed, 8 bytes from height, 8 bytes from shard_id
let mut buffer = [0u8; 48];
buffer[0..32].copy_from_slice(&epoch_info_v3.rng_seed);
buffer[32..40].copy_from_slice(&height.to_le_bytes());
buffer[40..48].copy_from_slice(&shard_id.to_le_bytes());
hash(&buffer).0
}
}
}

#[derive(BorshSerialize, BorshDeserialize)]
Expand Down
Loading