Skip to content

Commit

Permalink
"Remove unused sync_state.rs file and refactor consensus"
Browse files Browse the repository at this point in the history
The sync_state.rs file in the consensus/src directory has been removed as it is no longer needed. All references to SYNC_STATE which was previously declared in this file have been replaced with suitable alternatives. This was done to streamline the consensus and avoid unnecessary complexities.
  • Loading branch information
biryukovmaxim committed Aug 27, 2023
1 parent 004a064 commit 43bed76
Show file tree
Hide file tree
Showing 14 changed files with 50 additions and 127 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/src/consensus/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl ConsensusServices {
params.anticone_finalization_depth(),
params.ghostdag_k,
notification_root,
params.daa_window_params,
));

let sync_manager = SyncManager::new(
Expand Down
2 changes: 0 additions & 2 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,3 @@ pub mod params;
pub mod pipeline;
pub mod processes;
pub mod test_helpers;

pub mod sync_state;
9 changes: 4 additions & 5 deletions consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ use kaspa_muhash::MuHash;
use kaspa_notify::notifier::Notify;

use crate::model::stores::headers::CompactHeaderData;
use crate::sync_state::SYNC_STATE;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use itertools::Itertools;
use kaspa_consensus_core::config::params::DAAWindowParams;
Expand Down Expand Up @@ -319,8 +318,9 @@ impl VirtualStateProcessor {
)))
.expect("expecting an open unbounded channel");

SYNC_STATE.is_synced_or(|| {
let CompactHeaderData { timestamp, daa_score, .. } = self.headers_store.get_compact_header_data(new_sink).unwrap();
{
let sink = self.virtual_stores.read().state.get().unwrap().ghostdag_data.selected_parent;
let CompactHeaderData { timestamp, daa_score, .. } = self.headers_store.get_compact_header_data(sink).unwrap();
let diff = -(unix_now() as i64)
+ timestamp as i64
+ self.daa_window_params.expected_daa_window_duration_in_milliseconds(daa_score) as i64;
Expand All @@ -331,8 +331,7 @@ impl VirtualStateProcessor {
))
.expect("expecting an open unbounded channel");
}
diff
});
}
}

pub(super) fn virtual_finality_point(&self, virtual_ghostdag_data: &GhostdagData, pruning_point: Hash) -> Hash {
Expand Down
23 changes: 18 additions & 5 deletions consensus/src/processes/pruning_proof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use itertools::Itertools;
use parking_lot::RwLock;
use rocksdb::WriteBatch;

use kaspa_consensus_core::config::params::DAAWindowParams;
use kaspa_consensus_core::{
blockhash::{BlockHashExtensions, BlockHashes, ORIGIN},
errors::{
Expand All @@ -25,14 +26,15 @@ use kaspa_consensus_notify::{
notification::{Notification, SyncStateChangedNotification},
root::ConsensusNotificationRoot,
};
use kaspa_core::time::unix_now;
use kaspa_core::{debug, info, trace};
use kaspa_database::prelude::{ConnBuilder, StoreResultEmptyTuple, StoreResultExtensions};
use kaspa_hashes::Hash;
use kaspa_notify::notifier::Notify;
use kaspa_pow::calc_block_level;
use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions};

use crate::sync_state::SYNC_STATE;
use crate::model::stores::headers::CompactHeaderData;
use crate::{
consensus::{
services::{DbDagTraversalManager, DbGhostdagManager, DbParentsManager, DbWindowManager},
Expand Down Expand Up @@ -108,6 +110,7 @@ pub struct PruningProofManager {
anticone_finalization_depth: u64,
ghostdag_k: KType,
notification_root: Arc<ConsensusNotificationRoot>,
daa_window_params: DAAWindowParams,
}

impl PruningProofManager {
Expand All @@ -126,6 +129,7 @@ impl PruningProofManager {
anticone_finalization_depth: u64,
ghostdag_k: KType,
notification_root: Arc<ConsensusNotificationRoot>,
daa_window_params: DAAWindowParams,
) -> Self {
Self {
db,
Expand Down Expand Up @@ -157,6 +161,7 @@ impl PruningProofManager {
anticone_finalization_depth,
ghostdag_k,
notification_root,
daa_window_params,
}
}

Expand Down Expand Up @@ -414,11 +419,19 @@ impl PruningProofManager {

let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
for level in (0..=self.max_block_level).rev() {
if !SYNC_STATE.is_synced() {
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_proof(level, self.max_block_level)))
.expect("expecting an open unbounded channel");
{
let sink = self.virtual_stores.read().state.get().unwrap().ghostdag_data.selected_parent;
let CompactHeaderData { timestamp, daa_score, .. } = self.headers_store.get_compact_header_data(sink).unwrap();
let diff = -(unix_now() as i64)
+ timestamp as i64
+ self.daa_window_params.expected_daa_window_duration_in_milliseconds(daa_score) as i64;
if diff > 0 {
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_proof(level, self.max_block_level)))
.expect("expecting an open unbounded channel");
}
}

info!("Validating level {level} from the pruning point proof");
let level_idx = level as usize;
let mut selected_tip = None;
Expand Down
59 changes: 0 additions & 59 deletions consensus/src/sync_state.rs

This file was deleted.

1 change: 0 additions & 1 deletion indexes/utxoindex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ license.workspace = true

[dependencies]
kaspa-hashes.workspace = true
kaspa-consensus.workspace = true
kaspa-consensus-core.workspace = true
kaspa-consensusmanager.workspace = true
kaspa-consensus-notify.workspace = true
Expand Down
11 changes: 7 additions & 4 deletions indexes/utxoindex/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,14 @@ impl UtxoIndexApi for UtxoIndex {
info!("Resyncing the utxoindex...");

#[cfg(not(test))]
if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() {
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_utxo_resync()))
.expect("expecting an open unbounded channel");
{
if !futures::executor::block_on(self.consensus_manager.consensus().session_blocking()).is_nearly_synced() {
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_utxo_resync()))
.expect("expecting an open unbounded channel");
}
}

self.store.delete_all()?;
let consensus = self.consensus_manager.consensus();
let session = futures::executor::block_on(consensus.session_blocking());
Expand Down
3 changes: 0 additions & 3 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{fs, path::PathBuf, process::exit, sync::Arc, time::Duration};

use async_channel::unbounded;
use kaspa_consensus::sync_state::SYNC_STATE;
use kaspa_consensus_core::{
config::{Config, ConfigBuilder},
errors::config::{ConfigError, ConfigResult},
Expand Down Expand Up @@ -234,8 +233,6 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
processing_counters.clone(),
));
let consensus_manager = Arc::new(ConsensusManager::new(consensus_factory));
_ = SYNC_STATE.consensus_manager.set(consensus_manager.clone());
SYNC_STATE.daa_window_params.set(config.params.daa_window_params).unwrap();

let consensus_monitor = Arc::new(ConsensusMonitor::new(processing_counters.clone(), tick_service.clone()));

Expand Down
1 change: 0 additions & 1 deletion protocol/flows/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ license.workspace = true

[dependencies]
kaspa-core.workspace = true
kaspa-consensus.workspace = true
kaspa-consensus-core.workspace = true
kaspa-consensus-notify.workspace = true
kaspa-p2p-lib.workspace = true
Expand Down
3 changes: 0 additions & 3 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::v5;
use async_trait::async_trait;
use kaspa_addressmanager::AddressManager;
use kaspa_connectionmanager::ConnectionManager;
use kaspa_consensus::sync_state::SYNC_STATE;
use kaspa_consensus_core::block::Block;
use kaspa_consensus_core::config::Config;
use kaspa_consensus_core::errors::block::RuleError;
Expand Down Expand Up @@ -183,8 +182,6 @@ impl FlowContext {
notification_root: Arc<ConsensusNotificationRoot>,
) -> Self {
let hub = Hub::new();
let moved_hub = hub.clone();
_ = SYNC_STATE.has_peers.set(Box::new(move || moved_hub.has_peers() as _));

let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().min(3.0) as u32;

Expand Down
38 changes: 13 additions & 25 deletions protocol/flows/src/v5/ibd/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use kaspa_consensus_core::{
BlockHashSet,
};
use kaspa_consensus_notify::notification::{Notification, SyncStateChangedNotification};
use kaspa_consensus_notify::root::ConsensusNotificationRoot;
use kaspa_consensusmanager::{spawn_blocking, ConsensusProxy, StagingConsensus};
use kaspa_core::{debug, info, warn};
use kaspa_hashes::Hash;
Expand Down Expand Up @@ -46,8 +45,6 @@ pub struct IbdFlow {

// Receives relay blocks from relay flow which are out of orphan resolution range and hence trigger IBD
relay_receiver: Receiver<Block>,

notification_root: Option<Arc<ConsensusNotificationRoot>>,
}

#[async_trait::async_trait]
Expand All @@ -70,14 +67,8 @@ pub enum IbdType {
// TODO: define a peer banning strategy

impl IbdFlow {
pub fn new(
ctx: FlowContext,
router: Arc<Router>,
incoming_route: IncomingRoute,
relay_receiver: Receiver<Block>,
notification_root: Arc<ConsensusNotificationRoot>,
) -> Self {
Self { ctx, router, incoming_route, relay_receiver, notification_root: Some(notification_root) }
pub fn new(ctx: FlowContext, router: Arc<Router>, incoming_route: IncomingRoute, relay_receiver: Receiver<Block>) -> Self {
Self { ctx, router, incoming_route, relay_receiver }
}

async fn start_impl(&mut self) -> Result<(), ProtocolError> {
Expand Down Expand Up @@ -291,10 +282,9 @@ impl IbdFlow {
if passed > Duration::from_secs(1) {
info!("Processed {} trusted blocks in the last {:.2}s (total {})", i - last_index, passed.as_secs_f64(), i);

if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() {
self.notification_root
.as_ref()
.unwrap()
if !self.ctx.consensus().session().await.async_is_nearly_synced().await {
self.ctx
.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_trust_sync(
(i - last_index) as u64,
i as u64,
Expand Down Expand Up @@ -328,10 +318,9 @@ impl IbdFlow {
relay_block.header.daa_score,
"block headers",
Some(|headers: usize, progress: i32| {
if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() {
self.notification_root
.as_ref()
.unwrap()
if !futures::executor::block_on(self.ctx.consensus().session_blocking()).is_nearly_synced() {
self.ctx
.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_headers(
headers as u64,
progress as i64,
Expand Down Expand Up @@ -446,8 +435,7 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
RequestPruningPointUtxoSetMessage { pruning_point_hash: Some(pruning_point.into()) }
))
.await?;
let mut chunk_stream =
PruningPointUtxosetChunkStream::new(&self.router, &mut self.incoming_route, self.notification_root.as_ref().unwrap());
let mut chunk_stream = PruningPointUtxosetChunkStream::new(&self.router, &mut self.incoming_route, &self.ctx);
let mut multiset = MuHash::new();
while let Some(chunk) = chunk_stream.next().await? {
multiset = consensus
Expand All @@ -471,13 +459,14 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",

let low_header = consensus.async_get_header(*hashes.first().expect("hashes was non empty")).await?;
let high_header = consensus.async_get_header(*hashes.last().expect("hashes was non empty")).await?;
let notification_root = self.notification_root.take().unwrap();
let notification_root = self.ctx.notification_root.clone();
let consensus_move = self.ctx.consensus().clone();
let mut progress_reporter = ProgressReporter::new(
low_header.daa_score,
high_header.daa_score,
"blocks",
Some(|blocks, progress| {
if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() {
Some(move |blocks, progress| {
if !futures::executor::block_on(consensus_move.session_blocking()).is_nearly_synced() {
notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_blocks(
blocks as u64,
Expand Down Expand Up @@ -506,7 +495,6 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
let prev_chunk_len = prev_jobs.len();
try_join_all(prev_jobs).await?;
progress_reporter.report_completion(prev_chunk_len);
self.notification_root = Some(notification_root);
self.ctx.on_new_block_template().await?;

Ok(())
Expand Down
Loading

0 comments on commit 43bed76

Please sign in to comment.