Skip to content

Commit

Permalink
fix: show warnings on console (#3225)
Browse files Browse the repository at this point in the history
Show warnings and errors on apps as well as logs

Also moved a struct that was in the middle of a method into its own file
  • Loading branch information
stringhandler authored Aug 23, 2021
1 parent b5b6223 commit 3291021
Show file tree
Hide file tree
Showing 15 changed files with 374 additions and 250 deletions.
79 changes: 69 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion applications/tari_base_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ chrono = "0.4"
config = { version = "0.9.3" }
futures = { version = "^0.3.1", default-features = false, features = ["alloc"]}
log = { version = "0.4.8", features = ["std"] }
log4rs = { version = "0.8.3", features = ["toml_format", "rolling_file_appender", "compound_policy", "size_trigger", "fixed_window_roller"] }
regex = "1"
rustyline = "6.0"
rustyline-derive = "0.3"
Expand All @@ -40,3 +39,5 @@ tonic = "0.2"
[features]
avx2 = ["tari_core/avx2", "tari_crypto/avx2", "tari_p2p/avx2", "tari_wallet/avx2", "tari_comms/avx2", "tari_comms_dht/avx2"]
safe = []


Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl BlockSync {
StateEvent::BlocksSynchronized
},
Err(err) => {
debug!(target: LOG_TARGET, "Block sync failed: {}", err);
warn!(target: LOG_TARGET, "Block sync failed: {}", err);
StateEvent::BlockSyncFailed
},
}
Expand Down
3 changes: 3 additions & 0 deletions base_layer/core/src/base_node/sync/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

#[cfg(feature = "base_node")]
mod service;
#[cfg(feature = "base_node")]
mod sync_utxos_task;

#[cfg(feature = "base_node")]
pub use service::BaseNodeSyncRpcService;

Expand Down
177 changes: 2 additions & 175 deletions base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{
base_node::sync::rpc::BaseNodeSyncService,
base_node::sync::rpc::{sync_utxos_task::SyncUtxosTask, BaseNodeSyncService},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, OrNotFound},
crypto::tari_utilities::Hashable,
iterators::NonOverlappingIntegerPairIter,
proto,
proto::base_node::{
Expand All @@ -32,14 +31,13 @@ use crate::{
SyncBlocksRequest,
SyncHeadersRequest,
SyncKernelsRequest,
SyncUtxo,
SyncUtxosRequest,
SyncUtxosResponse,
},
};
use futures::{channel::mpsc, stream, SinkExt};
use log::*;
use std::{cmp, sync::Arc, time::Instant};
use std::cmp;
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming};
use tari_crypto::tari_utilities::hex::Hex;
use tokio::task;
Expand Down Expand Up @@ -415,177 +413,6 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
req.include_deleted_bitmaps
);

struct SyncUtxosTask<B> {
db: AsyncBlockchainDb<B>,
request: SyncUtxosRequest,
}

impl<B> SyncUtxosTask<B>
where B: BlockchainBackend + 'static
{
pub fn new(db: AsyncBlockchainDb<B>, request: SyncUtxosRequest) -> Self {
Self { db, request }
}

pub async fn run(self, mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>) {
if let Err(err) = self.start_streaming(&mut tx).await {
let _ = tx.send(Err(err)).await;
}
}

async fn start_streaming(
&self,
tx: &mut mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
) -> Result<(), RpcStatus> {
let end_header = self
.db
.fetch_header_by_block_hash(self.request.end_header_hash.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| {
RpcStatus::not_found(format!(
"End header hash {} is was not found",
self.request.end_header_hash.to_hex()
))
})?;

if self.request.start > end_header.output_mmr_size - 1 {
return Err(RpcStatus::bad_request(format!(
"start index {} cannot be greater than the end header's output MMR size ({})",
self.request.start, end_header.output_mmr_size
)));
}

let prev_header = self
.db
.fetch_header_containing_utxo_mmr(self.request.start)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
let (mut prev_header, _) = prev_header.into_parts();

if prev_header.height > end_header.height {
return Err(RpcStatus::bad_request("start index is greater than end index"));
}
// we need to construct a temp bitmap for the height the client requested
let bitmap = self
.db
.fetch_complete_deleted_bitmap_at(end_header.hash())
.await
.map_err(|_| RpcStatus::not_found("Could not get tip deleted bitmap"))?
.into_bitmap();

let bitmap = Arc::new(bitmap);
loop {
let timer = Instant::now();
if prev_header.height == end_header.height {
break;
}

let current_header = self
.db
.fetch_header(prev_header.height + 1)
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| {
RpcStatus::general(format!(
"Potential data consistency issue: header {} not found",
prev_header.height + 1
))
})?;

debug!(
target: LOG_TARGET,
"previous header = {} ({}) current header = {} ({})",
prev_header.height,
prev_header.hash().to_hex(),
current_header.height,
current_header.hash().to_hex()
);

let start = cmp::max(self.request.start, prev_header.output_mmr_size);
let end = current_header.output_mmr_size - 1;

if tx.is_closed() {
debug!(target: LOG_TARGET, "Exiting sync_utxos early because client has gone",);
break;
}

debug!(
target: LOG_TARGET,
"Streaming UTXOs {}-{} ({}) for block #{}",
start,
end,
end.saturating_sub(start).saturating_add(1),
current_header.height
);
let (utxos, deleted_diff) = self
.db
.fetch_utxos_by_mmr_position(start, end, bitmap.clone())
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
trace!(
target: LOG_TARGET,
"Loaded {} UTXO(s) and |deleted_diff| = {}",
utxos.len(),
deleted_diff.cardinality(),
);
let mut utxos = stream::iter(
utxos
.into_iter()
.enumerate()
// Only include pruned UTXOs if include_pruned_utxos is true
.filter(|(_, utxo)| self.request.include_pruned_utxos || !utxo.is_pruned())
.map(|(i, utxo)| {
SyncUtxosResponse {
utxo_or_deleted: Some(proto::base_node::sync_utxos_response::UtxoOrDeleted::Utxo(
SyncUtxo::from(utxo)
)),
mmr_index: start + i as u64,
}
})
.map(Ok)
.map(Ok),
);

// Ensure task stops if the peer prematurely stops their RPC session
if tx.send_all(&mut utxos).await.is_err() {
break;
}

if self.request.include_deleted_bitmaps {
let bitmaps = SyncUtxosResponse {
utxo_or_deleted: Some(proto::base_node::sync_utxos_response::UtxoOrDeleted::DeletedDiff(
deleted_diff.serialize(),
)),
mmr_index: 0,
};

if tx.send(Ok(bitmaps)).await.is_err() {
break;
}
}
debug!(
target: LOG_TARGET,
"Streamed utxos {} to {} in {:.2?} (including stream backpressure)",
start,
end,
timer.elapsed()
);

prev_header = current_header;
}

debug!(
target: LOG_TARGET,
"UTXO sync completed to UTXO {} (Header hash = {})",
prev_header.output_mmr_size,
prev_header.hash().to_hex()
);

Ok(())
}
}

let (tx, rx) = mpsc::channel(200);
task::spawn(SyncUtxosTask::new(self.db(), request.into_message()).run(tx));

Expand Down
Loading

0 comments on commit 3291021

Please sign in to comment.