Skip to content

Commit

Permalink
add docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Aug 13, 2024
1 parent 0ffc61f commit 5be2c36
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 18 deletions.
55 changes: 42 additions & 13 deletions cache-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,20 @@ impl RedisCacheStorage {
Ok(())
}

pub async fn del(&self, key: impl redis::ToRedisArgs + std::fmt::Debug) -> anyhow::Result<()> {
pub async fn delete(
&self,
key: impl redis::ToRedisArgs + std::fmt::Debug,
) -> anyhow::Result<()> {
redis::cmd("DEL")
.arg(&key)
.query_async(&mut self.client.clone())
.await?;
Ok(())
}

async fn rpush(
// Insert all the specified values at the tail of the list stored at key.
// If key does not exist, it is created as empty list before performing the push operation.
async fn insert_or_create(
&self,
key: impl redis::ToRedisArgs + std::fmt::Debug,
value: impl redis::ToRedisArgs + std::fmt::Debug,
Expand All @@ -79,7 +84,9 @@ impl RedisCacheStorage {
Ok(())
}

async fn rpushx(
// Inserts specified values at the tail of the list stored at key, only if key already exists and holds a list.
// In contrary to RPUSH, no operation will be performed when key does not yet exist.
async fn insert_or_ignore(
&self,
key: impl redis::ToRedisArgs + std::fmt::Debug,
value: impl redis::ToRedisArgs + std::fmt::Debug,
Expand All @@ -92,7 +99,8 @@ impl RedisCacheStorage {
Ok(())
}

async fn lrange<V: redis::FromRedisValue + std::fmt::Debug>(
// Returns the specified elements of the list stored at key.
async fn list<V: redis::FromRedisValue + std::fmt::Debug>(
&self,
key: impl redis::ToRedisArgs + std::fmt::Debug,
) -> anyhow::Result<V> {
Expand Down Expand Up @@ -190,6 +198,8 @@ impl TxIndexerCache {
})
}

// Try to find the transaction by the transaction hash in the cache storage.
// If the transaction is found by key prefix, return the transaction details by the first key.
pub async fn get_tx_by_tx_hash(
&self,
tx_hash: &near_indexer_primitives::CryptoHash,
Expand All @@ -205,10 +215,11 @@ impl TxIndexerCache {
}
None => anyhow::bail!("Key does not exists"),
};
let tx_details = self.get_tx_full(&tx_key).await?;
let tx_details = self.get_tx_with_outcomes(&tx_key).await?;
Ok(tx_details.into())
}

// Help method to get all the transactions in process and restore after tx indexer interruption.
pub async fn get_txs_in_process(
&self,
) -> anyhow::Result<Vec<readnode_primitives::TransactionKey>> {
Expand All @@ -221,6 +232,7 @@ impl TxIndexerCache {
.collect())
}

// get the transaction details without outcomes by the transaction key.
pub async fn get_tx(
&self,
transaction_key: &readnode_primitives::TransactionKey,
Expand All @@ -232,13 +244,14 @@ impl TxIndexerCache {
utils::from_slice::<readnode_primitives::CollectingTransactionDetails>(&result)
}

// get the transaction outcomes by the transaction key.
pub async fn get_tx_outcomes(
&self,
transaction_key: &readnode_primitives::TransactionKey,
) -> anyhow::Result<Vec<near_indexer_primitives::IndexerExecutionOutcomeWithReceipt>> {
Ok(self
.cache_storage
.lrange::<Vec<Vec<u8>>>(format!("outcomes_{}", transaction_key))
.list::<Vec<Vec<u8>>>(format!("outcomes_{}", transaction_key))
.await?
.iter()
.map(|outcome| {
Expand All @@ -250,58 +263,74 @@ impl TxIndexerCache {
.collect())
}

pub async fn get_tx_full(
// Get the transaction details with outcomes by the transaction key.
pub async fn get_tx_with_outcomes(
&self,
transaction_key: &readnode_primitives::TransactionKey,
) -> anyhow::Result<readnode_primitives::CollectingTransactionDetails> {
let mut tx = self.get_tx(transaction_key).await?;
for outcome in self.get_tx_outcomes(transaction_key).await? {
// Skip the outcome that is already in the transaction
if outcome.execution_outcome.id == tx.transaction_outcome.id {
continue;
};
tx.execution_outcomes.push(outcome.execution_outcome);
tx.receipts.push(outcome.receipt);
}
Ok(tx)
}

// Set the transaction details and outcomes by the transaction key.
pub async fn set_tx(
&self,
transaction_details: readnode_primitives::CollectingTransactionDetails,
) -> anyhow::Result<()> {
// Set the transaction details into the cache storage
let set_tx_future = self.cache_storage.set(
format!("transaction_{}", transaction_details.transaction_key()),
utils::to_vec(&transaction_details)?,
);
let set_outcomes_future = self.cache_storage.rpush(
// Set the transaction outcomes into the cache storage
let set_outcomes_future = self.cache_storage.insert_or_create(
format!("outcomes_{}", transaction_details.transaction_key()),
utils::to_vec(&transaction_details.transaction_outcome)?,
);
futures::try_join!(set_tx_future, set_outcomes_future,)?;
Ok(())
futures::future::join_all([set_tx_future.boxed(), set_outcomes_future.boxed()])
.await
.into_iter()
.collect::<anyhow::Result<_>>()
}

// Delete the transaction details and outcomes from cache storage by the transaction key.
pub async fn del_tx(
&self,
transaction_key: &readnode_primitives::TransactionKey,
) -> anyhow::Result<()> {
// Delete the transaction details
let del_tx_future = self
.cache_storage
.del(format!("transaction_{}", transaction_key));
.delete(format!("transaction_{}", transaction_key));
// Delete the transaction outcomes
let del_tx_outcomes_future = self
.cache_storage
.del(format!("outcomes_{}", transaction_key));
.delete(format!("outcomes_{}", transaction_key));

futures::future::join_all([del_tx_future.boxed(), del_tx_outcomes_future.boxed()])
.await
.into_iter()
.collect::<anyhow::Result<_>>()
}

// Add the transaction outcomes by the transaction key.
// This inserts the outcomes into the list stored at key, only if key already exists and holds a list.
// This is needed to avoid adding outcomes to the list if the transaction is already deleted.
pub async fn set_outcomes_and_receipts(
&self,
transaction_key: &readnode_primitives::TransactionKey,
indexer_execution_outcome_with_receipt: near_indexer_primitives::IndexerExecutionOutcomeWithReceipt,
) -> anyhow::Result<()> {
self.cache_storage
.rpushx(
.insert_or_ignore(
format!("outcomes_{}", transaction_key),
utils::to_vec(&indexer_execution_outcome_with_receipt)?,
)
Expand Down
2 changes: 1 addition & 1 deletion logic-state-indexer/src/configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub async fn get_start_block_height(
}
StartOptions::FromLatest => final_block_height(near_client).await?,
};
Ok(start_block_height - 100) // Start just a bit earlier to avoid missing blocks
Ok(start_block_height - 100) // Start just a bit earlier to overlap indexed blocks to ensure we don't miss anything in-between
}

pub(crate) async fn final_block_height(
Expand Down
6 changes: 6 additions & 0 deletions readnode-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ impl CollectingTransactionDetails {
TransactionKey::new(self.transaction.hash, self.block_height)
}

// Finding the final status of the transaction
// The final status for finalized transaction should be either SuccessValue or Failure
pub fn final_status(&self) -> Option<views::FinalExecutionStatus> {
let mut looking_for_id = self.transaction.hash;
let mut execution_outcomes = vec![self.transaction_outcome.clone()];
Expand All @@ -83,18 +85,22 @@ impl CollectingTransactionDetails {
execution_outcomes.iter().find_map(|outcome_with_id| {
if outcome_with_id.id == looking_for_id {
match &outcome_with_id.outcome.status {
// If transaction just created and include only one outcome, the status should be NotStarted
views::ExecutionStatusView::Unknown if num_outcomes == 1 => {
Some(views::FinalExecutionStatus::NotStarted)
}
// If transaction has more than one outcome, the status should be Started
views::ExecutionStatusView::Unknown => {
Some(views::FinalExecutionStatus::Started)
}
// The final status for finalized transaction should be either SuccessValue or Failure
views::ExecutionStatusView::Failure(e) => {
Some(views::FinalExecutionStatus::Failure(e.clone()))
}
views::ExecutionStatusView::SuccessValue(v) => {
Some(views::FinalExecutionStatus::SuccessValue(v.clone()))
}
// If status SuccessReceiptId we should find the next outcome by id and check the status
views::ExecutionStatusView::SuccessReceiptId(id) => {
looking_for_id = *id;
None
Expand Down
2 changes: 1 addition & 1 deletion tx-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) async fn get_start_block_height(
}
StartOptions::FromLatest => final_block_height(rpc_client).await?,
};
Ok(start_block_height - 100) // Start just a bit earlier to avoid missing transactions
Ok(start_block_height - 100) // Start just a bit earlier to overlap indexed blocks to ensure we don't miss anything in-between
}

pub async fn final_block_height(rpc_client: &JsonRpcClient) -> anyhow::Result<u64> {
Expand Down
7 changes: 4 additions & 3 deletions tx-indexer/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl CacheStorage {
Ok(())
}

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn push_outcome_and_receipt_to_storage(
&self,
transaction_key: readnode_primitives::TransactionKey,
Expand Down Expand Up @@ -185,9 +186,9 @@ impl CacheStorage {
transaction_details
.execution_outcomes
.push(indexer_execution_outcome_with_receipt.execution_outcome);
let transaction_receipts_watching_count =
self.receipts_transaction_count(transaction_key).await?;
if transaction_receipts_watching_count == 0 {
// Check receipts counter and if all receipts and outcomes already collected
// then we move the transaction to save otherwise update it and wait for the rest of the receipts
if self.receipts_transaction_count(transaction_key).await? == 0 {
self.move_tx_to_save(transaction_details.clone()).await?;
} else {
self.update_tx(transaction_details.clone()).await?;
Expand Down

0 comments on commit 5be2c36

Please sign in to comment.