Skip to content

Commit

Permalink
Merge branch 'development' into fix-burn-test
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler authored Sep 27, 2022
2 parents 354e417 + 0dad9e8 commit d6d325a
Show file tree
Hide file tree
Showing 34 changed files with 497 additions and 233 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,5 @@ buildtools/Output/

clients/base_node_grpc_client/package-lock.json
clients/validator_node_grpc_client/package-lock.json
clients/wallet_grpc_client/package-lock.json
clients/wallet_grpc_client/package-lock.json
pie/
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,7 @@ impl TryFrom<TransactionInput> for grpc::TransactionInput {
.commitment()
.map_err(|_| "Non-compact Transaction input should contain commitment".to_string())?
.to_vec(),
hash: input
.canonical_hash()
.map_err(|_| "Non-compact Transaction input should be able to be hashed".to_string())?
.to_vec(),
hash: input.canonical_hash().to_vec(),

script: input
.script()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl CommandContext {
io::stdout().flush().await?;
// we can only check till the pruning horizon, 0 is archive node so it needs to check every block.
if height > horizon_height {
match self.node_service.get_block(height).await {
match self.node_service.get_block(height, false).await {
Err(err) => {
// We need to check the data itself, as FetchMatchingBlocks will suppress any error, only
// logging it.
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_base_node/src/commands/command/get_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl CommandContext {
pub async fn get_block(&self, height: u64, format: Format) -> Result<(), Error> {
let block = self
.blockchain_db
.fetch_blocks(height..=height)
.fetch_blocks(height..=height, false)
.await?
.pop()
.ok_or(ArgsError::NotFoundAt { height })?;
Expand All @@ -90,7 +90,7 @@ impl CommandContext {
pub async fn get_block_by_hash(&self, hash: HashOutput, format: Format) -> Result<(), Error> {
let block = self
.blockchain_db
.fetch_block_by_hash(hash)
.fetch_block_by_hash(hash, false)
.await?
.ok_or(ArgsError::NotFound)?;
match format {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ impl CommandContext {

let block = self
.node_service
.get_block(height)
.get_block(height, true)
.await?
.ok_or_else(|| anyhow!("Error in db, block not found at height {}", height))?;

let prev_block = self
.node_service
.get_block(height - 1)
.get_block(height - 1, true)
.await?
.ok_or_else(|| anyhow!("Error in db, block not found at height {}", height))?;

Expand Down
6 changes: 3 additions & 3 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
for (start, end) in page_iter {
debug!(target: LOG_TARGET, "Page: {}-{}", start, end);
// TODO: Better error handling
let result_data = match handler.get_blocks(start..=end).await {
let result_data = match handler.get_blocks(start..=end, true).await {
Err(err) => {
warn!(target: LOG_TARGET, "Internal base node service error: {}", err);
return;
Expand Down Expand Up @@ -850,7 +850,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
task::spawn(async move {
let page_iter = NonOverlappingIntegerPairIter::new(start, end + 1, GET_BLOCKS_PAGE_SIZE);
for (start, end) in page_iter {
let blocks = match handler.get_blocks(start..=end).await {
let blocks = match handler.get_blocks(start..=end, false).await {
Err(err) => {
warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1455,7 +1455,7 @@ async fn get_block_group(

let (start, end) = get_heights(&height_request, handler.clone()).await?;

let blocks = match handler.get_blocks(start..=end).await {
let blocks = match handler.get_blocks(start..=end, false).await {
Err(err) => {
warn!(
target: LOG_TARGET,
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn do_recovery<D: BlockchainBackend + 'static>(
io::stdout().flush().unwrap();
trace!(target: LOG_TARGET, "Asking for block with height: {}", counter);
let block = source_database
.fetch_block(counter)
.fetch_block(counter, true)
.map_err(|e| anyhow!("Could not get block from recovery db: {}", e))?
.try_into_block()?;
trace!(target: LOG_TARGET, "Adding block: {}", block);
Expand Down
22 changes: 16 additions & 6 deletions base_layer/core/src/base_node/comms_interface/comms_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,24 @@ pub enum NodeCommsRequest {
FetchHeaders(RangeInclusive<u64>),
FetchHeadersByHashes(Vec<HashOutput>),
FetchMatchingUtxos(Vec<HashOutput>),
FetchMatchingBlocks(RangeInclusive<u64>),
FetchBlocksByHash(Vec<HashOutput>),
FetchMatchingBlocks {
range: RangeInclusive<u64>,
compact: bool,
},
FetchBlocksByHash {
block_hashes: Vec<HashOutput>,
compact: bool,
},
FetchBlocksByKernelExcessSigs(Vec<Signature>),
FetchBlocksByUtxos(Vec<Commitment>),
GetHeaderByHash(HashOutput),
GetBlockByHash(HashOutput),
GetNewBlockTemplate(GetNewBlockTemplateRequest),
GetNewBlock(NewBlockTemplate),
FetchKernelByExcessSig(Signature),
FetchMempoolTransactionsByExcessSigs { excess_sigs: Vec<PrivateKey> },
FetchMempoolTransactionsByExcessSigs {
excess_sigs: Vec<PrivateKey>,
},
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -75,10 +83,12 @@ impl Display for NodeCommsRequest {
},
FetchHeadersByHashes(v) => write!(f, "FetchHeadersByHashes (n={})", v.len()),
FetchMatchingUtxos(v) => write!(f, "FetchMatchingUtxos (n={})", v.len()),
FetchMatchingBlocks(range) => {
write!(f, "FetchMatchingBlocks ({:?})", range)
FetchMatchingBlocks { range, compact } => {
write!(f, "FetchMatchingBlocks ({:?}, {})", range, compact)
},
FetchBlocksByHash { block_hashes, compact } => {
write!(f, "FetchBlocksByHash (n={}, {})", block_hashes.len(), compact)
},
FetchBlocksByHash(v) => write!(f, "FetchBlocksByHash (n={})", v.len()),
FetchBlocksByKernelExcessSigs(v) => write!(f, "FetchBlocksByKernelExcessSigs (n={})", v.len()),
FetchBlocksByUtxos(v) => write!(f, "FetchBlocksByUtxos (n={})", v.len()),
GetHeaderByHash(v) => write!(f, "GetHeaderByHash({})", v.to_hex()),
Expand Down
3 changes: 3 additions & 0 deletions base_layer/core/src/base_node/comms_interface/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_common_types::types::FixedHash;
use tari_comms_dht::outbound::DhtOutboundError;
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;
Expand Down Expand Up @@ -67,4 +68,6 @@ pub enum CommsInterfaceError {
BlockError(#[from] BlockError),
#[error("Invalid request for {request}: {details}")]
InvalidRequest { request: &'static str, details: String },
#[error("Peer sent invalid full block {hash}: {details}")]
InvalidFullBlock { hash: FixedHash, details: String },
}
90 changes: 78 additions & 12 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::{
consensus::{ConsensusConstants, ConsensusManager},
mempool::Mempool,
proof_of_work::{Difficulty, PowAlgorithm},
transactions::aggregated_body::AggregateBody,
validation::helpers,
};

Expand Down Expand Up @@ -164,20 +165,20 @@ where B: BlockchainBackend + 'static
}
Ok(NodeCommsResponse::TransactionOutputs(res))
},
NodeCommsRequest::FetchMatchingBlocks(range) => {
let blocks = self.blockchain_db.fetch_blocks(range).await?;
NodeCommsRequest::FetchMatchingBlocks { range, compact } => {
let blocks = self.blockchain_db.fetch_blocks(range, compact).await?;
Ok(NodeCommsResponse::HistoricalBlocks(blocks))
},
NodeCommsRequest::FetchBlocksByHash(block_hashes) => {
NodeCommsRequest::FetchBlocksByHash { block_hashes, compact } => {
let mut blocks = Vec::with_capacity(block_hashes.len());
for block_hash in block_hashes {
let block_hex = block_hash.to_hex();
debug!(
target: LOG_TARGET,
"A peer has requested a block with hash {}", block_hex
"A peer has requested a block with hash {} (compact = {})", block_hex, compact
);

match self.blockchain_db.fetch_block_by_hash(block_hash).await {
match self.blockchain_db.fetch_block_by_hash(block_hash, compact).await {
Ok(Some(block)) => blocks.push(block),
Ok(None) => warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -269,7 +270,7 @@ where B: BlockchainBackend + 'static
Ok(NodeCommsResponse::BlockHeader(header))
},
NodeCommsRequest::GetBlockByHash(hash) => {
let block = self.blockchain_db.fetch_block_by_hash(hash).await?;
let block = self.blockchain_db.fetch_block_by_hash(hash, false).await?;
Ok(NodeCommsResponse::HistoricalBlock(Box::new(block)))
},
NodeCommsRequest::GetNewBlockTemplate(request) => {
Expand Down Expand Up @@ -422,7 +423,7 @@ where B: BlockchainBackend + 'static
&mut self,
source_peer: NodeId,
new_block: NewBlock,
) -> Result<Arc<Block>, CommsInterfaceError> {
) -> Result<Block, CommsInterfaceError> {
let NewBlock {
header,
coinbase_kernel,
Expand All @@ -436,7 +437,7 @@ where B: BlockchainBackend + 'static
.with_coinbase_utxo(coinbase_output, coinbase_kernel)
.with_header(header)
.build();
return Ok(Arc::new(block));
return Ok(block);
}

let block_hash = header.hash();
Expand All @@ -454,6 +455,7 @@ where B: BlockchainBackend + 'static
current_meta.best_block().to_hex(),
source_peer,
);
metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
return Ok(block);
}
Expand Down Expand Up @@ -549,22 +551,22 @@ where B: BlockchainBackend + 'static
return Ok(block);
}

Ok(Arc::new(block))
Ok(block)
}

async fn request_full_block_from_peer(
&mut self,
source_peer: NodeId,
block_hash: BlockHash,
) -> Result<Arc<Block>, CommsInterfaceError> {
) -> Result<Block, CommsInterfaceError> {
let mut historical_block = self
.outbound_nci
.request_blocks_by_hashes_from_peer(vec![block_hash], Some(source_peer.clone()))
.await?;

return match historical_block.pop() {
Some(block) => {
let block = Arc::new(block.try_into_block()?);
let block = block.try_into_block()?;
Ok(block)
},
None => {
Expand Down Expand Up @@ -600,7 +602,7 @@ where B: BlockchainBackend + 'static
/// source_peer - the peer that sent this new block message, or None if the block was generated by a local miner
pub async fn handle_block(
&mut self,
block: Arc<Block>,
block: Block,
source_peer: Option<NodeId>,
) -> Result<BlockHash, CommsInterfaceError> {
let block_hash = block.hash();
Expand All @@ -618,6 +620,8 @@ where B: BlockchainBackend + 'static
);
debug!(target: LOG_TARGET, "Incoming block: {}", block);
let timer = Instant::now();
let block = self.hydrate_block(block).await?;

let add_block_result = self.blockchain_db.add_block(block.clone()).await;
// Create block event on block event stream
match add_block_result {
Expand Down Expand Up @@ -691,6 +695,68 @@ where B: BlockchainBackend + 'static
}
}

async fn hydrate_block(&mut self, block: Block) -> Result<Arc<Block>, CommsInterfaceError> {
let block_hash = block.hash();
let block_height = block.header.height;
if block.body.inputs().is_empty() {
debug!(
target: LOG_TARGET,
"Block #{} ({}) contains no inputs so nothing to hydrate",
block_height,
block_hash.to_hex(),
);
return Ok(Arc::new(block));
}

let timer = Instant::now();
let (header, mut inputs, outputs, kernels) = block.dissolve();

let db = self.blockchain_db.inner().db_read_access()?;
for input in &mut inputs {
if !input.is_compact() {
continue;
}

let output_mined_info =
db.fetch_output(&input.output_hash())?
.ok_or_else(|| CommsInterfaceError::InvalidFullBlock {
hash: block_hash,
details: format!("Output {} to be spent does not exist in db", input.output_hash()),
})?;

match output_mined_info.output {
PrunedOutput::Pruned { .. } => {
return Err(CommsInterfaceError::InvalidFullBlock {
hash: block_hash,
details: format!("Output {} to be spent is pruned", input.output_hash()),
});
},
PrunedOutput::NotPruned { output } => {
input.add_output_data(
output.version,
output.features,
output.commitment,
output.script,
output.sender_offset_public_key,
output.covenant,
output.encrypted_value,
output.minimum_value_promise,
);
},
}
}
debug!(
target: LOG_TARGET,
"Hydrated block #{} ({}) with {} input(s) in {:.2?}",
block_height,
block_hash.to_hex(),
inputs.len(),
timer.elapsed()
);
let block = Block::new(header, AggregateBody::new(inputs, outputs, kernels));
Ok(Arc::new(block))
}

fn publish_block_event(&self, event: BlockEvent) {
if let Err(event) = self.block_event_sender.send(Arc::new(event)) {
debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0)
Expand Down
14 changes: 11 additions & 3 deletions base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ impl LocalNodeCommsInterface {
pub async fn get_blocks(
&mut self,
range: RangeInclusive<u64>,
compact: bool,
) -> Result<Vec<HistoricalBlock>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchMatchingBlocks(range))
.call(NodeCommsRequest::FetchMatchingBlocks { range, compact })
.await??
{
NodeCommsResponse::HistoricalBlocks(blocks) => Ok(blocks),
Expand All @@ -96,10 +97,17 @@ impl LocalNodeCommsInterface {
}

/// Request the block header at the given height
pub async fn get_block(&mut self, height: u64) -> Result<Option<HistoricalBlock>, CommsInterfaceError> {
pub async fn get_block(
&mut self,
height: u64,
compact: bool,
) -> Result<Option<HistoricalBlock>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchMatchingBlocks(height..=height))
.call(NodeCommsRequest::FetchMatchingBlocks {
range: height..=height,
compact,
})
.await??
{
NodeCommsResponse::HistoricalBlocks(mut blocks) => Ok(blocks.pop()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,22 @@ impl OutboundNodeCommsInterface {
}
}

/// Fetch the Blocks corresponding to the provided block hashes from a specific base node. The requested blocks
/// could be chain blocks or orphan blocks.
/// Fetch the Blocks corresponding to the provided block hashes from a specific base node.
pub async fn request_blocks_by_hashes_from_peer(
&mut self,
block_hashes: Vec<BlockHash>,
node_id: Option<NodeId>,
) -> Result<Vec<HistoricalBlock>, CommsInterfaceError> {
if let NodeCommsResponse::HistoricalBlocks(blocks) = self
.request_sender
.call((NodeCommsRequest::FetchBlocksByHash(block_hashes), node_id))
.call((
NodeCommsRequest::FetchBlocksByHash {
block_hashes,
// We always request compact inputs from peer
compact: true,
},
node_id,
))
.await??
{
Ok(blocks)
Expand Down
Loading

0 comments on commit d6d325a

Please sign in to comment.