diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 75050f8e72b..578b0e83492 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -106,6 +106,9 @@ pub struct DependenciesBuilder { /// Certificate repository. pub certificate_repository: Option>, + /// Open message repository. + pub open_message_repository: Option>, + /// Verification key store. pub verification_key_store: Option>, @@ -205,6 +208,7 @@ impl DependenciesBuilder { multi_signer: None, certificate_pending_store: None, certificate_repository: None, + open_message_repository: None, verification_key_store: None, protocol_parameters_store: None, cardano_cli_runner: None, @@ -430,6 +434,21 @@ impl DependenciesBuilder { Ok(self.certificate_repository.as_ref().cloned().unwrap()) } + async fn build_open_message_repository(&mut self) -> Result> { + Ok(Arc::new(OpenMessageRepository::new( + self.get_sqlite_connection().await?, + ))) + } + + /// Get a configured [OpenMessageRepository]. + pub async fn get_open_message_repository(&mut self) -> Result> { + if self.open_message_repository.is_none() { + self.open_message_repository = Some(self.build_open_message_repository().await?); + } + + Ok(self.open_message_repository.as_ref().cloned().unwrap()) + } + async fn build_verification_key_store(&mut self) -> Result> { Ok(Arc::new(SignerRegistrationStore::new( self.get_sqlite_connection().await?, @@ -1015,6 +1034,7 @@ impl DependenciesBuilder { multi_signer: self.get_multi_signer().await?, certificate_pending_store: self.get_certificate_pending_store().await?, certificate_repository: self.get_certificate_repository().await?, + open_message_repository: self.get_open_message_repository().await?, verification_key_store: self.get_verification_key_store().await?, protocol_parameters_store: self.get_protocol_parameters_store().await?, chain_observer: self.get_chain_observer().await?, @@ -1189,9 +1209,7 @@ impl DependenciesBuilder { /// Create [CertifierService] service pub async fn build_certifier_service(&mut self) -> Result> { - let open_message_repository = Arc::new(OpenMessageRepository::new( - self.get_sqlite_connection().await?, - )); + let open_message_repository = self.get_open_message_repository().await?; let single_signature_repository = Arc::new(SingleSignatureRepository::new( self.get_sqlite_connection().await?, )); diff --git a/mithril-aggregator/src/dependency_injection/containers.rs b/mithril-aggregator/src/dependency_injection/containers.rs index ad5d3d9938c..a1c1c2fa8c8 100644 --- a/mithril-aggregator/src/dependency_injection/containers.rs +++ b/mithril-aggregator/src/dependency_injection/containers.rs @@ -16,7 +16,6 @@ use mithril_common::{ BeaconProvider, }; -use crate::services::{EpochService, MessageService}; use crate::{ configuration::*, database::provider::{CertificateRepository, SignedEntityStorer, SignerGetter, StakePoolStore}, @@ -28,6 +27,10 @@ use crate::{ CertificatePendingStore, ProtocolParametersStorer, SignerRegisterer, SignerRegistrationRoundOpener, Snapshotter, VerificationKeyStorer, }; +use crate::{ + database::provider::OpenMessageRepository, + services::{EpochService, MessageService}, +}; /// MultiSignerWrapper wraps a [MultiSigner] pub type MultiSignerWrapper = Arc>; @@ -61,6 +64,9 @@ pub struct DependencyContainer { /// Certificate store. pub certificate_repository: Arc, + /// Open message store. + pub open_message_repository: Arc, + /// Verification key store. pub verification_key_store: Arc, diff --git a/mithril-aggregator/tests/open_message_expiration.rs b/mithril-aggregator/tests/open_message_expiration.rs new file mode 100644 index 00000000000..bb2b948463f --- /dev/null +++ b/mithril-aggregator/tests/open_message_expiration.rs @@ -0,0 +1,156 @@ +mod test_extensions; + +use std::time::Duration; + +use mithril_aggregator::Configuration; +use mithril_common::{ + entities::{ + Beacon, Epoch, ProtocolParameters, SignedEntityType, SignedEntityTypeDiscriminants, + StakeDistributionParty, + }, + test_utils::MithrilFixtureBuilder, +}; +use test_extensions::{utilities::get_test_dir, ExpectedCertificate, RuntimeTester}; + +#[tokio::test] +async fn open_message_expiration() { + let protocol_parameters = ProtocolParameters { + k: 5, + m: 150, + phi_f: 0.95, + }; + let configuration = Configuration { + protocol_parameters: protocol_parameters.clone(), + data_stores_directory: get_test_dir("create_certificate").join("aggregator.sqlite3"), + ..Configuration::new_sample() + }; + let mut tester = + RuntimeTester::build(Beacon::new("devnet".to_string(), 1, 1), configuration).await; + + comment!("create signers & declare stake distribution"); + let fixture = MithrilFixtureBuilder::default() + .with_signers(10) + .with_protocol_parameters(protocol_parameters.clone()) + .build(); + + tester.init_state_from_fixture(&fixture).await.unwrap(); + + comment!("Boostrap the genesis certificate"); + tester.register_genesis_certificate(&fixture).await.unwrap(); + + assert_last_certificate_eq!( + tester, + ExpectedCertificate::new_genesis( + Beacon::new("devnet".to_string(), 1, 1), + fixture.compute_and_encode_avk() + ) + ); + + comment!("Increase immutable number"); + tester.increase_immutable_number().await.unwrap(); + + comment!("start the runtime state machine"); + cycle!(tester, "ready"); + cycle!(tester, "signing"); + + comment!("register signers"); + tester + .register_signers(&fixture.signers_fixture()) + .await + .unwrap(); + cycle_err!(tester, "signing"); + + comment!("signers send their single signature"); + tester + .send_single_signatures( + SignedEntityTypeDiscriminants::MithrilStakeDistribution, + &fixture.signers_fixture(), + ) + .await + .unwrap(); + + comment!("The state machine should issue a certificate for the MithrilStakeDistribution"); + cycle!(tester, "ready"); + assert_last_certificate_eq!( + tester, + ExpectedCertificate::new( + Beacon::new("devnet".to_string(), 1, 2), + StakeDistributionParty::from_signers(fixture.signers_with_stake()).as_slice(), + fixture.compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(1)), + ExpectedCertificate::genesis_identifier(&Beacon::new("devnet".to_string(), 1, 1)), + ) + ); + + comment!("The state machine should get back to signing to sign CardanoImmutableFilesFull"); + // todo!: remove this immutable increase: + // right now because we only have one state machine for all signed entity type we need it else + // the state machine will stay in the idle state since its beacon didn't change. + // With one state machine per signed entity type this problem will disappear. + tester.increase_immutable_number().await.unwrap(); + cycle!(tester, "signing"); + + comment!( + "Schedule the open message for CardanoImmutableFilesFull to expire, and wait until it does" + ); + let open_message_timeout = Duration::from_millis(100); + tester + .activate_open_message_expiration( + SignedEntityTypeDiscriminants::CardanoImmutableFilesFull, + open_message_timeout, + ) + .await + .unwrap(); + tokio::time::sleep(2 * open_message_timeout).await; + let signers_for_immutables = &fixture.signers_fixture()[0..=6]; + tester + .send_single_signatures( + SignedEntityTypeDiscriminants::CardanoImmutableFilesFull, + signers_for_immutables, + ) + .await + .unwrap(); + + comment!("The state machine should not issue a certificate for the CardanoImmutableFilesFull"); + cycle!(tester, "ready"); + assert_last_certificate_eq!( + tester, + ExpectedCertificate::new( + Beacon::new("devnet".to_string(), 1, 2), + StakeDistributionParty::from_signers(fixture.signers_with_stake()).as_slice(), + fixture.compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(1)), + ExpectedCertificate::genesis_identifier(&Beacon::new("devnet".to_string(), 1, 1)), + ) + ); + + comment!("Increase the immutable file number"); + tester.increase_immutable_number().await.unwrap(); + cycle!(tester, "signing"); + + comment!("The state machine should get back to signing to sign CardanoImmutableFilesFull"); + let signers_for_immutables = &fixture.signers_fixture()[0..=6]; + tester + .send_single_signatures( + SignedEntityTypeDiscriminants::CardanoImmutableFilesFull, + signers_for_immutables, + ) + .await + .unwrap(); + + comment!("The state machine should issue a certificate for the CardanoImmutableFilesFull"); + cycle!(tester, "ready"); + assert_last_certificate_eq!( + tester, + ExpectedCertificate::new( + Beacon::new("devnet".to_string(), 1, 4), + &signers_for_immutables + .iter() + .map(|s| s.signer_with_stake.clone().into()) + .collect::>(), + fixture.compute_and_encode_avk(), + SignedEntityType::CardanoImmutableFilesFull(Beacon::new("devnet".to_string(), 1, 4)), + ExpectedCertificate::genesis_identifier(&Beacon::new("devnet".to_string(), 1, 1)), + ) + ); +} diff --git a/mithril-aggregator/tests/test_extensions/runtime_tester.rs b/mithril-aggregator/tests/test_extensions/runtime_tester.rs index 0791a4bedd8..553b1318799 100644 --- a/mithril-aggregator/tests/test_extensions/runtime_tester.rs +++ b/mithril-aggregator/tests/test_extensions/runtime_tester.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, Context}; -use mithril_aggregator::database::provider::SignedEntityRecord; +use chrono::Utc; +use mithril_aggregator::database::provider::{OpenMessageRepository, SignedEntityRecord}; use mithril_aggregator::{ dependency_injection::DependenciesBuilder, event_store::EventMessage, AggregatorRuntime, Configuration, DependencyContainer, DumbSnapshotUploader, DumbSnapshotter, @@ -22,6 +23,7 @@ use mithril_common::{ use slog::Drain; use slog_scope::debug; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc::UnboundedReceiver; use crate::test_extensions::{AggregatorObserver, ExpectedCertificate}; @@ -65,6 +67,7 @@ pub struct RuntimeTester { pub receiver: UnboundedReceiver, pub era_reader_adapter: Arc, pub observer: Arc, + pub open_message_repository: Arc, _logs_guard: slog_scope::GlobalLoggerGuard, } @@ -104,6 +107,7 @@ impl RuntimeTester { let runtime = deps_builder.create_aggregator_runner().await.unwrap(); let receiver = deps_builder.get_event_transmitter_receiver().await.unwrap(); let observer = Arc::new(AggregatorObserver::new(&mut deps_builder).await); + let open_message_repository = deps_builder.get_open_message_repository().await.unwrap(); Self { snapshot_uploader, @@ -117,6 +121,7 @@ impl RuntimeTester { receiver, era_reader_adapter, observer, + open_message_repository, _logs_guard: logger, } } @@ -360,6 +365,31 @@ impl RuntimeTester { .await; } + /// Activate open message expiration + pub async fn activate_open_message_expiration( + &self, + discriminant: SignedEntityTypeDiscriminants, + timeout: Duration, + ) -> StdResult<()> { + let signed_entity_type = self + .observer + .get_current_signed_entity_type(discriminant) + .await?; + let mut open_message = self + .open_message_repository + .get_open_message(&signed_entity_type) + .await + .with_context(|| "Querying open message should not fail")? + .ok_or(anyhow!("An open message should exist"))?; + open_message.expires_at = Some(Utc::now() + timeout); + self.open_message_repository + .update_open_message(&open_message) + .await + .with_context(|| "Saving open message should not fail")?; + + Ok(()) + } + /// Update the Era markers pub async fn set_era_markers(&self, markers: Vec) { self.era_reader_adapter.set_markers(markers)