diff --git a/Cargo.lock b/Cargo.lock index fb57e72b00..edf18218a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2828,11 +2828,13 @@ dependencies = [ "futures", "kaspa-consensus", "kaspa-consensus-core", + "kaspa-consensus-notify", "kaspa-consensusmanager", "kaspa-core", "kaspa-database", "kaspa-hashes", "kaspa-index-core", + "kaspa-notify", "kaspa-utils", "log", "parking_lot", diff --git a/consensus/notify/src/notification.rs b/consensus/notify/src/notification.rs index 586501f01a..27c4d4d190 100644 --- a/consensus/notify/src/notification.rs +++ b/consensus/notify/src/notification.rs @@ -180,6 +180,7 @@ pub enum SyncStateChangedNotification { Proof { current: u8, max: u8 }, // todo other variants Headers { headers: u64, progress: i64 }, Blocks { blocks: u64, progress: i64 }, + UtxoResync, } impl SyncStateChangedNotification { @@ -194,4 +195,8 @@ impl SyncStateChangedNotification { pub fn new_blocks(blocks: u64, progress: i64) -> SyncStateChangedNotification { Self::Blocks { blocks, progress } } + + pub fn new_resync_utxo() -> SyncStateChangedNotification { + Self::UtxoResync + } } diff --git a/indexes/utxoindex/Cargo.toml b/indexes/utxoindex/Cargo.toml index 83bba5c2f1..8be9701c18 100644 --- a/indexes/utxoindex/Cargo.toml +++ b/indexes/utxoindex/Cargo.toml @@ -9,13 +9,16 @@ license.workspace = true [dependencies] kaspa-hashes.workspace = true -thiserror.workspace = true kaspa-consensus-core.workspace = true +kaspa-consensusmanager.workspace = true +kaspa-consensus-notify.workspace = true +kaspa-notify.workspace = true kaspa-core.workspace = true kaspa-utils.workspace = true kaspa-index-core.workspace = true kaspa-database.workspace = true -kaspa-consensusmanager.workspace = true + +thiserror.workspace = true log.workspace = true rocksdb.workspace = true serde.workspace = true diff --git a/indexes/utxoindex/src/index.rs b/indexes/utxoindex/src/index.rs index 5174c824bd..9c6508d5f5 100644 --- a/indexes/utxoindex/src/index.rs +++ b/indexes/utxoindex/src/index.rs @@ -7,6 +7,14 @@ use crate::{ IDENT, }; use kaspa_consensus_core::{tx::ScriptPublicKeys, utxo::utxo_diff::UtxoDiff, BlockHashSet}; + +#[cfg(not(test))] +use kaspa_consensus_notify::notification::{Notification, SyncStateChangedNotification}; +#[cfg(not(test))] +use kaspa_consensus_notify::root::ConsensusNotificationRoot; +#[cfg(not(test))] +use kaspa_notify::notifier::Notify; + use kaspa_consensusmanager::{ConsensusManager, ConsensusResetHandler}; use kaspa_core::{info, trace}; use kaspa_database::prelude::{StoreError, StoreResult, DB}; @@ -26,12 +34,23 @@ const RESYNC_CHUNK_SIZE: usize = 2048; //Increased from 1k (used in go-kaspad), pub struct UtxoIndex { consensus_manager: Arc, store: Store, + #[cfg(not(test))] + notification_root: Arc, } impl UtxoIndex { /// Creates a new [`UtxoIndex`] within a [`RwLock`] - pub fn new(consensus_manager: Arc, db: Arc) -> UtxoIndexResult>> { - let mut utxoindex = Self { consensus_manager: consensus_manager.clone(), store: Store::new(db) }; + pub fn new( + consensus_manager: Arc, + db: Arc, + #[cfg(not(test))] notification_root: Arc, + ) -> UtxoIndexResult>> { + let mut utxoindex = Self { + consensus_manager: consensus_manager.clone(), + store: Store::new(db), + #[cfg(not(test))] + notification_root, + }; if !utxoindex.is_synced()? { utxoindex.resync()?; } @@ -128,6 +147,11 @@ impl UtxoIndexApi for UtxoIndex { fn resync(&mut self) -> UtxoIndexResult<()> { info!("Resyncing the utxoindex..."); + #[cfg(not(test))] + self.notification_root + .notify(Notification::SyncStateChanged(SyncStateChangedNotification::new_resync_utxo())) + .expect("expecting an open unbounded channel"); + self.store.delete_all()?; let consensus = self.consensus_manager.consensus(); let session = futures::executor::block_on(consensus.session_blocking()); diff --git a/rpc/core/src/convert/notification.rs b/rpc/core/src/convert/notification.rs index 9f01bfe051..52e0fb7772 100644 --- a/rpc/core/src/convert/notification.rs +++ b/rpc/core/src/convert/notification.rs @@ -104,6 +104,7 @@ impl From<&consensus_notify::SyncStateChangedNotification> for SyncStateChangedN consensus_notify::SyncStateChangedNotification::Blocks { blocks, progress } => { Self::Blocks { blocks: *blocks, progress: *progress } } + consensus_notify::SyncStateChangedNotification::UtxoResync => Self::UtxoResync, } } } diff --git a/rpc/core/src/model/message.rs b/rpc/core/src/model/message.rs index 62b48fc399..3e460c0fcd 100644 --- a/rpc/core/src/model/message.rs +++ b/rpc/core/src/model/message.rs @@ -1090,6 +1090,7 @@ pub enum SyncStateChangedNotification { #[allow(dead_code)] progress: i64, }, + UtxoResync, } /// diff --git a/rpc/grpc/core/proto/rpc.proto b/rpc/grpc/core/proto/rpc.proto index c3eba2132b..0c03aca4bd 100644 --- a/rpc/grpc/core/proto/rpc.proto +++ b/rpc/grpc/core/proto/rpc.proto @@ -777,6 +777,7 @@ message SyncStateChangedNotificationMessage { ProofState proof = 1; HeadersState headers = 2; BlocksState blocks = 3; + UtxoResyncState utxoResync = 4; } } @@ -793,4 +794,6 @@ message HeadersState { message BlocksState { uint64 blocks = 1; int64 progress = 2; -} \ No newline at end of file +} + +message UtxoResyncState {} \ No newline at end of file diff --git a/rpc/grpc/core/src/convert/notification.rs b/rpc/grpc/core/src/convert/notification.rs index a3b84787c2..8074d9d5d7 100644 --- a/rpc/grpc/core/src/convert/notification.rs +++ b/rpc/grpc/core/src/convert/notification.rs @@ -94,6 +94,9 @@ from!(item: &kaspa_rpc_core::SyncStateChangedNotification, SyncStateChangedNotif progress: *progress })) }, + SyncStateChangedNotification::UtxoResync => SyncStateChangedNotificationMessage { + sync_state: Some(sync_state_changed_notification_message::SyncState::UtxoResync(crate::protowire::UtxoResyncState{})) + }, } });