Skip to content

Commit

Permalink
Refactor DAA parameters into dedicated struct
Browse files Browse the repository at this point in the history
Modified the consensus parameters to group Difficulty Adjustment Algorithm (DAA) parameters into their dedicated struct. The changes affect how these parameters are retrieved and set across the mining, testing and consensus codebase. Structured DAA parameters within their own entity enhances code readability and manageability.
  • Loading branch information
biryukovmaxim committed Aug 26, 2023
1 parent 66e409f commit bac2a1e
Show file tree
Hide file tree
Showing 15 changed files with 168 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ pub mod params;
pub mod pipeline;
pub mod processes;
pub mod test_helpers;

pub mod sync_state;
6 changes: 3 additions & 3 deletions consensus/src/pipeline/virtual_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl TestContext {

pub fn build_block_template_row(&mut self, nonces: impl Iterator<Item = usize>) -> &mut Self {
for nonce in nonces {
self.simulated_time += self.consensus.params().target_time_per_block;
self.simulated_time += self.consensus.params().daa_window_params.target_time_per_block;
self.current_templates.push_back(self.build_block_template(nonce as u64, self.simulated_time));
}
self
Expand All @@ -69,7 +69,7 @@ impl TestContext {
pub async fn build_and_insert_disqualified_chain(&mut self, mut parents: Vec<Hash>, len: usize) -> Hash {
// The chain will be disqualified since build_block_with_parents builds utxo-invalid blocks
for _ in 0..len {
self.simulated_time += self.consensus.params().target_time_per_block;
self.simulated_time += self.consensus.params().daa_window_params.target_time_per_block;
let b = self.build_block_with_parents(parents, 0, self.simulated_time);
parents = vec![b.header.hash];
self.validate_and_insert_block(b.to_immutable()).await;
Expand Down Expand Up @@ -205,7 +205,7 @@ async fn double_search_disqualified_test() {
.edit_consensus_params(|p| {
p.max_block_parents = 4;
p.mergeset_size_limit = 10;
p.min_difficulty_window_len = p.legacy_difficulty_window_size;
p.min_difficulty_window_len = p.daa_window_params.legacy_difficulty_window_size;
})
.build();
let mut ctx = TestContext::new(TestConsensus::new(&config));
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/processes/coinbase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ mod tests {
params.max_coinbase_payload_len,
params.deflationary_phase_daa_score,
params.pre_deflationary_phase_base_subsidy,
params.target_time_per_block,
params.daa_window_params.target_time_per_block,
)
}

Expand Down
10 changes: 6 additions & 4 deletions consensus/src/processes/pruning_proof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use kaspa_notify::notifier::Notify;
use kaspa_pow::calc_block_level;
use kaspa_utils::{binary_heap::BinaryHeapExtensions, vec::VecExtensions};

use crate::sync_state::SYNC_STATE;
use crate::{
consensus::{
services::{DbDagTraversalManager, DbGhostdagManager, DbParentsManager, DbWindowManager},
Expand Down Expand Up @@ -413,10 +414,11 @@ impl PruningProofManager {

let mut selected_tip_by_level = vec![None; self.max_block_level as usize + 1];
for level in (0..=self.max_block_level).rev() {
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_proof(level, self.max_block_level)))
.expect("expecting an open unbounded channel");

if !SYNC_STATE.is_synced() {
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_proof(level, self.max_block_level)))
.expect("expecting an open unbounded channel");
}
info!("Validating level {level} from the pruning point proof");
let level_idx = level as usize;
let mut selected_tip = None;
Expand Down
61 changes: 61 additions & 0 deletions consensus/src/sync_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use kaspa_consensus_core::config::params::DAAWindowParams;
use kaspa_consensusmanager::ConsensusManager;
use kaspa_core::time::unix_now;
use once_cell::sync::{Lazy, OnceCell};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

pub static SYNC_STATE: Lazy<SyncState> = Lazy::new(|| SyncState::default());

#[derive(Default)]
pub struct SyncState {
pub consensus_manager: OnceCell<Arc<ConsensusManager>>,
pub daa_window_params: OnceCell<DAAWindowParams>,
pub has_peers: OnceCell<Box<dyn Fn() -> bool + 'static + Sync + Send>>,

is_nearly_synced: Arc<AtomicBool>,
}

impl SyncState {
pub fn is_synced(&self) -> bool {
self.is_nearly_synced.load(Ordering::Acquire) && self.has_peers.get().is_some_and(|has_peers| has_peers())
}

// diff is sink timestamp + expected_daa_window_duration_in_milliseconds(daa_score) - unix_now()
pub(super) fn is_synced_or(&self, check_diff: impl FnOnce() -> i64) -> bool {
let (is_nearly_synced, has_peers) =
(self.is_nearly_synced.load(Ordering::Acquire), self.has_peers.get().is_some_and(|has_peers| has_peers()));
if !is_nearly_synced && has_peers {
let diff = check_diff();
if diff > 0 {
if let Ok(_) = self.is_nearly_synced.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed) {
self.watch(diff);
}
}
}
is_nearly_synced && has_peers
}

fn watch(&self, mut diff: i64) {
let is_nearly_synced = Arc::clone(&self.is_nearly_synced);
let daa_window_params = self.daa_window_params.get().unwrap().clone();
let consensus_manager = Arc::clone(&self.consensus_manager.get().unwrap());

tokio::spawn(async move {
while diff > 0 {
tokio::time::sleep(Duration::from_millis(diff as u64)).await;

let session = consensus_manager.consensus().session().await;
let sink = session.async_get_sink().await;
let h = session.async_get_header(sink).await.unwrap();
drop(session);

diff = -(unix_now() as i64)
+ daa_window_params.expected_daa_window_duration_in_milliseconds(h.daa_score) as i64
+ h.timestamp as i64;
}
is_nearly_synced.store(false, Ordering::Release);
});
}
}
1 change: 1 addition & 0 deletions indexes/utxoindex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license.workspace = true

[dependencies]
kaspa-hashes.workspace = true
kaspa-consensus.workspace = true
kaspa-consensus-core.workspace = true
kaspa-consensusmanager.workspace = true
kaspa-consensus-notify.workspace = true
Expand Down
9 changes: 5 additions & 4 deletions indexes/utxoindex/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ impl UtxoIndexApi for UtxoIndex {
info!("Resyncing the utxoindex...");

#[cfg(not(test))]
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_utxo_resync()))
.expect("expecting an open unbounded channel");

if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() {
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_utxo_resync()))
.expect("expecting an open unbounded channel");
}
self.store.delete_all()?;
let consensus = self.consensus_manager.consensus();
let session = futures::executor::block_on(consensus.session_blocking());
Expand Down
15 changes: 12 additions & 3 deletions kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{fs, path::PathBuf, process::exit, sync::Arc, time::Duration};

use async_channel::unbounded;
use kaspa_consensus::sync_state::SYNC_STATE;
use kaspa_consensus_core::{
config::{Config, ConfigBuilder},
errors::config::{ConfigError, ConfigResult},
Expand Down Expand Up @@ -233,6 +234,9 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
processing_counters.clone(),
));
let consensus_manager = Arc::new(ConsensusManager::new(consensus_factory));
_ = SYNC_STATE.consensus_manager.set(consensus_manager.clone());
SYNC_STATE.daa_window_params.set(config.params.daa_window_params).unwrap();

let consensus_monitor = Arc::new(ConsensusMonitor::new(processing_counters.clone(), tick_service.clone()));

let perf_monitor_builder = PerfMonitorBuilder::new()
Expand All @@ -253,16 +257,21 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
let index_service: Option<Arc<IndexService>> = if args.utxoindex {
// Use only a single thread for none-consensus databases
let utxoindex_db = kaspa_database::prelude::ConnBuilder::default().with_db_path(utxoindex_db_dir).build();
let utxoindex = UtxoIndexProxy::new(UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap());
let utxoindex =
UtxoIndexProxy::new(UtxoIndex::new(consensus_manager.clone(), utxoindex_db, notification_root.clone()).unwrap());
let index_service = Arc::new(IndexService::new(&notify_service.notifier(), Some(utxoindex)));
Some(index_service)
} else {
None
};

let address_manager = AddressManager::new(config.clone(), meta_db);
let mining_manager =
MiningManagerProxy::new(Arc::new(MiningManager::new(config.target_time_per_block, false, config.max_block_mass, None)));
let mining_manager = MiningManagerProxy::new(Arc::new(MiningManager::new(
config.daa_window_params.target_time_per_block,
false,
config.max_block_mass,
None,
)));

let flow_context = Arc::new(FlowContext::new(
consensus_manager.clone(),
Expand Down
6 changes: 3 additions & 3 deletions mining/src/mempool/check_transaction_standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ mod tests {
for test in tests.iter() {
for net in NetworkType::iter() {
let params: Params = net.into();
let mut config = Config::build_default(params.target_time_per_block, false, params.max_block_mass);
let mut config = Config::build_default(params.daa_window_params.target_time_per_block, false, params.max_block_mass);
config.minimum_relay_transaction_fee = test.minimum_relay_transaction_fee;
let mempool = Mempool::new(config);

Expand Down Expand Up @@ -363,7 +363,7 @@ mod tests {
for test in tests {
for net in NetworkType::iter() {
let params: Params = net.into();
let mut config = Config::build_default(params.target_time_per_block, false, params.max_block_mass);
let mut config = Config::build_default(params.daa_window_params.target_time_per_block, false, params.max_block_mass);
config.minimum_relay_transaction_fee = test.minimum_relay_transaction_fee;
let mempool = Mempool::new(config);

Expand Down Expand Up @@ -542,7 +542,7 @@ mod tests {
for test in tests {
for net in NetworkType::iter() {
let params: Params = net.into();
let config = Config::build_default(params.target_time_per_block, false, params.max_block_mass);
let config = Config::build_default(params.daa_window_params.target_time_per_block, false, params.max_block_mass);
let mempool = Mempool::new(config);

// Ensure standard-ness is as expected.
Expand Down
1 change: 1 addition & 0 deletions protocol/flows/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license.workspace = true

[dependencies]
kaspa-core.workspace = true
kaspa-consensus.workspace = true
kaspa-consensus-core.workspace = true
kaspa-consensus-notify.workspace = true
kaspa-p2p-lib.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::v5;
use async_trait::async_trait;
use kaspa_addressmanager::AddressManager;
use kaspa_connectionmanager::ConnectionManager;
use kaspa_consensus::sync_state::SYNC_STATE;
use kaspa_consensus_core::block::Block;
use kaspa_consensus_core::config::Config;
use kaspa_consensus_core::errors::block::RuleError;
Expand Down Expand Up @@ -182,6 +183,8 @@ impl FlowContext {
notification_root: Arc<ConsensusNotificationRoot>,
) -> Self {
let hub = Hub::new();
let moved_hub = hub.clone();
_ = SYNC_STATE.has_peers.set(Box::new(move || moved_hub.has_peers() as _));

let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().min(3.0) as u32;

Expand Down
45 changes: 29 additions & 16 deletions protocol/flows/src/v5/ibd/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,17 @@ impl IbdFlow {
let passed = now.duration_since(last_time);
if passed > Duration::from_secs(1) {
info!("Processed {} trusted blocks in the last {:.2}s (total {})", i - last_index, passed.as_secs_f64(), i);
self.notification_root
.as_ref()
.unwrap()
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_trust_sync(
(i - last_index) as u64,
i as u64,
)))
.expect("expecting an open unbounded channel");

if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() {
self.notification_root
.as_ref()
.unwrap()
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_trust_sync(
(i - last_index) as u64,
i as u64,
)))
.expect("expecting an open unbounded channel");
}

last_time = now;
last_index = i;
Expand Down Expand Up @@ -325,11 +328,16 @@ impl IbdFlow {
relay_block.header.daa_score,
"block headers",
Some(|headers: usize, progress: i32| {
self.notification_root
.as_ref()
.unwrap()
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_headers(headers as u64, progress as i64)))
.expect("expecting an open unbounded channel")
if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() {
self.notification_root
.as_ref()
.unwrap()
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_headers(
headers as u64,
progress as i64,
)))
.expect("expecting an open unbounded channel")
}
}),
);

Expand Down Expand Up @@ -469,9 +477,14 @@ staging selected tip ({}) is too small or negative. Aborting IBD...",
high_header.daa_score,
"blocks",
Some(|blocks, progress| {
notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_blocks(blocks as u64, progress as i64)))
.expect("expecting an open unbounded channel")
if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() {
notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_blocks(
blocks as u64,
progress as i64,
)))
.expect("expecting an open unbounded channel")
}
}),
);

Expand Down
14 changes: 8 additions & 6 deletions protocol/flows/src/v5/ibd/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,14 @@ impl<'a, 'b> PruningPointUtxosetChunkStream<'a, 'b> {
self.utxo_count += chunk.len();
if self.i % IBD_BATCH_SIZE == 0 {
info!("Received {} UTXO set chunks so far, totaling in {} UTXOs", self.i, self.utxo_count);
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_utxo_sync(
self.i as u64,
self.utxo_count as u64,
)))
.expect("expecting an open unbounded channel");
if !kaspa_consensus::sync_state::SYNC_STATE.is_synced() {
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_utxo_sync(
self.i as u64,
self.utxo_count as u64,
)))
.expect("expecting an open unbounded channel");
}
self.router
.enqueue(make_message!(
Payload::RequestNextPruningPointUtxoSetChunk,
Expand Down
Loading

0 comments on commit bac2a1e

Please sign in to comment.