Skip to content

Commit

Permalink
Add UTXO resync notification feature
Browse files Browse the repository at this point in the history
Implemented the UTXO resync notification feature in the UTXO Index, notifying the user when the UTXO index starts resyncing. This is done by adding a new UtxoResyncState in the SyncStateChangedNotification enum and notifying the user when the resync event occurs. Dependencies were updated in Cargo.toml and the notification was integrated into the GRPC layer.
  • Loading branch information
biryukovmaxim committed Aug 24, 2023
1 parent 9612922 commit ce08cdf
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions consensus/notify/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ pub enum SyncStateChangedNotification {
Proof { current: u8, max: u8 }, // todo other variants
Headers { headers: u64, progress: i64 },
Blocks { blocks: u64, progress: i64 },
UtxoResync,
}

impl SyncStateChangedNotification {
Expand All @@ -194,4 +195,8 @@ impl SyncStateChangedNotification {
pub fn new_blocks(blocks: u64, progress: i64) -> SyncStateChangedNotification {
Self::Blocks { blocks, progress }
}

pub fn new_resync_utxo() -> SyncStateChangedNotification {
Self::UtxoResync
}
}
7 changes: 5 additions & 2 deletions indexes/utxoindex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ license.workspace = true

[dependencies]
kaspa-hashes.workspace = true
thiserror.workspace = true
kaspa-consensus-core.workspace = true
kaspa-consensusmanager.workspace = true
kaspa-consensus-notify.workspace = true
kaspa-notify.workspace = true
kaspa-core.workspace = true
kaspa-utils.workspace = true
kaspa-index-core.workspace = true
kaspa-database.workspace = true
kaspa-consensusmanager.workspace = true

thiserror.workspace = true
log.workspace = true
rocksdb.workspace = true
serde.workspace = true
Expand Down
28 changes: 26 additions & 2 deletions indexes/utxoindex/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ use crate::{
IDENT,
};
use kaspa_consensus_core::{tx::ScriptPublicKeys, utxo::utxo_diff::UtxoDiff, BlockHashSet};

#[cfg(not(test))]
use kaspa_consensus_notify::notification::{Notification, SyncStateChangedNotification};
#[cfg(not(test))]
use kaspa_consensus_notify::root::ConsensusNotificationRoot;
#[cfg(not(test))]
use kaspa_notify::notifier::Notify;

use kaspa_consensusmanager::{ConsensusManager, ConsensusResetHandler};
use kaspa_core::{info, trace};
use kaspa_database::prelude::{StoreError, StoreResult, DB};
Expand All @@ -26,12 +34,23 @@ const RESYNC_CHUNK_SIZE: usize = 2048; //Increased from 1k (used in go-kaspad),
pub struct UtxoIndex {
consensus_manager: Arc<ConsensusManager>,
store: Store,
#[cfg(not(test))]
notification_root: Arc<ConsensusNotificationRoot>,
}

impl UtxoIndex {
/// Creates a new [`UtxoIndex`] within a [`RwLock`]
pub fn new(consensus_manager: Arc<ConsensusManager>, db: Arc<DB>) -> UtxoIndexResult<Arc<RwLock<Self>>> {
let mut utxoindex = Self { consensus_manager: consensus_manager.clone(), store: Store::new(db) };
pub fn new(
consensus_manager: Arc<ConsensusManager>,
db: Arc<DB>,
#[cfg(not(test))] notification_root: Arc<ConsensusNotificationRoot>,
) -> UtxoIndexResult<Arc<RwLock<Self>>> {
let mut utxoindex = Self {
consensus_manager: consensus_manager.clone(),
store: Store::new(db),
#[cfg(not(test))]
notification_root,
};
if !utxoindex.is_synced()? {
utxoindex.resync()?;
}
Expand Down Expand Up @@ -128,6 +147,11 @@ impl UtxoIndexApi for UtxoIndex {
fn resync(&mut self) -> UtxoIndexResult<()> {
info!("Resyncing the utxoindex...");

#[cfg(not(test))]
self.notification_root
.notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_resync_utxo()))
.expect("expecting an open unbounded channel");

self.store.delete_all()?;
let consensus = self.consensus_manager.consensus();
let session = futures::executor::block_on(consensus.session_blocking());
Expand Down
1 change: 1 addition & 0 deletions rpc/core/src/convert/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl From<&consensus_notify::SyncStateChangedNotification> for SyncStateChangedN
consensus_notify::SyncStateChangedNotification::Blocks { blocks, progress } => {
Self::Blocks { blocks: *blocks, progress: *progress }
}
consensus_notify::SyncStateChangedNotification::UtxoResync => Self::UtxoResync,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,7 @@ pub enum SyncStateChangedNotification {
#[allow(dead_code)]
progress: i64,
},
UtxoResync,
}

///
Expand Down
5 changes: 4 additions & 1 deletion rpc/grpc/core/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ message SyncStateChangedNotificationMessage {
ProofState proof = 1;
HeadersState headers = 2;
BlocksState blocks = 3;
UtxoResyncState utxoResync = 4;
}
}

Expand All @@ -793,4 +794,6 @@ message HeadersState {
message BlocksState {
uint64 blocks = 1;
int64 progress = 2;
}
}

message UtxoResyncState {}
3 changes: 3 additions & 0 deletions rpc/grpc/core/src/convert/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ from!(item: &kaspa_rpc_core::SyncStateChangedNotification, SyncStateChangedNotif
progress: *progress
}))
},
SyncStateChangedNotification::UtxoResync => SyncStateChangedNotificationMessage {
sync_state: Some(sync_state_changed_notification_message::SyncState::UtxoResync(crate::protowire::UtxoResyncState{}))
},
}
});

Expand Down

0 comments on commit ce08cdf

Please sign in to comment.