Skip to content

Commit

Permalink
fix: fix prune mode sync (#3138)
Browse files Browse the repository at this point in the history
<!--- Provide a general summary of your changes in the Title above -->

## Description
<!--- Describe your changes in detail -->
This fixes prune mode sync to work again after the bitmap change. The bitmap saved in the accumulated block data is not the completed bitmap anymore but rather the diff for that block. This means when a bitmap is required to know what the spent state of the utxo set is, we need to build a new bitmap rather than ask the accumulated block data like we used to do. 

This PR lets the blockchain_database send a completed bitmap from the tip out, that can be used to reconstruct a bitmap at the desired height. 


## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here. -->

## How Has This Been Tested?
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
* [x] I'm merging against the `development` branch.
* [x] I have squashed my commits into a single commit.
  • Loading branch information
aviator-app[bot] authored Jul 27, 2021
2 parents 4f1509e + 8b03cd4 commit d0d1d61
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ use crate::{
use croaring::Bitmap;
use futures::StreamExt;
use log::*;
use std::convert::{TryFrom, TryInto};
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use tari_comms::PeerConnection;
use tari_crypto::{
commitment::HomomorphicCommitment,
Expand Down Expand Up @@ -206,9 +209,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let kernel_pruned_set = block_data.dissolve().0;
let mut kernel_mmr = MerkleMountainRange::<HashDigest, _>::new(kernel_pruned_set);

// let mut kernel_sum = HomomorphicCommitment::default();
for kernel in kernels.drain(..) {
// kernel_sum = &kernel.excess + &kernel_sum;
kernel_mmr.push(kernel.hash())?;
}

Expand Down Expand Up @@ -531,6 +532,12 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {

let mut prev_mmr = 0;
let mut prev_kernel_mmr = 0;
let bitmap = Arc::new(
self.db()
.fetch_complete_deleted_bitmap_at(header.hash().clone())
.await?
.into_bitmap(),
);
for h in 0..=header.height() {
let curr_header = self.db().fetch_chain_header(h).await?;

Expand All @@ -544,11 +551,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
);
let (utxos, _) = self
.db()
.fetch_utxos_by_mmr_position(
prev_mmr,
curr_header.header().output_mmr_size - 1,
header.hash().clone(),
)
.fetch_utxos_by_mmr_position(prev_mmr, curr_header.header().output_mmr_size - 1, bitmap.clone())
.await?;
trace!(
target: LOG_TARGET,
Expand Down
13 changes: 10 additions & 3 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
};
use futures::{channel::mpsc, stream, SinkExt};
use log::*;
use std::{cmp, time::Instant};
use std::{cmp, sync::Arc, time::Instant};
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming};
use tari_crypto::tari_utilities::hex::Hex;
use tokio::task;
Expand Down Expand Up @@ -448,7 +448,6 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
self.request.end_header_hash.to_hex()
))
})?;
let end_header_hash = end_header.hash();

if self.request.start > end_header.output_mmr_size - 1 {
return Err(RpcStatus::bad_request(format!(
Expand All @@ -467,7 +466,15 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
if prev_header.height > end_header.height {
return Err(RpcStatus::bad_request("start index is greater than end index"));
}
// we need to construct a temp bitmap for the height the client requested
let bitmap = self
.db
.fetch_complete_deleted_bitmap_at(end_header.hash())
.await
.map_err(|_| RpcStatus::not_found("Could not get tip deleted bitmap"))?
.into_bitmap();

let bitmap = Arc::new(bitmap);
loop {
let timer = Instant::now();
if prev_header.height == end_header.height {
Expand Down Expand Up @@ -513,7 +520,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
);
let (utxos, deleted_diff) = self
.db
.fetch_utxos_by_mmr_position(start, end, end_header_hash.clone())
.fetch_utxos_by_mmr_position(start, end, bitmap.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
trace!(
Expand Down
26 changes: 26 additions & 0 deletions base_layer/core/src/chain_storage/accumulated_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,32 @@ impl<'de> Visitor<'de> for DeletedBitmapVisitor {
}
}

/// Wrapper struct to get a completed bitmap with the height it was created at
#[derive(Debug, Clone)]
pub struct CompleteDeletedBitmap {
deleted: Bitmap,
height: u64,
hash: HashOutput,
}

impl CompleteDeletedBitmap {
pub fn new(deleted: Bitmap, height: u64, hash: HashOutput) -> CompleteDeletedBitmap {
CompleteDeletedBitmap { deleted, height, hash }
}

pub fn into_bitmap(self) -> Bitmap {
self.deleted
}

pub fn bitmap(&self) -> &Bitmap {
&self.deleted
}

pub fn dissolve(self) -> (Bitmap, u64, HashOutput) {
(self.deleted, self.height, self.hash)
}
}

pub struct BlockHeaderAccumulatedDataBuilder<'a> {
previous_accum: &'a BlockHeaderAccumulatedData,
hash: Option<HashOutput>,
Expand Down
7 changes: 4 additions & 3 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use crate::{
ChainBlock,
ChainHeader,
ChainStorageError,
CompleteDeletedBitmap,
DbTransaction,
DeletedBitmap,
HistoricalBlock,
HorizonData,
MmrTree,
Expand Down Expand Up @@ -143,7 +143,7 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(fetch_utxos(hashes: Vec<HashOutput>) -> Vec<Option<(TransactionOutput, bool)>>, "fetch_utxos");

make_async_fn!(fetch_utxos_by_mmr_position(start: u64, end: u64, end_header_hash: HashOutput) -> (Vec<PrunedOutput>, Bitmap), "fetch_utxos_by_mmr_position");
make_async_fn!(fetch_utxos_by_mmr_position(start: u64, end: u64, deleted: Arc<Bitmap>) -> (Vec<PrunedOutput>, Bitmap), "fetch_utxos_by_mmr_position");

//---------------------------------- Kernel --------------------------------------------//
make_async_fn!(fetch_kernel_by_excess_sig(excess_sig: Signature) -> Option<(TransactionKernel, HashOutput)>, "fetch_kernel_by_excess_sig");
Expand Down Expand Up @@ -216,7 +216,6 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
make_async_fn!(fetch_block_accumulated_data_by_height(height: u64) -> BlockAccumulatedData, "fetch_block_accumulated_data_by_height");

//---------------------------------- Misc. --------------------------------------------//
make_async_fn!(fetch_deleted_bitmap() -> DeletedBitmap, "fetch_deleted_bitmap");

make_async_fn!(fetch_block_timestamps(start_hash: HashOutput) -> RollingVec<EpochTime>, "fetch_block_timestamps");

Expand All @@ -225,6 +224,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
make_async_fn!(fetch_target_difficulties_for_next_block(current_block_hash: HashOutput) -> TargetDifficulties, "fetch_target_difficulties_for_next_block");

make_async_fn!(fetch_block_hashes_from_header_tip(n: usize, offset: usize) -> Vec<HashOutput>, "fetch_block_hashes_from_header_tip");

make_async_fn!(fetch_complete_deleted_bitmap_at(hash: HashOutput) -> CompleteDeletedBitmap, "fetch_deleted_bitmap");
}

impl<B: BlockchainBackend + 'static> From<BlockchainDatabase<B>> for AsyncBlockchainDb<B> {
Expand Down
38 changes: 26 additions & 12 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use crate::{
blocks::{Block, BlockHeader, NewBlockTemplate},
chain_storage::{
accumulated_data::{BlockAccumulatedData, BlockHeaderAccumulatedData},
accumulated_data::{BlockAccumulatedData, BlockHeaderAccumulatedData, CompleteDeletedBitmap},
consts::{
BLOCKCHAIN_DATABASE_ORPHAN_STORAGE_CAPACITY,
BLOCKCHAIN_DATABASE_PRUNED_MODE_PRUNING_INTERVAL,
Expand All @@ -35,7 +35,6 @@ use crate::{
BlockchainBackend,
ChainBlock,
ChainHeader,
DeletedBitmap,
HistoricalBlock,
HorizonData,
MmrTree,
Expand Down Expand Up @@ -337,15 +336,10 @@ where B: BlockchainBackend
&self,
start: u64,
end: u64,
end_header_hash: HashOutput,
deleted: Arc<Bitmap>,
) -> Result<(Vec<PrunedOutput>, Bitmap), ChainStorageError> {
let db = self.db_read_access()?;
let accum_data = db.fetch_block_accumulated_data(&end_header_hash).or_not_found(
"BlockAccumulatedData",
"hash",
end_header_hash.to_hex(),
)?;
db.fetch_utxos_by_mmr_position(start, end, accum_data.deleted())
db.fetch_utxos_by_mmr_position(start, end, deleted.as_ref())
}

/// Returns the block header at the given block height.
Expand Down Expand Up @@ -879,10 +873,30 @@ where B: BlockchainBackend
db.fetch_horizon_data()
}

/// Returns the full deleted bitmap at the current blockchain tip
pub fn fetch_deleted_bitmap(&self) -> Result<DeletedBitmap, ChainStorageError> {
pub fn fetch_complete_deleted_bitmap_at(
&self,
hash: HashOutput,
) -> Result<CompleteDeletedBitmap, ChainStorageError> {
let db = self.db_read_access()?;
db.fetch_deleted_bitmap()
let mut deleted = db.fetch_deleted_bitmap()?.into_bitmap();

let end_header =
fetch_header_by_block_hash(&*db, hash.clone()).or_not_found("BlockHeader", "start_hash", hash.to_hex())?;
let chain_metadata = db.fetch_chain_metadata()?;
let height = chain_metadata.height_of_longest_chain();
for i in end_header.height..height {
// order here does not matter, we dont have to go in reverse
deleted.xor_inplace(
db.fetch_block_accumulated_data_by_height(i + 1)
.or_not_found("BlockAccumulatedData", "height", height.to_string())?
.deleted(),
);
}
Ok(CompleteDeletedBitmap::new(
deleted,
height,
chain_metadata.best_block().clone(),
))
}
}

Expand Down
1 change: 1 addition & 0 deletions base_layer/core/src/chain_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use accumulated_data::{
BlockHeaderAccumulatedDataBuilder,
ChainBlock,
ChainHeader,
CompleteDeletedBitmap,
DeletedBitmap,
};

Expand Down
8 changes: 2 additions & 6 deletions integration_tests/features/Sync.feature
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ Feature: Block Sync
Then all nodes are on the same chain at height 1505

@critical
Scenario: Pruned mode
Scenario: Pruned mode sync test
# TODO: Merge steps into single lines
Given I have a seed node SEED
Given I have a base node NODE1 connected to all seed nodes
When I mine a block on NODE1 with coinbase CB1
When I mine a block on NODE1 with coinbase CB2
Expand All @@ -96,14 +97,9 @@ Feature: Block Sync
When I mine a block on NODE1 with coinbase CB5
Then all nodes are at height 5
When I spend outputs CB1 via NODE1
# When I spend outputs CB2 via NODE1
# When I spend outputs CB3 via NODE1
And I mine 3 blocks on NODE1
Given I have a pruned node PNODE2 connected to node NODE1 with pruning horizon set to 5
Then all nodes are at height 8
# Spend txns so that they are pruned when tip moves
# When I spend outputs CB4 via PNODE2
# When I spend outputs CB5 via PNODE2
When I mine 15 blocks on PNODE2
Then all nodes are at height 23

Expand Down

0 comments on commit d0d1d61

Please sign in to comment.