Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement(rpc-server): Add retrying to fetch object from s3 #94

Merged
merged 1 commit into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::modules::blocks::CacheBlock;
use clap::Parser;

pub const DEFAULT_RETRY_COUNT: u8 = 3;

#[derive(Parser)]
#[clap(author, version, about, long_about = None)]
pub struct Opts {
Expand Down
92 changes: 59 additions & 33 deletions rpc-server/src/modules/blocks/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,54 @@ use crate::config::ServerContext;
use crate::modules::blocks::methods::fetch_block;
use crate::modules::blocks::CacheBlock;
use crate::storage::ScyllaDBManager;
use anyhow::Context;
use near_primitives::views::{StateChangeValueView, StateChangesRequestView};
use num_traits::ToPrimitive;

/// Fetch object from s3 with retry
/// Try to get 3 times if we have some problems with network
async fn fetch_object_from_s3_with_retry<T>(
s3_client: &aws_sdk_s3::Client,
s3_bucket_name: &str,
key: String,
) -> anyhow::Result<T>
where
T: serde::de::DeserializeOwned,
{
let mut retry_count: u8 = 0;
let s3_response = loop {
match s3_client
.get_object()
.bucket(s3_bucket_name)
.key(&key)
.send()
.await
{
Ok(response) => break Ok(response),
Err(err) => match err {
// Do retry on network errors
aws_sdk_s3::types::SdkError::ConstructionFailure(_)
| aws_sdk_s3::types::SdkError::TimeoutError(_)
| aws_sdk_s3::types::SdkError::DispatchFailure(_)
| aws_sdk_s3::types::SdkError::ResponseError(_) => {
tracing::debug!("Error to get object from s3, Retry");
if retry_count < crate::config::DEFAULT_RETRY_COUNT {
retry_count += 1;
continue;
} else {
// Stop retrying after 3 times
break Err(err);
}
}
// Stop retrying if we get response from s3 with error related to object
_ => break Err(err),
},
}
}?;
let body_bytes = s3_response.body.collect().await?.into_bytes();

Ok(serde_json::from_slice::<T>(body_bytes.as_ref())?)
}

#[cfg_attr(
feature = "tracing-instrumentation",
tracing::instrument(skip(s3_client))
Expand All @@ -17,24 +61,15 @@ pub async fn fetch_block_from_s3(
) -> Result<near_primitives::views::BlockView, near_jsonrpc_primitives::types::blocks::RpcBlockError>
{
tracing::debug!("`fetch_block_from_s3` call");
match s3_client
.get_object()
.bucket(s3_bucket_name)
.key(format!("{:0>12}/block.json", block_height))
.send()
.await

match fetch_object_from_s3_with_retry::<near_primitives::views::BlockView>(
s3_client,
s3_bucket_name,
format!("{:0>12}/block.json", block_height), // path to the block in the bucket
)
.await
{
Ok(response) => {
let body_bytes = response.body.collect().await.unwrap().into_bytes();
match serde_json::from_slice::<near_primitives::views::BlockView>(body_bytes.as_ref()) {
Ok(block) => Ok(block),
Err(err) => Err(
near_jsonrpc_primitives::types::blocks::RpcBlockError::UnknownBlock {
error_message: err.to_string(),
},
),
}
}
Ok(block) => Ok(block),
Err(err) => Err(
near_jsonrpc_primitives::types::blocks::RpcBlockError::UnknownBlock {
error_message: err.to_string(),
Expand All @@ -54,22 +89,13 @@ pub async fn fetch_shard_from_s3(
shard_id: near_primitives::types::ShardId,
) -> anyhow::Result<near_indexer_primitives::IndexerShard> {
tracing::debug!("`fetch_shard_from_s3` call");
let response = s3_client
.get_object()
.bucket(s3_bucket_name)
.key(format!("{:0>12}/shard_{shard_id}.json", block_height))
.send()
.await
.with_context(|| "Error to get object from s3")?;
let body_bytes = response
.body
.collect()
.await
.with_context(|| "Invalid data from s3")?
.into_bytes();

serde_json::from_slice::<near_indexer_primitives::IndexerShard>(body_bytes.as_ref())
.with_context(|| "Invalid serialised data")
fetch_object_from_s3_with_retry::<near_indexer_primitives::IndexerShard>(
s3_client,
s3_bucket_name,
format!("{:0>12}/shard_{shard_id}.json", block_height), // path to the shard in the bucket
)
.await
}

#[cfg_attr(
Expand Down
Loading