Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RocksDB's snapshots instead of RwLock on database #832

Merged
merged 12 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions core-rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions core-rust/core-api-server/src/core_api/conversions/lts.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use models::*;
use radix_engine::{
system::system_modules::costing::RoyaltyRecipient,
transaction::BalanceChange,
types::{Decimal, GlobalAddress, IndexMap, ResourceAddress},
};

use state_manager::store::{traits::SubstateNodeAncestryStore, StateManagerDatabase};
use state_manager::store::traits::SubstateNodeAncestryStore;
use state_manager::{
CommittedTransactionIdentifiers, LedgerTransactionOutcome, LocalTransactionReceipt,
StateVersion, TransactionTreeHash,
ReadableRocks, StateManagerDatabase, StateVersion, TransactionTreeHash,
};

use radix_engine::transaction::{FeeDestination, FeeSource, TransactionFeeSummary};
Expand All @@ -18,7 +17,7 @@ use crate::core_api::*;

#[tracing::instrument(skip_all)]
pub fn to_api_lts_committed_transaction_outcome(
database: &StateManagerDatabase,
database: &StateManagerDatabase<impl ReadableRocks>,
context: &MappingContext,
state_version: StateVersion,
receipt: LocalTransactionReceipt,
Expand Down Expand Up @@ -89,14 +88,14 @@ pub fn to_api_lts_committed_transaction_outcome(
pub fn to_api_lts_entity_non_fungible_balance_changes(
context: &MappingContext,
global_balance_summary: &IndexMap<GlobalAddress, IndexMap<ResourceAddress, BalanceChange>>,
) -> Result<Vec<LtsEntityNonFungibleBalanceChanges>, MappingError> {
) -> Result<Vec<models::LtsEntityNonFungibleBalanceChanges>, MappingError> {
let mut changes = Vec::new();
for (address, balance_changes) in global_balance_summary.iter() {
for (resource, balance_change) in balance_changes.iter() {
match balance_change {
BalanceChange::Fungible(_) => {}
BalanceChange::NonFungible { added, removed } => {
changes.push(LtsEntityNonFungibleBalanceChanges {
changes.push(models::LtsEntityNonFungibleBalanceChanges {
entity_address: to_api_global_address(context, address)?,
resource_address: to_api_resource_address(context, resource)?,
added: added
Expand All @@ -116,7 +115,7 @@ pub fn to_api_lts_entity_non_fungible_balance_changes(
}

pub fn to_api_lts_fungible_balance_changes(
database: &StateManagerDatabase,
database: &StateManagerDatabase<impl ReadableRocks>,
context: &MappingContext,
fee_summary: &TransactionFeeSummary,
fee_source: &FeeSource,
Expand All @@ -136,7 +135,7 @@ pub fn to_api_lts_fungible_balance_changes(
/// Uses the [`SubstateNodeAncestryStore`] (from the given DB) to transform the input
/// `vault ID -> payment` map into a `global address -> balance change` map.
fn resolve_global_fee_balance_changes(
database: &StateManagerDatabase,
database: &StateManagerDatabase<impl ReadableRocks>,
fee_source: &FeeSource,
) -> Result<IndexMap<GlobalAddress, Decimal>, MappingError> {
let paying_vaults = &fee_source.paying_vaults;
Expand Down
13 changes: 6 additions & 7 deletions core-rust/core-api-server/src/core_api/conversions/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@ use radix_engine::transaction::{
};
use radix_engine_queries::typed_substate_layout::*;
use radix_engine_store_interface::db_key_mapper::{MappedSubstateDatabase, SpreadPrefixKeyMapper};
use state_manager::store::StateManagerDatabase;
use transaction::prelude::TransactionCostingParameters;

use state_manager::{
ApplicationEvent, BySubstate, DetailedTransactionOutcome, LedgerStateChanges,
LocalTransactionReceipt, PartitionChangeAction, PartitionReference, SubstateChangeAction,
SubstateReference,
LocalTransactionReceipt, PartitionChangeAction, PartitionReference, ReadableRocks,
StateManagerDatabase, SubstateChangeAction, SubstateReference,
};

pub fn to_api_receipt(
database: Option<&StateManagerDatabase>,
database: Option<&StateManagerDatabase<impl ReadableRocks>>,
context: &MappingContext,
receipt: LocalTransactionReceipt,
) -> Result<models::TransactionReceipt, MappingError> {
Expand Down Expand Up @@ -311,7 +310,7 @@ pub fn to_api_next_epoch(

#[tracing::instrument(skip_all)]
pub fn to_api_state_updates(
database: Option<&StateManagerDatabase>,
database: Option<&StateManagerDatabase<impl ReadableRocks>>,
context: &MappingContext,
system_structures: &BySubstate<SubstateSystemStructure>,
state_changes: &LedgerStateChanges,
Expand Down Expand Up @@ -468,7 +467,7 @@ pub struct StateMappingLookups {

impl StateMappingLookups {
pub fn create_from_database(
database: Option<&StateManagerDatabase>,
database: Option<&StateManagerDatabase<impl ReadableRocks>>,
changes_to_map: &[(SubstateReference, TypedSubstateKey, &SubstateChangeAction)],
) -> Result<Self, MappingError> {
let Some(database) = database else {
Expand Down Expand Up @@ -516,7 +515,7 @@ impl StateMappingLookups {
}

fn create_blueprint_type_lookups(
database: &StateManagerDatabase,
database: &StateManagerDatabase<impl ReadableRocks>,
typed_values: &[TypedSubstateValue],
) -> Result<IndexMap<BlueprintId, IndexMap<String, ScopedTypeId>>, MappingError> {
// Step 1 - work out what database reads we need to do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(crate) async fn handle_lts_state_account_all_fungible_resource_balances(
));
}

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();
let header = read_current_ledger_header(database.deref());

let type_info: Option<TypeInfoSubstate> = read_optional_substate::<TypeInfoSubstate>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub(crate) async fn handle_lts_state_account_deposit_behaviour(
.map_err(|err| err.into_response_error("badge"))?;

// If the above checks were al fine, open database (and capture the "at state" information):
let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();
let header = read_current_ledger_header(database.deref());

// Read out the field that must exist for non-virtual addresses:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) async fn handle_lts_state_account_fungible_resource_balance(
));
}

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

if !account_address.as_node_id().is_global_virtual() {
read_optional_substate::<TypeInfoSubstate>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) async fn handle_lts_stream_account_transaction_outcomes(
)));
}

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

if !database.is_local_transaction_execution_index_enabled() {
return Err(client_error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(crate) async fn handle_lts_stream_transaction_outcomes(

let limit = limit.try_into().expect("limit out of usize bounds");

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

if !database.is_local_transaction_execution_index_enabled() {
return Err(client_error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) async fn handle_lts_transaction_construction(
assert_matching_network(&request.network, &state.network)?;
let mapping_context = MappingContext::new(&state.network);

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

let consensus_manager_substate =
read_mandatory_main_field_substate::<ConsensusManagerStateFieldPayload>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ pub(crate) async fn handle_lts_transaction_status(
pending_transaction_result_cache.peek_all_known_payloads_for_intent(&intent_hash);
drop(pending_transaction_result_cache);

// TODO(locks): consider this (and other) usages of DB "read current" lock. *Carefully* migrate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take it this has been considered and deemed not worth it given how cheap snapshots are?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That TODO was mostly about avoiding the read lock, and now it is easiest to achieve by snapshots, so yes - most of these places were migrated to snapshots.
(although not all - e.g. for vertex store, after looking at how it is used, I went for direct access. The same for StateManager's boot-up. In this review, I would like to ask you for careful re-analysis of all "direct access" cases, whether they are really safe, because who knows what knowledge I am missing.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmh so we're now allowing (locking-wise) concurrent writes to VertexStoreCf (in save_vertex_store and commit).
While I don't think it can cause issues at the moment given the context in which those methods are used (e.g. we only include vertex store in commit if it originates from consensus, and that's the same thread that calls save_vertex_store), I'd still suggest to change that to lock just in case (e.g. if some assumptions change in the future). In normal operating conditions this lock will always be non-contentious.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, let me add the lock, since it indeed writes to the same "db region" as commit 👍

// all applicable ones to "historical, non-locked" DB access.
let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

if !database.is_local_transaction_execution_index_enabled() {
return Err(client_error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) async fn handle_state_access_controller(
return Err(client_error("Only access controller addresses work for this endpoint. Try another endpoint instead."));
}

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

let access_controller_state_substate = read_optional_main_field_substate(
database.deref(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) async fn handle_state_account(
return Err(client_error("Only account addresses starting account_ currently work with this endpoint. Try another endpoint instead."));
}

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();
let type_info = read_optional_substate(
database.deref(),
component_address.as_node_id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(crate) async fn handle_state_component(
return Err(client_error("Only component addresses starting component_ currently work with this endpoint. Try another endpoint instead."));
}

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();
let type_info_substate = read_optional_substate(
database.deref(),
component_address.as_node_id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use radix_engine::blueprints::consensus_manager::*;
use radix_engine::types::*;

use state_manager::protocol::ProtocolVersionName;
use state_manager::StateManagerDatabase;
use state_manager::{ReadableRocks, StateManagerDatabase};
use std::ops::Deref;

#[tracing::instrument(skip(state))]
Expand All @@ -13,7 +13,7 @@ pub(crate) async fn handle_state_consensus_manager(
) -> Result<Json<models::StateConsensusManagerResponse>, ResponseError<()>> {
assert_matching_network(&request.network, &state.network)?;
let mapping_context = MappingContext::new(&state.network);
let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

let config_substate = read_mandatory_main_field_substate(
database.deref(),
Expand Down Expand Up @@ -85,7 +85,7 @@ pub(crate) async fn handle_state_consensus_manager(
}

fn collect_current_validators_by_signalled_protocol_version(
database: &StateManagerDatabase,
database: &StateManagerDatabase<impl ReadableRocks>,
substate: ConsensusManagerCurrentValidatorSetFieldSubstate,
) -> Result<ValidatorsBySignalledProtocolVersion, ResponseError<()>> {
let mut validators = ValidatorsBySignalledProtocolVersion::default();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) async fn handle_state_non_fungible(
return Err(client_error("Resource is not a non-fungible resource"));
}

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

let id_type =
read_optional_main_field_substate::<NonFungibleResourceManagerIdTypeFieldPayload>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) async fn handle_state_package(
let package_address = extract_package_address(&extraction_context, &request.package_address)
.map_err(|err| err.into_response_error("package_address"))?;

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

let owner_role_substate = read_optional_substate(
database.deref(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub(crate) async fn handle_state_resource(
let resource_address = extract_resource_address(&extraction_context, &request.resource_address)
.map_err(|err| err.into_response_error("resource_address"))?;

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

let resource_node_id = resource_address.as_node_id();
let is_fungible =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) async fn handle_state_validator(
));
}

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

let validator_state_substate = read_optional_main_field_substate(
database.deref(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub(crate) async fn handle_status_network_status(
assert_matching_network(&request.network, &state.network)?;
let mapping_context = MappingContext::new(&state.network);

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();
let (current_state_version, current_ledger_hashes) = database.get_top_ledger_hashes();
let current_protocol_version = state
.state_manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) async fn handle_status_scenarios(
assert_matching_network(&request.network, &state.network)?;
let context = MappingContext::new(&state.network);

let database = state.state_manager.database.access_non_locked_historical();
let database = state.state_manager.database.access_direct();
let scenarios = database.list_all_scenarios();

Ok(Json(models::ScenariosResponse {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::core_api::*;

use state_manager::store::{traits::*, StateManagerDatabase};
use state_manager::{LedgerProof, LedgerProofOrigin, StateVersion};
use state_manager::store::traits::*;
use state_manager::{
LedgerProof, LedgerProofOrigin, ReadableRocks, StateManagerDatabase, StateVersion,
};

use transaction::prelude::*;

Expand Down Expand Up @@ -31,7 +33,7 @@ pub(crate) async fn handle_stream_proofs(
extract_continuation_token::<StateVersion>(request.continuation_token)
.map_err(|err| err.into_response_error("continuation_token"))?;

let database = state.state_manager.database.read_current();
let database = state.state_manager.database.snapshot();

use models::StreamProofsFilter::*;
let mut proofs_iter = match *filter {
Expand Down Expand Up @@ -77,7 +79,7 @@ pub(crate) async fn handle_stream_proofs(
}

fn iterate_all_proofs<'a>(
database: &'a StateManagerDatabase,
database: &'a StateManagerDatabase<impl ReadableRocks>,
continue_from_state_version: Option<StateVersion>,
from_state_version: StateVersion,
) -> Result<
Expand All @@ -90,7 +92,7 @@ fn iterate_all_proofs<'a>(
}

fn iterate_end_of_epoch_proofs<'a>(
database: &'a StateManagerDatabase,
database: &'a StateManagerDatabase<impl ReadableRocks>,
continue_from_state_version: Option<StateVersion>,
from_epoch: Epoch,
) -> Result<
Expand All @@ -111,7 +113,7 @@ fn iterate_end_of_epoch_proofs<'a>(
}

fn iterate_protocol_update_initialization_proofs<'a>(
database: &'a StateManagerDatabase,
database: &'a StateManagerDatabase<impl ReadableRocks>,
continue_from_state_version: Option<StateVersion>,
from_state_version: StateVersion,
) -> Result<
Expand All @@ -124,7 +126,7 @@ fn iterate_protocol_update_initialization_proofs<'a>(
}

fn iterate_protocol_update_execution_proofs<'a>(
database: &'a StateManagerDatabase,
database: &'a StateManagerDatabase<impl ReadableRocks>,
continue_from_state_version: Option<StateVersion>,
from_state_version: StateVersion,
protocol_version: Option<String>,
Expand Down Expand Up @@ -152,7 +154,7 @@ fn iterate_protocol_update_execution_proofs<'a>(
}

fn extract_from_state_version(
database: &StateManagerDatabase,
database: &StateManagerDatabase<impl ReadableRocks>,
from_state_version: Option<i64>,
) -> Result<StateVersion, ResponseError<models::StreamProofsErrorDetails>> {
let Some(from_state_version) = from_state_version else {
Expand Down Expand Up @@ -180,7 +182,7 @@ fn extract_from_state_version(

fn extract_from_epoch(
mapping_context: &MappingContext,
database: &StateManagerDatabase,
database: &StateManagerDatabase<impl ReadableRocks>,
from_epoch: Option<i64>,
) -> Result<Epoch, ResponseError<models::StreamProofsErrorDetails>> {
let Some(from_epoch) = from_epoch else {
Expand Down
Loading
Loading