Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: mapping for deleted mmr position to height/hash for perf #3394

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions applications/tari_app_utilities/src/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub enum ExitCodes {
NoPassword,
#[error("Tor connection is offline")]
TorOffline,
#[error("Database is in inconsistent state: {0}")]
DbInconsistentState(String),
}

impl ExitCodes {
Expand All @@ -94,6 +96,29 @@ impl ExitCodes {
Self::ConversionError(_) => 111,
Self::IncorrectPassword | Self::NoPassword => 112,
Self::TorOffline => 113,
Self::DbInconsistentState(_) => 115,
}
}

pub fn eprint_details(&self) {
use ExitCodes::*;
match self {
TorOffline => {
eprintln!("Unable to connect to the Tor control port.");
eprintln!(
"Please check that you have the Tor proxy running and that access to the Tor control port is \
turned on.",
);
eprintln!("If you are unsure of what to do, use the following command to start the Tor proxy:");
eprintln!(
"tor --allow-missing-torrc --ignore-missing-torrc --clientonly 1 --socksport 9050 --controlport \
127.0.0.1:9051 --log \"notice stdout\" --clientuseipv6 1",
);
},

e => {
eprintln!("{}", e);
},
}
}
}
Expand Down
23 changes: 9 additions & 14 deletions applications/tari_base_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ use tari_app_utilities::{
};
use tari_common::{configuration::bootstrap::ApplicationType, ConfigBootstrap, GlobalConfig};
use tari_comms::{peer_manager::PeerFeatures, tor::HiddenServiceControllerError};
use tari_core::chain_storage::ChainStorageError;
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{
runtime,
Expand All @@ -128,7 +129,7 @@ const LOG_TARGET: &str = "base_node::app";
/// Application entry point
fn main() {
if let Err(exit_code) = main_inner() {
eprintln!("{:?}", exit_code);
exit_code.eprint_details();
error!(
target: LOG_TARGET,
"Exiting with code ({}): {:?}",
Expand Down Expand Up @@ -205,21 +206,15 @@ async fn run_node(node_config: Arc<GlobalConfig>, bootstrap: ConfigBootstrap) ->
.await
.map_err(|err| {
for boxed_error in err.chain() {
if let Some(HiddenServiceControllerError::TorControlPortOffline) =
boxed_error.downcast_ref::<HiddenServiceControllerError>()
{
println!("Unable to connect to the Tor control port.");
println!(
"Please check that you have the Tor proxy running and that access to the Tor control port is \
turned on.",
);
println!("If you are unsure of what to do, use the following command to start the Tor proxy:");
println!(
"tor --allow-missing-torrc --ignore-missing-torrc --clientonly 1 --socksport 9050 --controlport \
127.0.0.1:9051 --log \"notice stdout\" --clientuseipv6 1",
);
if let Some(HiddenServiceControllerError::TorControlPortOffline) = boxed_error.downcast_ref() {
return ExitCodes::TorOffline;
}
if let Some(ChainStorageError::DatabaseResyncRequired(reason)) = boxed_error.downcast_ref() {
return ExitCodes::DbInconsistentState(format!(
"You may need to resync your database because {}",
reason
));
}

// todo: find a better way to do this
if boxed_error.to_string().contains("Invalid force sync peer") {
Expand Down
46 changes: 24 additions & 22 deletions base_layer/core/src/base_node/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
use crate::{
base_node::{rpc::BaseNodeWalletService, state_machine_service::states::StateInfo, StateMachineHandle},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, PrunedOutput, UtxoMinedInfo},
crypto::tari_utilities::Hashable,
mempool::{service::MempoolHandle, TxStorageResponse},
proto,
proto::{
Expand All @@ -47,7 +46,7 @@ use crate::{
},
transactions::transaction::Transaction,
};
use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;
use tari_common_types::types::Signature;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus};

Expand Down Expand Up @@ -307,13 +306,13 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc

let db = self.db();
let mut res = Vec::with_capacity(message.output_hashes.len());
for (pruned_output, spent) in (db
let utxos = db
.fetch_utxos(message.output_hashes)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?)
.into_iter()
.flatten()
{
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.into_iter()
.flatten();
for (pruned_output, spent) in utxos {
if let PrunedOutput::NotPruned { output } = pruned_output {
if !spent {
res.push(output);
Expand Down Expand Up @@ -396,15 +395,9 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
}
}

let metadata = self
.db
.get_chain_metadata()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

let deleted_bitmap = self
.db
.fetch_complete_deleted_bitmap_at(metadata.best_block().clone())
.fetch_deleted_bitmap_at_tip()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

Expand All @@ -416,8 +409,8 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
// TODO: in future, bitmap may support higher than u32
return Err(RpcStatus::bad_request("position must fit into a u32"));
}
let pos = position.try_into().unwrap();
if deleted_bitmap.bitmap().contains(pos) {
let position = position as u32;
if deleted_bitmap.bitmap().contains(position) {
deleted_positions.push(position);
} else {
not_deleted_positions.push(position);
Expand All @@ -429,20 +422,29 @@ impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpc
if message.include_deleted_block_data {
let headers = self
.db
.fetch_headers_of_deleted_positions(deleted_positions.clone())
.fetch_header_hash_by_deleted_mmr_positions(deleted_positions.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
for header in headers.iter() {
heights_deleted_at.push(header.height);
blocks_deleted_in.push(header.hash());

heights_deleted_at.reserve(headers.len());
blocks_deleted_in.reserve(headers.len());
for (height, hash) in headers.into_iter().flatten() {
heights_deleted_at.push(height);
blocks_deleted_in.push(hash);
}
}

let metadata = self
.db
.get_chain_metadata()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

Ok(Response::new(QueryDeletedResponse {
height_of_longest_chain: metadata.height_of_longest_chain(),
best_block: metadata.best_block().clone(),
deleted_positions,
not_deleted_positions,
deleted_positions: deleted_positions.into_iter().map(|v| v as u64).collect(),
not_deleted_positions: not_deleted_positions.into_iter().map(|v| v as u64).collect(),
blocks_deleted_in,
heights_deleted_at,
}))
Expand Down
5 changes: 4 additions & 1 deletion base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
DbBasicStats,
DbTotalSizeStats,
DbTransaction,
DeletedBitmap,
HistoricalBlock,
HorizonData,
MmrTree,
Expand Down Expand Up @@ -235,7 +236,9 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_complete_deleted_bitmap_at(hash: HashOutput) -> CompleteDeletedBitmap, "fetch_deleted_bitmap");

make_async_fn!(fetch_headers_of_deleted_positions(mmr_position: Vec<u64>) -> Vec<BlockHeader>, "fetch_headers_of_deleted_positions");
make_async_fn!(fetch_deleted_bitmap_at_tip() -> DeletedBitmap, "fetch_deleted_bitmap_at_tip");

make_async_fn!(fetch_header_hash_by_deleted_mmr_positions(mmr_positions: Vec<u32>) -> Vec<Option<(u64, HashOutput)>>, "fetch_headers_of_deleted_positions");

make_async_fn!(get_stats() -> DbBasicStats, "get_stats");

Expand Down
6 changes: 6 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,10 @@ pub trait BlockchainBackend: Send + Sync {
/// Returns total size information about each internal database. This call may be very slow and will obtain a read
/// lock for the duration.
fn fetch_total_size_stats(&self) -> Result<DbTotalSizeStats, ChainStorageError>;

/// Returns a (block height/hash) tuple for each mmr position of the height it was spent, or None if it is not spent
fn fetch_header_hash_by_deleted_mmr_positions(
&self,
mmr_positions: Vec<u32>,
) -> Result<Vec<Option<(u64, HashOutput)>>, ChainStorageError>;
}
56 changes: 10 additions & 46 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
ChainHeader,
DbBasicStats,
DbTotalSizeStats,
DeletedBitmap,
HistoricalBlock,
HorizonData,
MmrTree,
Expand Down Expand Up @@ -945,58 +946,21 @@ where B: BlockchainBackend
))
}

// TODO Update this method to make use of a (mmr_position, block_height) index in the lmdb backend instead of a
// linear search
pub fn fetch_headers_of_deleted_positions(
&self,
mut mmr_positions: Vec<u64>,
) -> Result<Vec<BlockHeader>, ChainStorageError> {
pub fn fetch_deleted_bitmap_at_tip(&self) -> Result<DeletedBitmap, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_deleted_bitmap()
}

pub fn fetch_header_hash_by_deleted_mmr_positions(
&self,
mmr_positions: Vec<u32>,
) -> Result<Vec<Option<(u64, HashOutput)>>, ChainStorageError> {
if mmr_positions.is_empty() {
return Ok(Vec::new());
}

let chain_metadata = db.fetch_chain_metadata()?;
let mut height = chain_metadata.height_of_longest_chain();

mmr_positions.sort_unstable();

let mut headers = Vec::with_capacity(mmr_positions.len());

let mut target = mmr_positions.pop().expect("mmr_positions cannot be empty here");

loop {
if target > u32::MAX as u64 {
// TODO: in future, bitmap may support higher than u32
return Err(ChainStorageError::InvalidArguments {
func: "fetch_header_of_deleted_position",
arg: "mmr_positions",
message: "mmr_positions should fit into u32".into(),
});
}
if db
.fetch_block_accumulated_data_by_height(height)
.or_not_found("BlockAccumulatedData", "height", height.to_string())?
.deleted()
.contains(target as u32)
{
headers.push(fetch_header(&(*db), height)?);
if let Some(pos) = mmr_positions.pop() {
target = pos;
} else {
break;
}
}

if height > 0 {
height -= 1;
} else {
break;
}
}

Ok(headers)
let db = self.db_read_access()?;
db.fetch_header_hash_by_deleted_mmr_positions(mmr_positions)
}

pub fn get_stats(&self) -> Result<DbBasicStats, ChainStorageError> {
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ pub enum ChainStorageError {
DbResizeRequired,
#[error("DB transaction was too large ({0} operations)")]
DbTransactionTooLarge(usize),
#[error("DB needs to be resynced: {0}")]
DatabaseResyncRequired(&'static str),
}

impl ChainStorageError {
Expand Down
Loading