Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: removed try_join to await futures #337

Merged
merged 6 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cache-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use futures::FutureExt;

mod utils;

#[derive(Clone)]
Expand Down Expand Up @@ -140,8 +142,15 @@ impl BlocksByFinalityCache {
self.cache_storage.set(block_type, json_streamer_message);

// Wait for both futures to complete
futures::try_join!(update_height_feature, update_stream_msg_feature)?;
futures::future::join_all([
update_height_feature.boxed(),
update_stream_msg_feature.boxed(),
])
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;
};

Ok(())
}

Expand Down Expand Up @@ -260,8 +269,11 @@ impl TxIndexerCache {
let del_tx_outcomes_future = self
.cache_storage
.del(format!("outcomes_{}", transaction_key));
futures::try_join!(del_tx_future, del_tx_outcomes_future,)?;
Ok(())

futures::future::join_all([del_tx_future.boxed(), del_tx_outcomes_future.boxed()])
.await
.into_iter()
.collect::<anyhow::Result<_>>()
}

pub async fn set_outcomes_and_receipts(
Expand Down
6 changes: 4 additions & 2 deletions database/src/base/state_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ pub trait StateIndexerDbManager {
let add_block_future = self.save_block(block_height, block_hash);
let add_chunks_future = self.save_chunks(block_height, chunks);

futures::try_join!(add_block_future, add_chunks_future)?;
Ok(())
futures::future::join_all([add_block_future, add_chunks_future])
.await
.into_iter()
.collect::<anyhow::Result<()>>()
}

async fn save_state_changes_data(
Expand Down
7 changes: 5 additions & 2 deletions database/src/base/tx_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ pub trait TxIndexerDbManager {
) -> anyhow::Result<()> {
let save_outcome_future = self.save_outcomes(shard_id, outcomes);
let save_receipt_future = self.save_receipts(shard_id, receipts);
futures::try_join!(save_outcome_future, save_receipt_future)?;
Ok(())

futures::future::join_all([save_outcome_future, save_receipt_future])
.await
.into_iter()
.collect::<anyhow::Result<()>>()
}

async fn update_meta(&self, indexer_id: &str, block_height: u64) -> anyhow::Result<()>;
Expand Down
11 changes: 9 additions & 2 deletions database/src/postgres/state_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bigdecimal::ToPrimitive;
use futures::FutureExt;

impl crate::PostgresDBManager {
async fn save_chunks_unique(
Expand Down Expand Up @@ -107,8 +108,14 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
) -> anyhow::Result<()> {
let save_chunks_unique_future = self.save_chunks_unique(block_height, chunks.clone());
let save_chunks_duplicate_future = self.save_chunks_duplicate(block_height, chunks);
futures::try_join!(save_chunks_unique_future, save_chunks_duplicate_future)?;
Ok(())

futures::future::join_all([
save_chunks_unique_future.boxed(),
save_chunks_duplicate_future.boxed(),
])
.await
.into_iter()
.collect::<anyhow::Result<()>>()
}

async fn get_block_height_by_hash(
Expand Down
69 changes: 47 additions & 22 deletions logic-state-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use near_indexer_primitives::views::StateChangeValueView;
use near_indexer_primitives::CryptoHash;

use futures::FutureExt;
use itertools::Itertools;

#[macro_use]
Expand Down Expand Up @@ -32,7 +33,7 @@ struct StateChangesToStore {

impl StateChangesToStore {
// Unpack the state_changes_data into futures split by shard_id
// and store them asynchronously using try_join!
// and store them asynchronously using join_all
async fn save_data(
&self,
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
Expand All @@ -57,13 +58,16 @@ impl StateChangesToStore {
)
})
.collect();
futures::future::try_join_all(futures).await?;
futures::future::join_all(futures)
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;
}
Ok(())
}

// Unpack the state_changes_access_key into futures split by shard_id
// and store them asynchronously using try_join!
// and store them asynchronously using join_all
async fn save_access_key(
&self,
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
Expand All @@ -88,13 +92,17 @@ impl StateChangesToStore {
)
})
.collect();
futures::future::try_join_all(futures).await?;

futures::future::join_all(futures)
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;
}
Ok(())
}

// Unpack the state_changes_contract into futures split by shard_id
// and store them asynchronously using try_join!
// and store them asynchronously using join_all
async fn save_contract(
&self,
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
Expand All @@ -119,13 +127,17 @@ impl StateChangesToStore {
)
})
.collect();
futures::future::try_join_all(futures).await?;

futures::future::join_all(futures)
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;
}
Ok(())
}

// Unpack the state_changes_account into futures split by shard_id
// and store them asynchronously using try_join!
// and store them asynchronously using join_all
async fn save_account(
&self,
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
Expand All @@ -150,7 +162,11 @@ impl StateChangesToStore {
)
})
.collect();
futures::future::try_join_all(futures).await?;

futures::future::join_all(futures)
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;
}
Ok(())
}
Expand All @@ -165,12 +181,17 @@ impl StateChangesToStore {
let save_access_key_future = self.save_access_key(db_manager, block_height, block_hash);
let save_contract_future = self.save_contract(db_manager, block_height, block_hash);
let save_account_future = self.save_account(db_manager, block_height, block_hash);
futures::try_join!(
save_data_future,
save_access_key_future,
save_contract_future,
save_account_future
)?;

futures::future::join_all([
save_data_future.boxed(),
save_access_key_future.boxed(),
save_contract_future.boxed(),
save_account_future.boxed(),
])
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;

Ok(())
}
}
Expand All @@ -190,10 +211,11 @@ struct ShardedStateChangesWithCause {
pub async fn handle_streamer_message(
streamer_message: near_indexer_primitives::StreamerMessage,
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
near_client: &(impl NearClient + std::fmt::Debug),
near_client: &(impl NearClient + std::fmt::Debug + Sync),
indexer_config: impl configuration::RightsizingConfig
+ configuration::IndexerConfig
+ std::fmt::Debug,
+ std::fmt::Debug
+ Sync,
stats: std::sync::Arc<tokio::sync::RwLock<metrics::Stats>>,
shard_layout: &near_primitives::shard_layout::ShardLayout,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -307,12 +329,15 @@ pub async fn handle_streamer_message(
let update_meta_future =
db_manager.update_meta(indexer_config.indexer_id().as_ref(), block_height);

futures::try_join!(
handle_epoch_future,
handle_block_future,
handle_state_change_future,
update_meta_future
)?;
futures::future::join_all([
handle_epoch_future.boxed(),
handle_block_future.boxed(),
handle_state_change_future.boxed(),
update_meta_future.boxed(),
])
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;

metrics::BLOCK_PROCESSED_TOTAL.inc();
// Prometheus Gauge Metric type do not support u64
Expand Down
5 changes: 4 additions & 1 deletion rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,11 @@ async fn fetch_shards_by_cache_block(
shard_id,
)
});
futures::future::try_join_all(fetch_shards_futures)

futures::future::join_all(fetch_shards_futures)
.await
.into_iter()
.collect::<Result<_, _>>()
.map_err(|err| {
anyhow::anyhow!(
"Failed to fetch shards for block {} with error: {}",
Expand Down
3 changes: 2 additions & 1 deletion rpc-server/src/modules/blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ impl BlocksInfoByFinality {
let final_block_future = crate::utils::get_final_block(near_rpc_client, false);
let optimistic_block_future = crate::utils::get_final_block(near_rpc_client, true);
let validators_future = crate::utils::get_current_validators(near_rpc_client);
let (final_block, optimistic_block, validators) = tokio::try_join!(

let (final_block, optimistic_block, validators) = futures::try_join!(
final_block_future,
optimistic_block_future,
validators_future,
Expand Down
2 changes: 1 addition & 1 deletion rpc-server/src/modules/gas/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub async fn gas_price(
{
let result = match &cache_block {
Ok(block) => {
if let None = gas_price_request.block_id {
if gas_price_request.block_id.is_none() {
gas_price_request.block_id =
Some(near_primitives::types::BlockId::Height(block.block_height));
};
Expand Down
13 changes: 6 additions & 7 deletions rpc-server/src/modules/queries/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,12 @@ async fn view_code(
is_optimistic
);
let (code, account) = if is_optimistic {
tokio::try_join!(
futures::try_join!(
optimistic_view_code(data, block, account_id, "query_view_code"),
optimistic_view_account(data, block, account_id, "query_view_code"),
)?
} else {
tokio::try_join!(
futures::try_join!(
database_view_code(data, block, account_id, "query_view_code"),
database_view_account(data, block, account_id, "query_view_code"),
)?
Expand Down Expand Up @@ -367,19 +367,18 @@ async fn function_call(
maybe_optimistic_data,
data.prefetch_state_size_limit,
)
.await;
.await
.map_err(|err| err.to_rpc_query_error(block.block_height, block.block_hash))?;

let call_results =
call_results.map_err(|err| err.to_rpc_query_error(block.block_height, block.block_hash))?;
Ok(near_jsonrpc::primitives::types::query::RpcQueryResponse {
kind: near_jsonrpc::primitives::types::query::QueryResponseKind::CallResult(
near_primitives::views::CallResult {
result: call_results.result,
logs: call_results.logs,
},
),
block_height: block.block_height,
block_hash: block.block_hash,
block_height: call_results.block_height,
block_hash: call_results.block_hash,
})
}

Expand Down
Loading
Loading