Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(state sync): delete get_cached_state_parts #12197

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 2 additions & 38 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::{
Provenance,
};
use crate::{metrics, DoomslugThresholdMode};
use borsh::BorshDeserialize;
use crossbeam_channel::{unbounded, Receiver, Sender};
use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -69,8 +68,8 @@ use near_primitives::sharding::{
};
use near_primitives::state_part::PartId;
use near_primitives::state_sync::{
get_num_state_parts, BitArray, CachedParts, ReceiptProofResponse, RootProof,
ShardStateSyncResponseHeader, ShardStateSyncResponseHeaderV2, StateHeaderKey, StatePartKey,
get_num_state_parts, ReceiptProofResponse, RootProof, ShardStateSyncResponseHeader,
ShardStateSyncResponseHeaderV2, StateHeaderKey, StatePartKey,
};
use near_primitives::stateless_validation::state_witness::{
ChunkStateWitness, ChunkStateWitnessSize,
Expand Down Expand Up @@ -3864,41 +3863,6 @@ impl Chain {

Ok((make_snapshot, delete_snapshot))
}

/// Returns a description of state parts cached for the given shard of the given epoch.
pub fn get_cached_state_parts(
&self,
sync_hash: CryptoHash,
shard_id: ShardId,
num_parts: u64,
) -> Result<CachedParts, Error> {
let _span = tracing::debug_span!(target: "chain", "get_cached_state_parts").entered();
// DBCol::StateParts is keyed by StatePartKey: (BlockHash || ShardId || PartId (u64)).
let lower_bound = StatePartKey(sync_hash, shard_id, 0);
let lower_bound = borsh::to_vec(&lower_bound)?;
let upper_bound = StatePartKey(sync_hash, shard_id + 1, 0);
let upper_bound = borsh::to_vec(&upper_bound)?;
let mut num_cached_parts = 0;
let mut bit_array = BitArray::new(num_parts);
for item in self.chain_store.store().iter_range(
DBCol::StateParts,
Some(&lower_bound),
Some(&upper_bound),
) {
let key = item?.0;
let key = StatePartKey::try_from_slice(&key)?;
let part_id = key.2;
num_cached_parts += 1;
bit_array.set_bit(part_id);
}
Ok(if num_cached_parts == 0 {
CachedParts::NoParts
} else if num_cached_parts == num_parts {
CachedParts::AllParts
} else {
CachedParts::BitArray(bit_array)
})
}
}

/// This method calculates the congestion info for the genesis chunks. It uses
Expand Down
30 changes: 2 additions & 28 deletions chain/client/src/view_client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1358,17 +1358,6 @@ impl Handler<StateRequestHeader> for ViewClientActorInner {
};
let state_response = match header {
Some(header) => {
let num_parts = header.num_state_parts();
let cached_parts = match self
.chain
.get_cached_state_parts(sync_hash, shard_id, num_parts)
{
Ok(cached_parts) => Some(cached_parts),
Err(err) => {
tracing::error!(target: "sync", ?err, ?sync_hash, shard_id, "Failed to get cached state parts");
None
}
};
let header = match header {
ShardStateSyncResponseHeader::V2(inner) => inner,
_ => {
Expand All @@ -1381,7 +1370,7 @@ impl Handler<StateRequestHeader> for ViewClientActorInner {
ShardStateSyncResponse::V3(ShardStateSyncResponseV3 {
header: Some(header),
part: None,
cached_parts,
cached_parts: None,
can_generate,
})
}
Expand Down Expand Up @@ -1444,26 +1433,11 @@ impl Handler<StateRequestPart> for ViewClientActorInner {
None
}
};
let num_parts = part.as_ref().and_then(|_| match self.chain.get_state_response_header(shard_id, sync_hash) {
Ok(header) => Some(header.num_state_parts()),
Err(err) => {
tracing::error!(target: "sync", ?err, ?sync_hash, shard_id, "Failed to get num state parts");
None
}
});
let cached_parts = num_parts.and_then(|num_parts|
match self.chain.get_cached_state_parts(sync_hash, shard_id, num_parts) {
Ok(cached_parts) => Some(cached_parts),
Err(err) => {
tracing::error!(target: "sync", ?err, ?sync_hash, shard_id, "Failed to get cached state parts");
None
}
});
let can_generate = part.is_some();
let state_response = ShardStateSyncResponse::V3(ShardStateSyncResponseV3 {
header: None,
part,
cached_parts,
cached_parts: None,
can_generate,
});
let info =
Expand Down
5 changes: 1 addition & 4 deletions core/primitives/src/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,8 @@ pub struct ShardStateSyncResponseV2 {
pub struct ShardStateSyncResponseV3 {
pub header: Option<ShardStateSyncResponseHeaderV2>,
pub part: Option<(u64, Vec<u8>)>,
/// Parts that can be provided **cheaply**.
// Can be `None` only if both `header` and `part` are `None`.
// TODO(saketh): deprecate unused fields cached_parts and can_generate
pub cached_parts: Option<CachedParts>,
/// Whether the node can provide parts for this epoch of this shard.
/// Assumes that a node can either provide all state parts or no state parts.
pub can_generate: bool,
}

Expand Down
43 changes: 6 additions & 37 deletions integration-tests/src/tests/client/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use near_o11y::testonly::{init_integration_logger, init_test_logger};
use near_o11y::WithSpanContextExt;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state_part::PartId;
use near_primitives::state_sync::{CachedParts, StatePartKey};
use near_primitives::state_sync::StatePartKey;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{BlockId, BlockReference, EpochId, EpochReference};
use near_primitives::utils::MaybeValidated;
Expand Down Expand Up @@ -858,35 +858,21 @@ fn test_state_sync_headers() {
None => return ControlFlow::Continue(()),
};
let state_response = state_response_info.take_state_response();
let cached_parts = state_response.cached_parts().clone();
let can_generate = state_response.can_generate();
assert!(state_response.part().is_none());
if let Some(_header) = state_response.take_header() {
if !can_generate {
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
"got header but cannot generate"
);
return ControlFlow::Continue(());
}
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
"got header"
);
tracing::info!(?sync_hash, shard_id, can_generate, "got header");
} else {
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
"got no header"
);
tracing::info!(?sync_hash, shard_id, can_generate, "got no header");
return ControlFlow::Continue(());
}

Expand Down Expand Up @@ -915,36 +901,19 @@ fn test_state_sync_headers() {
let part = state_response.part().clone();
assert!(state_response.take_header().is_none());
if let Some((part_id, _part)) = part {
if !can_generate
|| cached_parts != Some(CachedParts::AllParts)
|| part_id != 0
{
if !can_generate || cached_parts != None || part_id != 0 {
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
part_id,
"got part but shard info is unexpected"
);
return ControlFlow::Continue(());
}
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
part_id,
"got part"
);
tracing::info!(?sync_hash, shard_id, can_generate, part_id, "got part");
} else {
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
"got no part"
);
tracing::info!(?sync_hash, shard_id, can_generate, "got no part");
return ControlFlow::Continue(());
}
}
Expand Down
Loading