Skip to content

Commit

Permalink
Feature/immediately handle subintent nullification (#1016)
Browse files Browse the repository at this point in the history
## Summary

The main intention was to ensure subintent handling is fixed/immediate
in the pending result cache and the mempool.

As part of this ticket, I worked through a few refactors:
* Encapsulated mempool related systems directly under the
`MempoolManager` where it can (A) safely handle atomicity of operations
and avoid deadlocks, and (B) enable us to eventually safely refactor and
possibly merge the cache and mempool. That was the intended direction
~18 months ago, I'm not quite sure why it ended up not being finished -
but I've pushed through with the merger anyway. I think it works nicely.
* Improved the scope of our locks to be smaller. In one particular case
in `add_if_committable_internal` I expect this to have a non-trivial
impact on throughput.
* Improved the pending rejection "is permanent" checks to require
passing an `AtState` so we can't use the wrong thing by mistake; and
commonise the code.

## Testing

* Existing tests pass
* Fixed the test added in
#1015

Might add some more unit tests tomorrow.
  • Loading branch information
dhedey authored Nov 11, 2024
2 parents b80edf2 + 851b910 commit afed42a
Show file tree
Hide file tree
Showing 42 changed files with 832 additions and 499 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ pub(crate) async fn handle_lts_transaction_status(
let intent_hash = extract_transaction_intent_hash(&extraction_context, request.intent_hash)
.map_err(|err| err.into_response_error("intent_hash"))?;

let pending_transaction_result_cache =
state.state_manager.pending_transaction_result_cache.read();
let mut known_pending_payloads =
pending_transaction_result_cache.peek_all_known_payloads_for_intent(&intent_hash);
drop(pending_transaction_result_cache);
let mut known_pending_payloads = state
.state_manager
.mempool_manager
.all_known_pending_payloads_for_intent(&intent_hash);

let database = state.state_manager.database.snapshot();

Expand Down Expand Up @@ -110,12 +109,13 @@ pub(crate) async fn handle_lts_transaction_status(
}));
}

let mempool = state.state_manager.mempool.read();
let mempool_payloads_hashes = mempool.get_notarized_transaction_hashes_for_intent(&intent_hash);
drop(mempool);
let mempool_payload_hashes = state
.state_manager
.mempool_manager
.get_mempool_payload_hashes_for_intent(&intent_hash);

if !mempool_payloads_hashes.is_empty() {
let mempool_payloads = mempool_payloads_hashes
if !mempool_payload_hashes.is_empty() {
let mempool_payloads = mempool_payload_hashes
.iter()
.map(|payload_hash| {
Ok(models::LtsTransactionPayloadDetails {
Expand All @@ -128,7 +128,7 @@ pub(crate) async fn handle_lts_transaction_status(
})
.collect::<Result<Vec<_>, MappingError>>()?;

let mempool_payloads_hashes: HashSet<_> = mempool_payloads_hashes.into_iter().collect();
let mempool_payloads_hashes: HashSet<_> = mempool_payload_hashes.into_iter().collect();

let known_payloads_not_in_mempool = known_pending_payloads
.into_iter()
Expand Down Expand Up @@ -201,7 +201,7 @@ fn map_rejected_payloads_due_to_known_commit(
.into_iter()
.map(|(notarized_transaction_hash, transaction_record)| {
let error_string_to_use = transaction_record
.most_applicable_status()
.most_applicable_rejection()
.map(|reason| reason.to_string())
// Note: in theory, we should not see the "no rejection" for any transaction here,
// since we only enter this method after seeing their intent hash committed by a
Expand All @@ -211,7 +211,6 @@ fn map_rejected_payloads_due_to_known_commit(
.unwrap_or_else(|| {
MempoolRejectionReason::TransactionIntentAlreadyCommitted(
AlreadyCommittedError {
notarized_transaction_hash,
committed_state_version,
committed_notarized_transaction_hash:
*committed_notarized_transaction_hash,
Expand All @@ -237,12 +236,13 @@ fn map_pending_payloads_not_in_mempool(
known_payloads_not_in_mempool
.into_iter()
.map(|(payload_hash, transaction_record)| {
Ok(match transaction_record.most_applicable_status() {
let applicable_attempt = transaction_record.most_applicable_status();
Ok(match applicable_attempt.rejection.as_ref() {
Some(reason) => models::LtsTransactionPayloadDetails {
payload_hash: to_api_notarized_transaction_hash(&payload_hash),
payload_hash_bech32m: to_api_hash_bech32m(context, &payload_hash)?,
state_version: None,
status: if reason.is_permanent_for_payload() {
status: if applicable_attempt.marks_permanent_rejection_for_payload() {
models::LtsTransactionPayloadStatus::PermanentlyRejected
} else {
models::LtsTransactionPayloadStatus::TransientlyRejected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ pub(crate) async fn handle_lts_transaction_submit(
},
)),
Err(MempoolAddError::Duplicate(_)) => Ok(models::LtsTransactionSubmitResponse::new(true)),
Err(MempoolAddError::Rejected(rejection)) => {
Err(MempoolAddError::Rejected(rejection, notarized_transaction_hash)) => {
if let Some(already_committed_error) = rejection.transaction_intent_already_committed_error() {
let is_same_transaction = Some(already_committed_error.committed_notarized_transaction_hash) == notarized_transaction_hash;
Err(detailed_error(
StatusCode::BAD_REQUEST,
"The transaction intent has already been committed",
LtsTransactionSubmitErrorDetails::LtsTransactionSubmitIntentAlreadyCommitted {
committed_as: Box::new(to_api_committed_intent_metadata(&mapping_context, already_committed_error)?)
committed_as: Box::new(to_api_committed_intent_metadata(&mapping_context, already_committed_error, is_same_transaction)?)
}
))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ pub(crate) async fn handle_mempool_list(
assert_unbounded_endpoints_flag_enabled(&state)?;
let mapping_context = MappingContext::new(&state.network);

let mempool = state.state_manager.mempool.read();
Ok(Json(models::MempoolListResponse {
contents: mempool
.all_hashes_iter()
contents: state
.state_manager
.mempool_manager
.get_mempool_all_hashes()
.iter()
.map(|(intent_hash, payload_hash)| {
Ok(models::MempoolTransactionHashes {
intent_hash: to_api_transaction_intent_hash(intent_hash),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ pub(crate) async fn handle_mempool_transaction(

let mut current_total_size = response.get_json_size();

let mempool = state.state_manager.mempool.read();
for payload_hash_str in request.payload_hashes.into_iter() {
let payload_hash =
extract_notarized_transaction_hash(&extraction_context, payload_hash_str)
.map_err(|err| err.into_response_error("payload_hashes"))?;

let (hex, error) = match mempool.get_payload(&payload_hash) {
let (hex, error) = match state
.state_manager
.mempool_manager
.get_mempool_payload(&payload_hash)
{
Some(mempool_transaction) => {
(Some(hex::encode(mempool_transaction.raw.as_slice())), None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ fn to_api_parsed_notarized_transaction(
.map(|error| {
Box::new(models::ParsedNotarizedTransactionAllOfValidationError {
reason: format!("{error:?}"),
is_permanent: error.is_permanent_for_payload(),
is_permanent: error.is_permanent_for_payload(&AtState::Static),
})
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ pub(crate) async fn handle_transaction_status(
let intent_hash = extract_transaction_intent_hash(&extraction_context, request.intent_hash)
.map_err(|err| err.into_response_error("intent_hash"))?;

let pending_transaction_result_cache =
state.state_manager.pending_transaction_result_cache.read();
let mut known_pending_payloads =
pending_transaction_result_cache.peek_all_known_payloads_for_intent(&intent_hash);
drop(pending_transaction_result_cache);
let mut known_pending_payloads = state
.state_manager
.mempool_manager
.all_known_pending_payloads_for_intent(&intent_hash);

let database = state.state_manager.database.snapshot();

Expand Down Expand Up @@ -109,12 +108,13 @@ pub(crate) async fn handle_transaction_status(
}));
}

let mempool = state.state_manager.mempool.read();
let mempool_payloads_hashes = mempool.get_notarized_transaction_hashes_for_intent(&intent_hash);
drop(mempool);
let mempool_payload_hashes = state
.state_manager
.mempool_manager
.get_mempool_payload_hashes_for_intent(&intent_hash);

if !mempool_payloads_hashes.is_empty() {
let mempool_payloads = mempool_payloads_hashes
if !mempool_payload_hashes.is_empty() {
let mempool_payloads = mempool_payload_hashes
.iter()
.map(|payload_hash| {
Ok(models::TransactionPayloadDetails {
Expand All @@ -127,7 +127,7 @@ pub(crate) async fn handle_transaction_status(
})
.collect::<Result<Vec<_>, MappingError>>()?;

let mempool_payloads_hashes: HashSet<_> = mempool_payloads_hashes.into_iter().collect();
let mempool_payloads_hashes: HashSet<_> = mempool_payload_hashes.into_iter().collect();

let known_payloads_not_in_mempool = known_pending_payloads
.into_iter()
Expand Down Expand Up @@ -203,7 +203,7 @@ fn map_rejected_payloads_due_to_known_commit(
.into_iter()
.map(|(notarized_transaction_hash, transaction_record)| {
let error_string_to_use = transaction_record
.most_applicable_status()
.most_applicable_rejection()
.map(|reason| reason.to_string())
// Note: in theory, we should not see the "no rejection" for any transaction here,
// since we only enter this method after seeing their intent hash committed by a
Expand All @@ -213,7 +213,6 @@ fn map_rejected_payloads_due_to_known_commit(
.unwrap_or_else(|| {
MempoolRejectionReason::TransactionIntentAlreadyCommitted(
AlreadyCommittedError {
notarized_transaction_hash,
committed_state_version,
committed_notarized_transaction_hash:
*committed_notarized_transaction_hash,
Expand All @@ -239,12 +238,13 @@ fn map_pending_payloads_not_in_mempool(
known_payloads_not_in_mempool
.into_iter()
.map(|(payload_hash, transaction_record)| {
Ok(match transaction_record.most_applicable_status() {
let attempt = transaction_record.most_applicable_status();
Ok(match attempt.rejection.as_ref() {
Some(reason) => models::TransactionPayloadDetails {
payload_hash: to_api_notarized_transaction_hash(&payload_hash),
payload_hash_bech32m: to_api_hash_bech32m(context, &payload_hash)?,
state_version: None,
status: if reason.is_permanent_for_payload() {
status: if attempt.marks_permanent_rejection_for_payload() {
models::TransactionPayloadStatus::PermanentlyRejected
} else {
models::TransactionPayloadStatus::TransientlyRejected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ pub(crate) async fn handle_transaction_submit(
},
)),
Err(MempoolAddError::Duplicate(_)) => Ok(models::TransactionSubmitResponse::new(true)),
Err(MempoolAddError::Rejected(rejection)) => {
Err(MempoolAddError::Rejected(rejection, notarized_transaction_hash)) => {
if let Some(already_committed_error) = rejection.transaction_intent_already_committed_error() {
let is_same_transaction = Some(already_committed_error.committed_notarized_transaction_hash) == notarized_transaction_hash;
Err(detailed_error(
StatusCode::BAD_REQUEST,
"The transaction intent has already been committed",
TransactionSubmitErrorDetails::TransactionSubmitIntentAlreadyCommitted {
committed_as: Box::new(to_api_committed_intent_metadata(&mapping_context, already_committed_error)?)
committed_as: Box::new(to_api_committed_intent_metadata(
&mapping_context,
already_committed_error,
is_same_transaction,
)?)
}
))
} else {
Expand Down Expand Up @@ -90,6 +95,7 @@ pub(crate) async fn handle_transaction_submit(
pub fn to_api_committed_intent_metadata(
context: &MappingContext,
error: &AlreadyCommittedError,
is_same_transaction: bool,
) -> Result<models::CommittedIntentMetadata, MappingError> {
Ok(models::CommittedIntentMetadata {
state_version: to_api_state_version(error.committed_state_version)?,
Expand All @@ -100,7 +106,6 @@ pub fn to_api_committed_intent_metadata(
context,
&error.committed_notarized_transaction_hash,
)?,
is_same_transaction: error.committed_notarized_transaction_hash
== error.notarized_transaction_hash,
is_same_transaction,
})
}
4 changes: 2 additions & 2 deletions core-rust/p2p/src/address_book_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ define_single_versioned! {
outer_attributes: [
#[derive(ScryptoSborAssertion)]
#[sbor_assert(backwards_compatible(
cuttlefish = "FILE:CF_SCHEMA_versioned_address_book_entry.bin"
cuttlefish = "FILE:CF_SCHEMA_versioned_address_book_entry_cuttlefish.bin"
))]
]
}
Expand All @@ -123,7 +123,7 @@ define_single_versioned! {
outer_attributes: [
#[derive(ScryptoSborAssertion)]
#[sbor_assert(backwards_compatible(
cuttlefish = "FILE:CF_SCHEMA_versioned_high_priority_peers.bin"
cuttlefish = "FILE:CF_SCHEMA_versioned_high_priority_peers_cuttlefish.bin"
))]
]
}
2 changes: 1 addition & 1 deletion core-rust/p2p/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use radix_common::prelude::*;
/// Status of the migration
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Sbor, ScryptoSborAssertion)]
#[sbor_assert(backwards_compatible(cuttlefish = "FILE:CF_SCHEMA_migration_status.bin"))]
#[sbor_assert(backwards_compatible(cuttlefish = "FILE:CF_SCHEMA_migration_status_cuttlefish.bin"))]
pub enum MigrationStatus {
Completed,
}
2 changes: 1 addition & 1 deletion core-rust/p2p/src/safety_store_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ define_single_versioned! {
outer_attributes: [
#[derive(ScryptoSborAssertion)]
#[sbor_assert(backwards_compatible(
cuttlefish = "FILE:CF_SCHEMA_versioned_safety_state.bin"
cuttlefish = "FILE:CF_SCHEMA_versioned_safety_state_cuttlefish.bin"
))]
]
}
Expand Down
Binary file not shown.
29 changes: 10 additions & 19 deletions core-rust/state-manager/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ pub struct Committer {
transaction_executor_factory: Arc<TransactionExecutorFactory>,
mempool_manager: Arc<MempoolManager>,
execution_cache_manager: Arc<ExecutionCacheManager>,
pending_transaction_result_cache: Arc<RwLock<PendingTransactionResultCache>>,
protocol_manager: Arc<ProtocolManager>,
ledger_metrics: Arc<LedgerMetrics>,
}
Expand All @@ -85,7 +84,6 @@ impl Committer {
ledger_transaction_validator: Arc<RwLock<TransactionValidator>>,
mempool_manager: Arc<MempoolManager>,
execution_cache_manager: Arc<ExecutionCacheManager>,
pending_transaction_result_cache: Arc<RwLock<PendingTransactionResultCache>>,
protocol_manager: Arc<ProtocolManager>,
ledger_metrics: Arc<LedgerMetrics>,
) -> Self {
Expand All @@ -95,7 +93,6 @@ impl Committer {
transaction_executor_factory,
mempool_manager,
execution_cache_manager,
pending_transaction_result_cache,
protocol_manager,
ledger_metrics,
}
Expand Down Expand Up @@ -229,11 +226,13 @@ impl Committer {
.expect("cannot execute transaction to be committed");

if let Some(user_hashes) = hashes.as_user() {
committed_user_transactions.push(CommittedUserTransactionIdentifiers {
let identifiers = CommittedUserTransactionIdentifiers {
state_version: series_executor.latest_state_version(),
transaction_intent_hash: user_hashes.transaction_intent_hash,
notarized_transaction_hash: user_hashes.notarized_transaction_hash,
});
};
let nullifications = commit.local_receipt.local_execution.nullifications.clone();
committed_user_transactions.push((identifiers, nullifications));
}
transactions_metrics_data.push(TransactionMetricsData::new(&raw, &commit));

Expand Down Expand Up @@ -263,21 +262,13 @@ impl Committer {
.access_exclusively()
.progress_base(&end_state.ledger_hashes.transaction_root);

self.mempool_manager.remove_committed(
committed_user_transactions
.iter()
.map(|txn| &txn.transaction_intent_hash),
);

if let Some(epoch_change) = end_state.epoch_change {
self.mempool_manager
.remove_txns_where_end_epoch_expired(epoch_change.epoch);
}

let num_user_transactions = committed_user_transactions.len() as u32;
self.pending_transaction_result_cache
.write()
.track_committed_transactions(SystemTime::now(), committed_user_transactions);

self.mempool_manager.handle_committed_transactions(
SystemTime::now(),
end_state.epoch_change,
committed_user_transactions,
);

self.ledger_metrics.update(
commit_state_version,
Expand Down
8 changes: 4 additions & 4 deletions core-rust/state-manager/src/jni/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ extern "system" fn Java_com_radixdlt_mempool_RustMempool_getCount(
request_payload: jbyteArray,
) -> jbyteArray {
jni_sbor_coded_call(&env, request_payload, |_no_args: ()| -> i32 {
let mempool = JNINodeRustEnvironment::get_mempool(&env, j_node_rust_env);
let read_mempool = mempool.read();
read_mempool.get_count().try_into().unwrap()
let mempool_count =
JNINodeRustEnvironment::get_mempool_manager(&env, j_node_rust_env).get_mempool_count();
mempool_count.try_into().unwrap()
})
}

Expand Down Expand Up @@ -211,7 +211,7 @@ impl From<MempoolAddError> for MempoolAddErrorJava {
tip_basis_points,
},
MempoolAddError::Duplicate(hash) => MempoolAddErrorJava::Duplicate(hash),
MempoolAddError::Rejected(rejection) => {
MempoolAddError::Rejected(rejection, _) => {
MempoolAddErrorJava::Rejected(rejection.reason.to_string())
}
}
Expand Down
7 changes: 0 additions & 7 deletions core-rust/state-manager/src/jni/node_rust_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,6 @@ impl JNINodeRustEnvironment {
Self::get(env, j_node_rust_env).safety_store_store.clone()
}

pub fn get_mempool(env: &JNIEnv, j_node_rust_env: JObject) -> Arc<RwLock<PriorityMempool>> {
Self::get(env, j_node_rust_env)
.state_manager
.mempool
.clone()
}

pub fn get_mempool_manager(env: &JNIEnv, j_node_rust_env: JObject) -> Arc<MempoolManager> {
Self::get(env, j_node_rust_env)
.state_manager
Expand Down
Loading

0 comments on commit afed42a

Please sign in to comment.