Skip to content

Commit

Permalink
1. Add retry to get objects from s3 bucket
Browse files Browse the repository at this point in the history
2. Logs inmprovements for shadow_data_consistency. Add Full objects in log if data mismatch
  • Loading branch information
kobayurii committed Jul 21, 2023
1 parent 80b6ff1 commit 3835ee2
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 33 deletions.
92 changes: 59 additions & 33 deletions rpc-server/src/modules/blocks/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,54 @@ use crate::config::ServerContext;
use crate::modules::blocks::methods::fetch_block;
use crate::modules::blocks::CacheBlock;
use crate::storage::ScyllaDBManager;
use anyhow::Context;
use near_primitives::views::{StateChangeValueView, StateChangesRequestView};
use num_traits::ToPrimitive;

/// Fetch object from s3 with retry
/// Try to get 3 times if we have some problems with network
async fn fetch_object_from_s3_with_retry<T>(
s3_client: &aws_sdk_s3::Client,
s3_bucket_name: &str,
key: String,
) -> anyhow::Result<T>
where
T: serde::de::DeserializeOwned,
{
let mut retry_count: u8 = 0;
let s3_response = loop {
match s3_client
.get_object()
.bucket(s3_bucket_name)
.key(&key)
.send()
.await
{
Ok(response) => break Ok(response),
Err(err) => match err {
// Do retry on network errors
aws_sdk_s3::types::SdkError::ConstructionFailure(_)
| aws_sdk_s3::types::SdkError::TimeoutError(_)
| aws_sdk_s3::types::SdkError::DispatchFailure(_)
| aws_sdk_s3::types::SdkError::ResponseError(_) => {
tracing::debug!("Error to get object from s3, Retry");
if retry_count < 3 {
retry_count += 1;
continue;
} else {
// Stop retrying after 3 times
break Err(err);
}
}
// Stop retrying if we get response from s3 with error related to object
_ => break Err(err),
},
}
}?;
let body_bytes = s3_response.body.collect().await?.into_bytes();

Ok(serde_json::from_slice::<T>(body_bytes.as_ref())?)
}

#[cfg_attr(
feature = "tracing-instrumentation",
tracing::instrument(skip(s3_client))
Expand All @@ -17,24 +61,15 @@ pub async fn fetch_block_from_s3(
) -> Result<near_primitives::views::BlockView, near_jsonrpc_primitives::types::blocks::RpcBlockError>
{
tracing::debug!("`fetch_block_from_s3` call");
match s3_client
.get_object()
.bucket(s3_bucket_name)
.key(format!("{:0>12}/block.json", block_height))
.send()
.await

match fetch_object_from_s3_with_retry::<near_primitives::views::BlockView>(
s3_client,
s3_bucket_name,
format!("{:0>12}/block.json", block_height),
)
.await
{
Ok(response) => {
let body_bytes = response.body.collect().await.unwrap().into_bytes();
match serde_json::from_slice::<near_primitives::views::BlockView>(body_bytes.as_ref()) {
Ok(block) => Ok(block),
Err(err) => Err(
near_jsonrpc_primitives::types::blocks::RpcBlockError::UnknownBlock {
error_message: err.to_string(),
},
),
}
}
Ok(block) => Ok(block),
Err(err) => Err(
near_jsonrpc_primitives::types::blocks::RpcBlockError::UnknownBlock {
error_message: err.to_string(),
Expand All @@ -54,22 +89,13 @@ pub async fn fetch_shard_from_s3(
shard_id: near_primitives::types::ShardId,
) -> anyhow::Result<near_indexer_primitives::IndexerShard> {
tracing::debug!("`fetch_shard_from_s3` call");
let response = s3_client
.get_object()
.bucket(s3_bucket_name)
.key(format!("{:0>12}/shard_{shard_id}.json", block_height))
.send()
.await
.with_context(|| "Error to get object from s3")?;
let body_bytes = response
.body
.collect()
.await
.with_context(|| "Invalid data from s3")?
.into_bytes();

serde_json::from_slice::<near_indexer_primitives::IndexerShard>(body_bytes.as_ref())
.with_context(|| "Invalid serialised data")
fetch_object_from_s3_with_retry::<near_indexer_primitives::IndexerShard>(
s3_client,
s3_bucket_name,
format!("{:0>12}/shard_{shard_id}.json", block_height),
)
.await
}

#[cfg_attr(
Expand Down
6 changes: 6 additions & 0 deletions rpc-server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ where
let near_rpc_json = json_sort_value(near_rpc_response_json);

if let Err(err) = assert_json_matches_no_panic(&read_rpc_json, &near_rpc_json, config) {
tracing::warn!(
target: "shadow_data_consistency",
"Shadow data check: DATA MISMATCH\n READ_RPC_DATA: {:?}\n NEAR_RPC_DATA: {:?}\n",
read_rpc_json,
near_rpc_json,
);
return Err(ShadowDataConsistencyError::ResultsDontMatch(err));
};
Ok(())
Expand Down

0 comments on commit 3835ee2

Please sign in to comment.