Skip to content

Commit

Permalink
1. Add retry to get objects from s3 bucket
Browse files Browse the repository at this point in the history
2. Logs inmprovements for shadow_data_consistency. Add Full objects in log if data mismatch
  • Loading branch information
kobayurii committed Jul 23, 2023
1 parent 80b6ff1 commit ba203e6
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 65 deletions.
62 changes: 49 additions & 13 deletions rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::modules::blocks::utils::{
scylla_db_convert_chunk_hash_to_block_height_and_shard_id,
};
#[cfg(feature = "shadow_data_consistency")]
use crate::utils::shadow_compare_results;
use crate::utils::{shadow_compare_results, ResponseState};
use jsonrpc_v2::{Data, Params};

use crate::utils::proxy_rpc_call;
Expand Down Expand Up @@ -62,12 +62,21 @@ pub async fn chunk(
{
let near_rpc_client = data.near_rpc_client.clone();
let error_meta = format!("CHUNK: {:?}", params);
let mut response_state = ResponseState::Successful;
let read_rpc_response_json = match &result {
Ok(res) => serde_json::to_value(&res.chunk_view),
Err(err) => serde_json::to_value(err),
Err(err) => {
response_state = ResponseState::Failure;
serde_json::to_value(err)
}
};
let comparison_result =
shadow_compare_results(read_rpc_response_json, near_rpc_client, params).await;
let comparison_result = shadow_compare_results(
read_rpc_response_json,
near_rpc_client,
params,
response_state,
)
.await;

match comparison_result {
Ok(_) => {
Expand Down Expand Up @@ -172,6 +181,7 @@ async fn block_call(
{
let near_rpc_client = data.near_rpc_client.clone();
let error_meta = format!("BLOCK: {:?}", params);
let mut response_state = ResponseState::Successful;
let read_rpc_response_json = match &result {
Ok(res) => {
if let near_primitives::types::BlockReference::Finality(_) = params.block_reference
Expand All @@ -182,10 +192,18 @@ async fn block_call(
};
serde_json::to_value(&res.block_view)
}
Err(err) => serde_json::to_value(err),
Err(err) => {
response_state = ResponseState::Failure;
serde_json::to_value(err)
}
};
let comparison_result =
shadow_compare_results(read_rpc_response_json, near_rpc_client, params).await;
let comparison_result = shadow_compare_results(
read_rpc_response_json,
near_rpc_client,
params,
response_state,
)
.await;

match comparison_result {
Ok(_) => {
Expand Down Expand Up @@ -225,12 +243,21 @@ async fn changes_in_block_call(
)
}
let error_meta = format!("CHANGES_IN_BLOCK: {:?}", params);
let mut response_state = ResponseState::Successful;
let read_rpc_response_json = match &result {
Ok(res) => serde_json::to_value(res),
Err(err) => serde_json::to_value(err),
Err(err) => {
response_state = ResponseState::Failure;
serde_json::to_value(err)
}
};
let comparison_result =
shadow_compare_results(read_rpc_response_json, near_rpc_client, params).await;
let comparison_result = shadow_compare_results(
read_rpc_response_json,
near_rpc_client,
params,
response_state,
)
.await;

match comparison_result {
Ok(_) => {
Expand Down Expand Up @@ -270,12 +297,21 @@ async fn changes_in_block_by_type_call(
)
}
let error_meta = format!("CHANGES_IN_BLOCK_BY_TYPE: {:?}", params);
let mut response_state = ResponseState::Successful;
let read_rpc_response_json = match &result {
Ok(res) => serde_json::to_value(res),
Err(err) => serde_json::to_value(err),
Err(err) => {
response_state = ResponseState::Failure;
serde_json::to_value(err)
}
};
let comparison_result =
shadow_compare_results(read_rpc_response_json, near_rpc_client, params).await;
let comparison_result = shadow_compare_results(
read_rpc_response_json,
near_rpc_client,
params,
response_state,
)
.await;

match comparison_result {
Ok(_) => {
Expand Down
92 changes: 59 additions & 33 deletions rpc-server/src/modules/blocks/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,54 @@ use crate::config::ServerContext;
use crate::modules::blocks::methods::fetch_block;
use crate::modules::blocks::CacheBlock;
use crate::storage::ScyllaDBManager;
use anyhow::Context;
use near_primitives::views::{StateChangeValueView, StateChangesRequestView};
use num_traits::ToPrimitive;

/// Fetch object from s3 with retry
/// Try to get 3 times if we have some problems with network
async fn fetch_object_from_s3_with_retry<T>(
s3_client: &aws_sdk_s3::Client,
s3_bucket_name: &str,
key: String,
) -> anyhow::Result<T>
where
T: serde::de::DeserializeOwned,
{
let mut retry_count: u8 = 0;
let s3_response = loop {
match s3_client
.get_object()
.bucket(s3_bucket_name)
.key(&key)
.send()
.await
{
Ok(response) => break Ok(response),
Err(err) => match err {
// Do retry on network errors
aws_sdk_s3::types::SdkError::ConstructionFailure(_)
| aws_sdk_s3::types::SdkError::TimeoutError(_)
| aws_sdk_s3::types::SdkError::DispatchFailure(_)
| aws_sdk_s3::types::SdkError::ResponseError(_) => {
tracing::debug!("Error to get object from s3, Retry");
if retry_count < 3 {
retry_count += 1;
continue;
} else {
// Stop retrying after 3 times
break Err(err);
}
}
// Stop retrying if we get response from s3 with error related to object
_ => break Err(err),
},
}
}?;
let body_bytes = s3_response.body.collect().await?.into_bytes();

Ok(serde_json::from_slice::<T>(body_bytes.as_ref())?)
}

#[cfg_attr(
feature = "tracing-instrumentation",
tracing::instrument(skip(s3_client))
Expand All @@ -17,24 +61,15 @@ pub async fn fetch_block_from_s3(
) -> Result<near_primitives::views::BlockView, near_jsonrpc_primitives::types::blocks::RpcBlockError>
{
tracing::debug!("`fetch_block_from_s3` call");
match s3_client
.get_object()
.bucket(s3_bucket_name)
.key(format!("{:0>12}/block.json", block_height))
.send()
.await

match fetch_object_from_s3_with_retry::<near_primitives::views::BlockView>(
s3_client,
s3_bucket_name,
format!("{:0>12}/block.json", block_height),
)
.await
{
Ok(response) => {
let body_bytes = response.body.collect().await.unwrap().into_bytes();
match serde_json::from_slice::<near_primitives::views::BlockView>(body_bytes.as_ref()) {
Ok(block) => Ok(block),
Err(err) => Err(
near_jsonrpc_primitives::types::blocks::RpcBlockError::UnknownBlock {
error_message: err.to_string(),
},
),
}
}
Ok(block) => Ok(block),
Err(err) => Err(
near_jsonrpc_primitives::types::blocks::RpcBlockError::UnknownBlock {
error_message: err.to_string(),
Expand All @@ -54,22 +89,13 @@ pub async fn fetch_shard_from_s3(
shard_id: near_primitives::types::ShardId,
) -> anyhow::Result<near_indexer_primitives::IndexerShard> {
tracing::debug!("`fetch_shard_from_s3` call");
let response = s3_client
.get_object()
.bucket(s3_bucket_name)
.key(format!("{:0>12}/shard_{shard_id}.json", block_height))
.send()
.await
.with_context(|| "Error to get object from s3")?;
let body_bytes = response
.body
.collect()
.await
.with_context(|| "Invalid data from s3")?
.into_bytes();

serde_json::from_slice::<near_indexer_primitives::IndexerShard>(body_bytes.as_ref())
.with_context(|| "Invalid serialised data")
fetch_object_from_s3_with_retry::<near_indexer_primitives::IndexerShard>(
s3_client,
s3_bucket_name,
format!("{:0>12}/shard_{shard_id}.json", block_height),
)
.await
}

#[cfg_attr(
Expand Down
17 changes: 13 additions & 4 deletions rpc-server/src/modules/queries/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::modules::queries::utils::{
};
use crate::utils::proxy_rpc_call;
#[cfg(feature = "shadow_data_consistency")]
use crate::utils::shadow_compare_results;
use crate::utils::{shadow_compare_results, ResponseState};
use borsh::BorshSerialize;
use jsonrpc_v2::{Data, Params};

Expand Down Expand Up @@ -110,6 +110,7 @@ async fn query_call(
let request_copy = params.request.clone();
let error_meta = format!("QUERY: {:?}", params);
let near_rpc_client = data.near_rpc_client.clone();
let mut response_state = ResponseState::Successful;
if let near_primitives::types::BlockReference::Finality(_) = params.block_reference {
params.block_reference = near_primitives::types::BlockReference::from(
near_primitives::types::BlockId::Height(block.block_height),
Expand All @@ -118,11 +119,19 @@ async fn query_call(

let read_rpc_response_json = match &result {
Ok(res) => serde_json::to_value(res),
Err(err) => serde_json::to_value(err),
Err(err) => {
response_state = ResponseState::Failure;
serde_json::to_value(err)
}
};

let comparison_result =
shadow_compare_results(read_rpc_response_json, near_rpc_client, params).await;
let comparison_result = shadow_compare_results(
read_rpc_response_json,
near_rpc_client,
params,
response_state,
)
.await;

match comparison_result {
Ok(_) => {
Expand Down
18 changes: 13 additions & 5 deletions rpc-server/src/modules/receipts/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use jsonrpc_v2::{Data, Params};
use crate::config::ServerContext;
use crate::errors::RPCError;
#[cfg(feature = "shadow_data_consistency")]
use crate::utils::shadow_compare_results;
use crate::utils::{shadow_compare_results, ResponseState};

/// Fetches a receipt by it's ID (as is, without a status or execution outcome)
#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(data)))]
Expand All @@ -20,14 +20,22 @@ pub async fn receipt(
{
let near_rpc_client = data.near_rpc_client.clone();
let error_meta = format!("TX: {:?}", params);

let mut response_state = ResponseState::Successful;
let read_rpc_response_json = match &result {
Ok(res) => serde_json::to_value(res),
Err(err) => serde_json::to_value(err),
Err(err) => {
response_state = ResponseState::Failure;
serde_json::to_value(err)
}
};

let comparison_result =
shadow_compare_results(read_rpc_response_json, near_rpc_client, params).await;
let comparison_result = shadow_compare_results(
read_rpc_response_json,
near_rpc_client,
params,
response_state,
)
.await;

match comparison_result {
Ok(_) => {
Expand Down
17 changes: 13 additions & 4 deletions rpc-server/src/modules/transactions/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::modules::transactions::{
};
use crate::utils::proxy_rpc_call;
#[cfg(feature = "shadow_data_consistency")]
use crate::utils::shadow_compare_results;
use crate::utils::{shadow_compare_results, ResponseState};
use jsonrpc_v2::{Data, Params};
use near_primitives::views::FinalExecutionOutcomeViewEnum::{
FinalExecutionOutcome, FinalExecutionOutcomeWithReceipt,
Expand All @@ -29,10 +29,13 @@ pub async fn tx(
{
let near_rpc_client = data.near_rpc_client.clone();
let error_meta = format!("TX: {:?}", params);

let mut response_state = ResponseState::Successful;
let read_rpc_response_json = match &result {
Ok(res) => serde_json::to_value(res),
Err(err) => serde_json::to_value(err),
Err(err) => {
response_state = ResponseState::Failure;
serde_json::to_value(err)
}
};

let comparison_result = shadow_compare_results(
Expand All @@ -44,6 +47,7 @@ pub async fn tx(
near_jsonrpc_client::methods::tx::RpcTransactionStatusRequest {
transaction_info: tx_status_request.transaction_info,
},
response_state,
)
.await;

Expand Down Expand Up @@ -78,10 +82,14 @@ pub async fn tx_status(
{
let near_rpc_client = data.near_rpc_client.clone();
let error_meta = format!("EXPERIMENTAL_TX_STATUS: {:?}", params);
let mut response_state = ResponseState::Successful;

let read_rpc_response_json = match &result {
Ok(res) => serde_json::to_value(res),
Err(err) => serde_json::to_value(err),
Err(err) => {
response_state = ResponseState::Failure;
serde_json::to_value(err)
}
};

let comparison_result = shadow_compare_results(
Expand All @@ -93,6 +101,7 @@ pub async fn tx_status(
near_jsonrpc_client::methods::EXPERIMENTAL_tx_status::RpcTransactionStatusRequest {
transaction_info: tx_status_request.transaction_info,
},
response_state,
)
.await;

Expand Down
Loading

0 comments on commit ba203e6

Please sign in to comment.