Skip to content

Commit

Permalink
Add aggregator integration test for open message expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
jpraynaud committed Dec 15, 2023
1 parent 8674e03 commit 6290cbd
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 5 deletions.
24 changes: 21 additions & 3 deletions mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ pub struct DependenciesBuilder {
/// Certificate repository.
pub certificate_repository: Option<Arc<CertificateRepository>>,

/// Open message repository.
pub open_message_repository: Option<Arc<OpenMessageRepository>>,

/// Verification key store.
pub verification_key_store: Option<Arc<dyn VerificationKeyStorer>>,

Expand Down Expand Up @@ -205,6 +208,7 @@ impl DependenciesBuilder {
multi_signer: None,
certificate_pending_store: None,
certificate_repository: None,
open_message_repository: None,
verification_key_store: None,
protocol_parameters_store: None,
cardano_cli_runner: None,
Expand Down Expand Up @@ -430,6 +434,21 @@ impl DependenciesBuilder {
Ok(self.certificate_repository.as_ref().cloned().unwrap())
}

async fn build_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
Ok(Arc::new(OpenMessageRepository::new(
self.get_sqlite_connection().await?,
)))
}

/// Get a configured [OpenMessageRepository].
pub async fn get_open_message_repository(&mut self) -> Result<Arc<OpenMessageRepository>> {
if self.open_message_repository.is_none() {
self.open_message_repository = Some(self.build_open_message_repository().await?);
}

Ok(self.open_message_repository.as_ref().cloned().unwrap())
}

async fn build_verification_key_store(&mut self) -> Result<Arc<dyn VerificationKeyStorer>> {
Ok(Arc::new(SignerRegistrationStore::new(
self.get_sqlite_connection().await?,
Expand Down Expand Up @@ -1015,6 +1034,7 @@ impl DependenciesBuilder {
multi_signer: self.get_multi_signer().await?,
certificate_pending_store: self.get_certificate_pending_store().await?,
certificate_repository: self.get_certificate_repository().await?,
open_message_repository: self.get_open_message_repository().await?,
verification_key_store: self.get_verification_key_store().await?,
protocol_parameters_store: self.get_protocol_parameters_store().await?,
chain_observer: self.get_chain_observer().await?,
Expand Down Expand Up @@ -1189,9 +1209,7 @@ impl DependenciesBuilder {

/// Create [CertifierService] service
pub async fn build_certifier_service(&mut self) -> Result<Arc<dyn CertifierService>> {
let open_message_repository = Arc::new(OpenMessageRepository::new(
self.get_sqlite_connection().await?,
));
let open_message_repository = self.get_open_message_repository().await?;
let single_signature_repository = Arc::new(SingleSignatureRepository::new(
self.get_sqlite_connection().await?,
));
Expand Down
8 changes: 7 additions & 1 deletion mithril-aggregator/src/dependency_injection/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use mithril_common::{
BeaconProvider,
};

use crate::services::{EpochService, MessageService};
use crate::{
configuration::*,
database::provider::{CertificateRepository, SignedEntityStorer, SignerGetter, StakePoolStore},
Expand All @@ -28,6 +27,10 @@ use crate::{
CertificatePendingStore, ProtocolParametersStorer, SignerRegisterer,
SignerRegistrationRoundOpener, Snapshotter, VerificationKeyStorer,
};
use crate::{
database::provider::OpenMessageRepository,
services::{EpochService, MessageService},
};

/// MultiSignerWrapper wraps a [MultiSigner]
pub type MultiSignerWrapper = Arc<RwLock<dyn MultiSigner>>;
Expand Down Expand Up @@ -61,6 +64,9 @@ pub struct DependencyContainer {
/// Certificate store.
pub certificate_repository: Arc<CertificateRepository>,

/// Open message store.
pub open_message_repository: Arc<OpenMessageRepository>,

/// Verification key store.
pub verification_key_store: Arc<dyn VerificationKeyStorer>,

Expand Down
156 changes: 156 additions & 0 deletions mithril-aggregator/tests/open_message_expiration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
mod test_extensions;

use std::time::Duration;

use mithril_aggregator::Configuration;
use mithril_common::{
entities::{
Beacon, Epoch, ProtocolParameters, SignedEntityType, SignedEntityTypeDiscriminants,
StakeDistributionParty,
},
test_utils::MithrilFixtureBuilder,
};
use test_extensions::{utilities::get_test_dir, ExpectedCertificate, RuntimeTester};

#[tokio::test]
async fn open_message_expiration() {
let protocol_parameters = ProtocolParameters {
k: 5,
m: 150,
phi_f: 0.95,
};
let configuration = Configuration {
protocol_parameters: protocol_parameters.clone(),
data_stores_directory: get_test_dir("create_certificate").join("aggregator.sqlite3"),
..Configuration::new_sample()
};
let mut tester =
RuntimeTester::build(Beacon::new("devnet".to_string(), 1, 1), configuration).await;

comment!("create signers & declare stake distribution");
let fixture = MithrilFixtureBuilder::default()
.with_signers(10)
.with_protocol_parameters(protocol_parameters.clone())
.build();

tester.init_state_from_fixture(&fixture).await.unwrap();

comment!("Boostrap the genesis certificate");
tester.register_genesis_certificate(&fixture).await.unwrap();

assert_last_certificate_eq!(
tester,
ExpectedCertificate::new_genesis(
Beacon::new("devnet".to_string(), 1, 1),
fixture.compute_and_encode_avk()
)
);

comment!("Increase immutable number");
tester.increase_immutable_number().await.unwrap();

comment!("start the runtime state machine");
cycle!(tester, "ready");
cycle!(tester, "signing");

comment!("register signers");
tester
.register_signers(&fixture.signers_fixture())
.await
.unwrap();
cycle_err!(tester, "signing");

comment!("signers send their single signature");
tester
.send_single_signatures(
SignedEntityTypeDiscriminants::MithrilStakeDistribution,
&fixture.signers_fixture(),
)
.await
.unwrap();

comment!("The state machine should issue a certificate for the MithrilStakeDistribution");
cycle!(tester, "ready");
assert_last_certificate_eq!(
tester,
ExpectedCertificate::new(
Beacon::new("devnet".to_string(), 1, 2),
StakeDistributionParty::from_signers(fixture.signers_with_stake()).as_slice(),
fixture.compute_and_encode_avk(),
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
ExpectedCertificate::genesis_identifier(&Beacon::new("devnet".to_string(), 1, 1)),
)
);

comment!("The state machine should get back to signing to sign CardanoImmutableFilesFull");
// todo!: remove this immutable increase:
// right now because we only have one state machine for all signed entity type we need it else
// the state machine will stay in the idle state since its beacon didn't change.
// With one state machine per signed entity type this problem will disappear.
tester.increase_immutable_number().await.unwrap();
cycle!(tester, "signing");

comment!(
"Schedule the open message for CardanoImmutableFilesFull to expire, and wait until it does"
);
let open_message_timeout = Duration::from_millis(100);
tester
.activate_open_message_expiration(
SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
open_message_timeout,
)
.await
.unwrap();
tokio::time::sleep(2 * open_message_timeout).await;
let signers_for_immutables = &fixture.signers_fixture()[0..=6];
tester
.send_single_signatures(
SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
signers_for_immutables,
)
.await
.unwrap();

comment!("The state machine should not issue a certificate for the CardanoImmutableFilesFull");
cycle!(tester, "ready");
assert_last_certificate_eq!(
tester,
ExpectedCertificate::new(
Beacon::new("devnet".to_string(), 1, 2),
StakeDistributionParty::from_signers(fixture.signers_with_stake()).as_slice(),
fixture.compute_and_encode_avk(),
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
ExpectedCertificate::genesis_identifier(&Beacon::new("devnet".to_string(), 1, 1)),
)
);

comment!("Increase the immutable file number");
tester.increase_immutable_number().await.unwrap();
cycle!(tester, "signing");

comment!("The state machine should get back to signing to sign CardanoImmutableFilesFull");
let signers_for_immutables = &fixture.signers_fixture()[0..=6];
tester
.send_single_signatures(
SignedEntityTypeDiscriminants::CardanoImmutableFilesFull,
signers_for_immutables,
)
.await
.unwrap();

comment!("The state machine should issue a certificate for the CardanoImmutableFilesFull");
cycle!(tester, "ready");
assert_last_certificate_eq!(
tester,
ExpectedCertificate::new(
Beacon::new("devnet".to_string(), 1, 4),
&signers_for_immutables
.iter()
.map(|s| s.signer_with_stake.clone().into())
.collect::<Vec<_>>(),
fixture.compute_and_encode_avk(),
SignedEntityType::CardanoImmutableFilesFull(Beacon::new("devnet".to_string(), 1, 4)),
ExpectedCertificate::genesis_identifier(&Beacon::new("devnet".to_string(), 1, 1)),
)
);
}
32 changes: 31 additions & 1 deletion mithril-aggregator/tests/test_extensions/runtime_tester.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{anyhow, Context};
use mithril_aggregator::database::provider::SignedEntityRecord;
use chrono::Utc;
use mithril_aggregator::database::provider::{OpenMessageRepository, SignedEntityRecord};
use mithril_aggregator::{
dependency_injection::DependenciesBuilder, event_store::EventMessage, AggregatorRuntime,
Configuration, DependencyContainer, DumbSnapshotUploader, DumbSnapshotter,
Expand All @@ -22,6 +23,7 @@ use mithril_common::{
use slog::Drain;
use slog_scope::debug;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::test_extensions::{AggregatorObserver, ExpectedCertificate};
Expand Down Expand Up @@ -65,6 +67,7 @@ pub struct RuntimeTester {
pub receiver: UnboundedReceiver<EventMessage>,
pub era_reader_adapter: Arc<EraReaderDummyAdapter>,
pub observer: Arc<AggregatorObserver>,
pub open_message_repository: Arc<OpenMessageRepository>,
_logs_guard: slog_scope::GlobalLoggerGuard,
}

Expand Down Expand Up @@ -104,6 +107,7 @@ impl RuntimeTester {
let runtime = deps_builder.create_aggregator_runner().await.unwrap();
let receiver = deps_builder.get_event_transmitter_receiver().await.unwrap();
let observer = Arc::new(AggregatorObserver::new(&mut deps_builder).await);
let open_message_repository = deps_builder.get_open_message_repository().await.unwrap();

Self {
snapshot_uploader,
Expand All @@ -117,6 +121,7 @@ impl RuntimeTester {
receiver,
era_reader_adapter,
observer,
open_message_repository,
_logs_guard: logger,
}
}
Expand Down Expand Up @@ -360,6 +365,31 @@ impl RuntimeTester {
.await;
}

/// Activate open message expiration
pub async fn activate_open_message_expiration(
&self,
discriminant: SignedEntityTypeDiscriminants,
timeout: Duration,
) -> StdResult<()> {
let signed_entity_type = self
.observer
.get_current_signed_entity_type(discriminant)
.await?;
let mut open_message = self
.open_message_repository
.get_open_message(&signed_entity_type)
.await
.with_context(|| "Querying open message should not fail")?
.ok_or(anyhow!("An open message should exist"))?;
open_message.expires_at = Some(Utc::now() + timeout);
self.open_message_repository
.update_open_message(&open_message)
.await
.with_context(|| "Saving open message should not fail")?;

Ok(())
}

/// Update the Era markers
pub async fn set_era_markers(&self, markers: Vec<EraMarker>) {
self.era_reader_adapter.set_markers(markers)
Expand Down

0 comments on commit 6290cbd

Please sign in to comment.