Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.1: [rpc] Fatal getSignaturesForAddress() when Bigtable errors (backport of #3700) #4443

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions rpc-client-api/src/custom_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub const JSON_RPC_SERVER_ERROR_UNSUPPORTED_TRANSACTION_VERSION: i64 = -32015;
pub const JSON_RPC_SERVER_ERROR_MIN_CONTEXT_SLOT_NOT_REACHED: i64 = -32016;
pub const JSON_RPC_SERVER_ERROR_EPOCH_REWARDS_PERIOD_ACTIVE: i64 = -32017;
pub const JSON_RPC_SERVER_ERROR_SLOT_NOT_EPOCH_BOUNDARY: i64 = -32018;
pub const JSON_RPC_SERVER_ERROR_LONG_TERM_STORAGE_UNREACHABLE: i64 = -32019;

#[derive(Error, Debug)]
pub enum RpcCustomError {
Expand Down Expand Up @@ -75,6 +76,8 @@ pub enum RpcCustomError {
},
#[error("SlotNotEpochBoundary")]
SlotNotEpochBoundary { slot: Slot },
#[error("LongTermStorageUnreachable")]
LongTermStorageUnreachable,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -244,6 +247,11 @@ impl From<RpcCustomError> for Error {
),
data: None,
},
RpcCustomError::LongTermStorageUnreachable => Self {
code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_LONG_TERM_STORAGE_UNREACHABLE),
message: "Failed to query long-term storage; please try again".to_string(),
data: None,
},
}
}
}
190 changes: 95 additions & 95 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1688,118 +1688,118 @@ impl JsonRpcRequestProcessor {
let commitment = config.commitment.unwrap_or_default();
check_is_at_least_confirmed(commitment)?;

if self.config.enable_rpc_transaction_history {
let highest_super_majority_root = self
.block_commitment_cache
.read()
.unwrap()
.highest_super_majority_root();
let highest_slot = if commitment.is_confirmed() {
let confirmed_bank = self.get_bank_with_config(config)?;
confirmed_bank.slot()
} else {
let min_context_slot = config.min_context_slot.unwrap_or_default();
if highest_super_majority_root < min_context_slot {
return Err(RpcCustomError::MinContextSlotNotReached {
context_slot: highest_super_majority_root,
}
.into());
if !self.config.enable_rpc_transaction_history {
return Err(RpcCustomError::TransactionHistoryNotAvailable.into());
}

let highest_super_majority_root = self
.block_commitment_cache
.read()
.unwrap()
.highest_super_majority_root();
let highest_slot = if commitment.is_confirmed() {
let confirmed_bank = self.get_bank_with_config(config)?;
confirmed_bank.slot()
} else {
let min_context_slot = config.min_context_slot.unwrap_or_default();
if highest_super_majority_root < min_context_slot {
return Err(RpcCustomError::MinContextSlotNotReached {
context_slot: highest_super_majority_root,
}
highest_super_majority_root
};
.into());
}
highest_super_majority_root
};

let SignatureInfosForAddress {
infos: mut results,
found_before,
} = self
.blockstore
.get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit)
.map_err(|err| Error::invalid_params(format!("{err}")))?;
let SignatureInfosForAddress {
infos: mut results,
found_before,
} = self
.blockstore
.get_confirmed_signatures_for_address2(address, highest_slot, before, until, limit)
.map_err(|err| Error::invalid_params(format!("{err}")))?;

let map_results = |results: Vec<ConfirmedTransactionStatusWithSignature>| {
results
.into_iter()
.map(|x| {
let mut item: RpcConfirmedTransactionStatusWithSignature = x.into();
if item.slot <= highest_super_majority_root {
item.confirmation_status =
Some(TransactionConfirmationStatus::Finalized);
} else {
item.confirmation_status =
Some(TransactionConfirmationStatus::Confirmed);
if item.block_time.is_none() {
let r_bank_forks = self.bank_forks.read().unwrap();
item.block_time = r_bank_forks
.get(item.slot)
.map(|bank| bank.clock().unix_timestamp);
}
let map_results = |results: Vec<ConfirmedTransactionStatusWithSignature>| {
results
.into_iter()
.map(|x| {
let mut item: RpcConfirmedTransactionStatusWithSignature = x.into();
if item.slot <= highest_super_majority_root {
item.confirmation_status = Some(TransactionConfirmationStatus::Finalized);
} else {
item.confirmation_status = Some(TransactionConfirmationStatus::Confirmed);
if item.block_time.is_none() {
let r_bank_forks = self.bank_forks.read().unwrap();
item.block_time = r_bank_forks
.get(item.slot)
.map(|bank| bank.clock().unix_timestamp);
}
item
})
.collect()
};

if results.len() < limit {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let mut bigtable_before = before;
if !results.is_empty() {
limit -= results.len();
bigtable_before = results.last().map(|x| x.signature);
}
item
})
.collect()
};

// If the oldest address-signature found in Blockstore has not yet been
// uploaded to long-term storage, modify the storage query to return all latest
// signatures to prevent erroring on RowNotFound. This can race with upload.
if found_before && bigtable_before.is_some() {
match bigtable_ledger_storage
.get_signature_status(&bigtable_before.unwrap())
.await
{
Err(StorageError::SignatureNotFound) => {
bigtable_before = None;
}
Err(err) => {
warn!("{:?}", err);
return Ok(map_results(results));
}
Ok(_) => {}
if results.len() < limit {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let mut bigtable_before = before;
if !results.is_empty() {
limit -= results.len();
bigtable_before = results.last().map(|x| x.signature);
}

// If the oldest address-signature found in Blockstore has not yet been
// uploaded to long-term storage, modify the storage query to return all latest
// signatures to prevent erroring on RowNotFound. This can race with upload.
if found_before && bigtable_before.is_some() {
match bigtable_ledger_storage
.get_signature_status(&bigtable_before.unwrap())
.await
{
Err(StorageError::SignatureNotFound) => {
bigtable_before = None;
}
Err(err) => {
warn!("Failed to query Bigtable: {:?}", err);
return Err(RpcCustomError::LongTermStorageUnreachable.into());
}
Ok(_) => {}
}
}

let bigtable_results = bigtable_ledger_storage
.get_confirmed_signatures_for_address(
&address,
bigtable_before.as_ref(),
until.as_ref(),
limit,
)
.await;
match bigtable_results {
Ok(bigtable_results) => {
let results_set: HashSet<_> =
results.iter().map(|result| result.signature).collect();
for (bigtable_result, _) in bigtable_results {
// In the upload race condition, latest address-signatures in
// long-term storage may include original `before` signature...
if before != Some(bigtable_result.signature)
let bigtable_results = bigtable_ledger_storage
.get_confirmed_signatures_for_address(
&address,
bigtable_before.as_ref(),
until.as_ref(),
limit,
)
.await;
match bigtable_results {
Ok(bigtable_results) => {
let results_set: HashSet<_> =
results.iter().map(|result| result.signature).collect();
for (bigtable_result, _) in bigtable_results {
// In the upload race condition, latest address-signatures in
// long-term storage may include original `before` signature...
if before != Some(bigtable_result.signature)
// ...or earlier Blockstore signatures
&& !results_set.contains(&bigtable_result.signature)
{
results.push(bigtable_result);
}
{
results.push(bigtable_result);
}
}
Err(err) => {
warn!("{:?}", err);
}
}
Err(StorageError::SignatureNotFound) => {}
Err(err) => {
warn!("Failed to query Bigtable: {:?}", err);
return Err(RpcCustomError::LongTermStorageUnreachable.into());
}
}
}

Ok(map_results(results))
} else {
Err(RpcCustomError::TransactionHistoryNotAvailable.into())
}

Ok(map_results(results))
}

pub async fn get_first_available_block(&self) -> Slot {
Expand Down
12 changes: 10 additions & 2 deletions storage-bigtable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,11 @@ impl LedgerStorage {
Some(before_signature) => {
let TransactionInfo { slot, index, .. } = bigtable
.get_bincode_cell("tx", before_signature.to_string())
.await?;
.await
.map_err(|err| match err {
bigtable::Error::RowNotFound => Error::SignatureNotFound,
_ => err.into(),
})?;

(slot, index)
}
Expand All @@ -808,7 +812,11 @@ impl LedgerStorage {
Some(until_signature) => {
let TransactionInfo { slot, index, .. } = bigtable
.get_bincode_cell("tx", until_signature.to_string())
.await?;
.await
.map_err(|err| match err {
bigtable::Error::RowNotFound => Error::SignatureNotFound,
_ => err.into(),
})?;

(slot, index)
}
Expand Down
Loading