diff --git a/CHANGELOG.md b/CHANGELOG.md index dbc13622a95..948ac003f61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ As a minor extension, we have adopted a slightly different versioning convention - Optimizations of the state machine used by the signer to create individual signatures. +- Support for buffering of incoming single signatures by the aggregator if it can not aggregate them yet + - Crates versions: | Crate | Version | diff --git a/Cargo.lock b/Cargo.lock index 2ec3c56289e..272a3ac21fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3403,7 +3403,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.5.63" +version = "0.5.64" dependencies = [ "anyhow", "async-trait", @@ -3559,7 +3559,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.4.51" +version = "0.4.52" dependencies = [ "anyhow", "async-trait", @@ -3704,7 +3704,7 @@ dependencies = [ [[package]] name = "mithril-signer" -version = "0.2.183" +version = "0.2.184" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index ac7e9395526..36489257b7a 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.5.63" +version = "0.5.64" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-aggregator/src/database/migration.rs b/mithril-aggregator/src/database/migration.rs index 3e17a951ee1..5608437f718 100644 --- a/mithril-aggregator/src/database/migration.rs +++ b/mithril-aggregator/src/database/migration.rs @@ -758,6 +758,23 @@ pragma foreign_keys=true; 26, r#" create unique index signed_entity_unique_index on signed_entity(signed_entity_type_id, beacon); +"#, + ), + // Migration 27 + SqlMigration::new( + 27, + r#" +create table buffered_single_signature ( + signed_entity_type_id integer not null, + party_id text not null, + lottery_indexes json not null, + signature text not null, + created_at text not null, + primary key (signed_entity_type_id, party_id) +); + +create index buffered_single_signature_signed_entity_type_id on buffered_single_signature(signed_entity_type_id); +create index buffered_single_signature_party_id_index on buffered_single_signature(party_id); "#, ), ] diff --git a/mithril-aggregator/src/database/query/buffered_single_signature/delete_buffered_single_signature.rs b/mithril-aggregator/src/database/query/buffered_single_signature/delete_buffered_single_signature.rs new file mode 100644 index 00000000000..f578f4814b6 --- /dev/null +++ b/mithril-aggregator/src/database/query/buffered_single_signature/delete_buffered_single_signature.rs @@ -0,0 +1,119 @@ +use sqlite::Value; + +use mithril_common::entities::{PartyId, SignedEntityTypeDiscriminants}; +use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition}; + +use crate::database::record::BufferedSingleSignatureRecord; + +/// Query to delete old [BufferedSingleSignatureRecord] from the sqlite database +pub struct DeleteBufferedSingleSignatureQuery { + condition: WhereCondition, +} + +impl DeleteBufferedSingleSignatureQuery { + pub fn by_discriminant_and_party_ids( + signed_entity_type_discriminant: SignedEntityTypeDiscriminants, + party_ids: Vec, + ) -> Self { + let ids_values = party_ids.into_iter().map(Value::String).collect(); + + Self { + condition: WhereCondition::new( + "signed_entity_type_id = ?*", + vec![Value::Integer( + signed_entity_type_discriminant.index() as i64 + )], + ) + .and_where(WhereCondition::where_in("party_id", ids_values)), + } + } +} + +impl Query for DeleteBufferedSingleSignatureQuery { + type Entity = BufferedSingleSignatureRecord; + + fn filters(&self) -> WhereCondition { + self.condition.clone() + } + + fn get_definition(&self, condition: &str) -> String { + // it is important to alias the fields with the same name as the table + // since the table cannot be aliased in a RETURNING statement in SQLite. + let projection = Self::Entity::get_projection().expand(SourceAlias::new(&[( + "{:buffered_single_signature:}", + "buffered_single_signature", + )])); + + format!("delete from buffered_single_signature where {condition} returning {projection}") + } +} + +#[cfg(test)] +mod tests { + use mithril_common::entities::SignedEntityTypeDiscriminants::{ + CardanoTransactions, MithrilStakeDistribution, + }; + use mithril_persistence::sqlite::ConnectionExtensions; + + use crate::database::query::GetBufferedSingleSignatureQuery; + use crate::database::record::strip_buffered_sigs_date; + use crate::database::test_helper::{insert_buffered_single_signatures, main_db_connection}; + + use super::*; + + #[test] + fn test_delete_buffered_single_signature_records_by_discriminant_and_party_ids() { + let connection = main_db_connection().unwrap(); + let records = BufferedSingleSignatureRecord::fakes(&[ + ("party_1", MithrilStakeDistribution), + ("party_2", MithrilStakeDistribution), + ("party_3", MithrilStakeDistribution), + ("party_1", CardanoTransactions), + ("party_2", CardanoTransactions), + ]); + insert_buffered_single_signatures(&connection, records.clone()).unwrap(); + + let cursor = connection + .fetch( + DeleteBufferedSingleSignatureQuery::by_discriminant_and_party_ids( + MithrilStakeDistribution, + vec!["party_1".into(), "party_3".into()], + ), + ) + .unwrap(); + assert_eq!(2, cursor.count()); + + let remaining_records: Vec = connection + .fetch_collect(GetBufferedSingleSignatureQuery::all()) + .unwrap(); + assert_eq!( + strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[ + ("party_2", CardanoTransactions), + ("party_1", CardanoTransactions), + ("party_2", MithrilStakeDistribution), + ])), + strip_buffered_sigs_date(&remaining_records) + ); + + let cursor = connection + .fetch( + DeleteBufferedSingleSignatureQuery::by_discriminant_and_party_ids( + CardanoTransactions, + vec!["party_1".into(), "party_2".into()], + ), + ) + .unwrap(); + assert_eq!(2, cursor.count()); + + let remaining_records: Vec = connection + .fetch_collect(GetBufferedSingleSignatureQuery::all()) + .unwrap(); + assert_eq!( + strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[( + "party_2", + MithrilStakeDistribution + ),])), + strip_buffered_sigs_date(&remaining_records) + ); + } +} diff --git a/mithril-aggregator/src/database/query/buffered_single_signature/get_buffered_single_signature.rs b/mithril-aggregator/src/database/query/buffered_single_signature/get_buffered_single_signature.rs new file mode 100644 index 00000000000..7365e715a3f --- /dev/null +++ b/mithril-aggregator/src/database/query/buffered_single_signature/get_buffered_single_signature.rs @@ -0,0 +1,119 @@ +use sqlite::Value; + +use mithril_common::entities::SignedEntityTypeDiscriminants; +use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition}; + +use crate::database::record::BufferedSingleSignatureRecord; + +/// Simple queries to retrieve [BufferedSingleSignatureRecord] from the sqlite database. +pub struct GetBufferedSingleSignatureQuery { + condition: WhereCondition, +} + +impl GetBufferedSingleSignatureQuery { + #[cfg(test)] + pub(crate) fn all() -> Self { + Self { + condition: WhereCondition::default(), + } + } + + pub fn by_discriminant(signed_entity_type_discriminant: SignedEntityTypeDiscriminants) -> Self { + Self { + condition: WhereCondition::new( + "signed_entity_type_id = ?*", + vec![Value::Integer( + signed_entity_type_discriminant.index() as i64 + )], + ), + } + } +} + +impl Query for GetBufferedSingleSignatureQuery { + type Entity = BufferedSingleSignatureRecord; + + fn filters(&self) -> WhereCondition { + self.condition.clone() + } + + fn get_definition(&self, condition: &str) -> String { + let aliases = SourceAlias::new(&[("{:buffered_single_signature:}", "b")]); + let projection = Self::Entity::get_projection().expand(aliases); + format!("select {projection} from buffered_single_signature as b where {condition} order by ROWID desc") + } +} + +#[cfg(test)] +mod tests { + use mithril_common::entities::SignedEntityTypeDiscriminants::{ + CardanoImmutableFilesFull, CardanoTransactions, MithrilStakeDistribution, + }; + use mithril_persistence::sqlite::ConnectionExtensions; + + use crate::database::test_helper::{insert_buffered_single_signatures, main_db_connection}; + + use super::*; + + #[test] + fn test_get_all() { + let connection = main_db_connection().unwrap(); + let records = BufferedSingleSignatureRecord::fakes(&[ + ("party1", MithrilStakeDistribution), + ("party2", CardanoTransactions), + ("party3", MithrilStakeDistribution), + ]); + insert_buffered_single_signatures(&connection, records.clone()).unwrap(); + + let stored_records: Vec = connection + .fetch_collect(GetBufferedSingleSignatureQuery::all()) + .unwrap(); + + assert_eq!( + records.into_iter().rev().collect::>(), + stored_records + ); + } + + #[test] + fn test_get_buffered_single_signature_records_by_discriminant() { + let connection = main_db_connection().unwrap(); + let msd_records = BufferedSingleSignatureRecord::fakes(&[ + ("party1", MithrilStakeDistribution), + ("party2", MithrilStakeDistribution), + ]); + let ctx_records = BufferedSingleSignatureRecord::fakes(&[("party3", CardanoTransactions)]); + insert_buffered_single_signatures( + &connection, + [msd_records.clone(), ctx_records.clone()].concat(), + ) + .unwrap(); + + let stored_msd_records: Vec = connection + .fetch_collect(GetBufferedSingleSignatureQuery::by_discriminant( + MithrilStakeDistribution, + )) + .unwrap(); + assert_eq!( + msd_records.into_iter().rev().collect::>(), + stored_msd_records + ); + + let stored_ctx_records: Vec = connection + .fetch_collect(GetBufferedSingleSignatureQuery::by_discriminant( + CardanoTransactions, + )) + .unwrap(); + assert_eq!( + ctx_records.into_iter().rev().collect::>(), + stored_ctx_records + ); + + let cursor = connection + .fetch(GetBufferedSingleSignatureQuery::by_discriminant( + CardanoImmutableFilesFull, + )) + .unwrap(); + assert_eq!(0, cursor.count()); + } +} diff --git a/mithril-aggregator/src/database/query/buffered_single_signature/insert_or_replace_buffered_single_signature.rs b/mithril-aggregator/src/database/query/buffered_single_signature/insert_or_replace_buffered_single_signature.rs new file mode 100644 index 00000000000..abcc04a082f --- /dev/null +++ b/mithril-aggregator/src/database/query/buffered_single_signature/insert_or_replace_buffered_single_signature.rs @@ -0,0 +1,170 @@ +use sqlite::Value; + +use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition}; + +use crate::database::record::BufferedSingleSignatureRecord; + +/// Query to insert or replace [BufferedSingleSignatureRecord] in the sqlite database +pub struct InsertOrReplaceBufferedSingleSignatureRecordQuery { + condition: WhereCondition, +} + +impl InsertOrReplaceBufferedSingleSignatureRecordQuery { + pub fn one(record: BufferedSingleSignatureRecord) -> Self { + let condition = + WhereCondition::new( + "(signed_entity_type_id, party_id, lottery_indexes, signature, created_at) values (?*, ?*, ?*, ?*, ?*)", + vec![ + Value::Integer(record.signed_entity_type_id.index() as i64), + Value::String(record.party_id), + Value::String(serde_json::to_string(&record.lottery_indexes).unwrap()), + Value::String(record.signature), + Value::String(record.created_at.to_rfc3339()), + ], + ); + + Self { condition } + } +} + +impl Query for InsertOrReplaceBufferedSingleSignatureRecordQuery { + type Entity = BufferedSingleSignatureRecord; + + fn filters(&self) -> WhereCondition { + self.condition.clone() + } + + fn get_definition(&self, condition: &str) -> String { + // it is important to alias the fields with the same name as the table + // since the table cannot be aliased in a RETURNING statement in SQLite. + let projection = Self::Entity::get_projection().expand(SourceAlias::new(&[( + "{:buffered_single_signature:}", + "buffered_single_signature", + )])); + + format!( + "insert or replace into buffered_single_signature {condition} returning {projection}" + ) + } +} + +#[cfg(test)] +mod tests { + use chrono::{Duration, Utc}; + use mithril_common::entities::SignedEntityTypeDiscriminants::{ + CardanoImmutableFilesFull, CardanoStakeDistribution, + }; + use mithril_persistence::sqlite::ConnectionExtensions; + + use crate::database::query::GetBufferedSingleSignatureQuery; + use crate::database::test_helper::main_db_connection; + + use super::*; + + #[test] + fn insert_records_in_empty_db() { + let connection = main_db_connection().unwrap(); + + let record = BufferedSingleSignatureRecord::fake("party_8", CardanoImmutableFilesFull); + let inserted_record = connection + .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one( + record.clone(), + )) + .unwrap(); + + assert_eq!(Some(record), inserted_record); + } + + #[test] + fn allow_to_insert_record_with_different_party_id_and_discriminant_but_different_signature() { + let connection = main_db_connection().unwrap(); + + let record = BufferedSingleSignatureRecord::fake("party_8", CardanoImmutableFilesFull); + let other_record = BufferedSingleSignatureRecord { + party_id: "party_10".to_string(), + signed_entity_type_id: CardanoStakeDistribution, + ..record.clone() + }; + + connection + .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one( + record, + )) + .unwrap(); + connection + .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one( + other_record, + )) + .unwrap(); + + let count = connection + .fetch(GetBufferedSingleSignatureQuery::all()) + .unwrap() + .count(); + assert_eq!(2, count); + } + + #[test] + fn inserting_same_record_twice_should_replace_first_insert() { + let connection = main_db_connection().unwrap(); + + let record = BufferedSingleSignatureRecord::fake("party_8", CardanoImmutableFilesFull); + + connection + .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one( + record.clone(), + )) + .unwrap(); + let inserted_record = connection + .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one( + record.clone(), + )) + .unwrap(); + + assert_eq!(Some(record), inserted_record); + } + + #[test] + fn inserting_record_with_same_party_id_and_discriminant_replace_previous_record() { + let connection = main_db_connection().unwrap(); + + let record = BufferedSingleSignatureRecord { + party_id: "party_15".to_string(), + signed_entity_type_id: CardanoStakeDistribution, + lottery_indexes: vec![1, 2, 3], + signature: "a signature".to_string(), + created_at: Utc::now(), + }; + connection + .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one( + record.clone(), + )) + .unwrap(); + let count = connection + .fetch(GetBufferedSingleSignatureQuery::all()) + .unwrap() + .count(); + + assert_eq!(1, count); + + let updated_record = BufferedSingleSignatureRecord { + party_id: record.party_id.clone(), + signed_entity_type_id: record.signed_entity_type_id, + lottery_indexes: vec![7, 8, 9], + signature: "another signature".to_string(), + created_at: Utc::now() + Duration::minutes(18), + }; + let replaced_record = connection + .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one( + updated_record.clone(), + )) + .unwrap(); + let count = connection + .fetch(GetBufferedSingleSignatureQuery::all()) + .unwrap() + .count(); + + assert_eq!(Some(updated_record), replaced_record); + assert_eq!(1, count); + } +} diff --git a/mithril-aggregator/src/database/query/buffered_single_signature/mod.rs b/mithril-aggregator/src/database/query/buffered_single_signature/mod.rs new file mode 100644 index 00000000000..6a63cb371e3 --- /dev/null +++ b/mithril-aggregator/src/database/query/buffered_single_signature/mod.rs @@ -0,0 +1,7 @@ +mod delete_buffered_single_signature; +mod get_buffered_single_signature; +mod insert_or_replace_buffered_single_signature; + +pub use delete_buffered_single_signature::*; +pub use get_buffered_single_signature::*; +pub use insert_or_replace_buffered_single_signature::*; diff --git a/mithril-aggregator/src/database/query/mod.rs b/mithril-aggregator/src/database/query/mod.rs index 9eab281ce42..c5010c32ac1 100644 --- a/mithril-aggregator/src/database/query/mod.rs +++ b/mithril-aggregator/src/database/query/mod.rs @@ -1,4 +1,5 @@ //! Aggregator related database queries +mod buffered_single_signature; mod certificate; mod epoch_setting; mod open_message; @@ -8,6 +9,7 @@ mod signer_registration; mod single_signature; mod stake_pool; +pub use buffered_single_signature::*; pub use certificate::*; pub use epoch_setting::*; pub use open_message::*; diff --git a/mithril-aggregator/src/database/record/buffered_single_signature_record.rs b/mithril-aggregator/src/database/record/buffered_single_signature_record.rs new file mode 100644 index 00000000000..8207a682ebd --- /dev/null +++ b/mithril-aggregator/src/database/record/buffered_single_signature_record.rs @@ -0,0 +1,212 @@ +use chrono::{DateTime, Utc}; + +use mithril_common::entities::{ + HexEncodedSingleSignature, LotteryIndex, SignedEntityTypeDiscriminants, SingleSignatures, +}; +use mithril_common::{StdError, StdResult}; +use mithril_persistence::sqlite::{HydrationError, Projection, SqLiteEntity}; + +/// `BufferedSingleSignatureRecord` record is the representation of a buffered single_signature +/// that may be used for upcoming open messages. +#[derive(Debug, PartialEq, Clone)] +pub struct BufferedSingleSignatureRecord { + /// Party id. + pub party_id: String, + + /// Signed entity type discriminant. + pub signed_entity_type_id: SignedEntityTypeDiscriminants, + + /// Lottery indexes + pub lottery_indexes: Vec, + + /// The STM single signature of the message + pub signature: HexEncodedSingleSignature, + + /// Date and time when the buffered single signature was created + pub created_at: DateTime, +} + +impl BufferedSingleSignatureRecord { + pub(crate) fn try_from_single_signatures( + other: &SingleSignatures, + signed_entity_id: SignedEntityTypeDiscriminants, + ) -> StdResult { + let record = BufferedSingleSignatureRecord { + signed_entity_type_id: signed_entity_id, + party_id: other.party_id.to_owned(), + lottery_indexes: other.won_indexes.to_owned(), + signature: other.signature.to_json_hex()?, + created_at: Utc::now(), + }; + + Ok(record) + } +} + +#[cfg(test)] +impl BufferedSingleSignatureRecord { + pub(crate) fn fake>( + party_id: T, + discriminant: SignedEntityTypeDiscriminants, + ) -> Self { + // Note: due to the unique constraint on the signature column, we want to make sure that + // the signatures are different for party_id/discriminant pairs. + // We can't just reuse fake_data::single_signatures as they are static. + Self::try_from_single_signatures( + &SingleSignatures::fake(party_id.into(), discriminant.to_string()), + discriminant, + ) + .unwrap() + } + + pub(crate) fn fakes + Clone>( + input: &[(T, SignedEntityTypeDiscriminants)], + ) -> Vec { + input + .iter() + .map(|(party_id, discriminants)| Self::fake(party_id.clone(), *discriminants)) + .collect() + } + + /// Returns a copy of the record with the date replaced by "1st of January 1970". + pub(crate) fn with_stripped_date(&self) -> Self { + Self { + created_at: DateTime::::default(), + ..self.clone() + } + } +} + +impl TryFrom for SingleSignatures { + type Error = StdError; + + fn try_from(value: BufferedSingleSignatureRecord) -> Result { + let signatures = SingleSignatures { + party_id: value.party_id, + won_indexes: value.lottery_indexes, + signature: value.signature.try_into()?, + authentication_status: Default::default(), + }; + + Ok(signatures) + } +} + +impl SqLiteEntity for BufferedSingleSignatureRecord { + fn hydrate(row: sqlite::Row) -> Result + where + Self: Sized, + { + let signed_entity_type_id = usize::try_from(row.read::(0)).map_err(|e| { + panic!("Integer field signed_entity_type_id cannot be turned into usize: {e}") + })?; + let party_id = row.read::<&str, _>(1).to_string(); + let lottery_indexes_str = row.read::<&str, _>(2); + let signature = row.read::<&str, _>(3).to_string(); + let created_at = row.read::<&str, _>(4); + + let record = Self { + party_id, + signed_entity_type_id: SignedEntityTypeDiscriminants::from_id(signed_entity_type_id).map_err( + |e| { + HydrationError::InvalidData(format!( + "Could not turn i64 ({signed_entity_type_id}) to SignedEntityTypeDiscriminants. Error: '{e}'" + )) + }, + )?, + lottery_indexes: serde_json::from_str(lottery_indexes_str).map_err(|e| { + HydrationError::InvalidData(format!( + "Could not turn string '{lottery_indexes_str}' to Vec. Error: {e}" + )) + })?, + signature, + created_at: DateTime::parse_from_rfc3339(created_at) + .map_err(|e| { + HydrationError::InvalidData(format!( + "Could not turn string '{created_at}' to rfc3339 Datetime. Error: {e}" + )) + })? + .with_timezone(&Utc), + }; + + Ok(record) + } + + fn get_projection() -> Projection { + let mut projection = Projection::default(); + projection.add_field( + "signed_entity_type_id", + "{:buffered_single_signature:}.signed_entity_type_id", + "integer", + ); + projection.add_field("party_id", "{:buffered_single_signature:}.party_id", "text"); + projection.add_field( + "lottery_indexes", + "{:buffered_single_signature:}.lottery_indexes", + "text", + ); + projection.add_field( + "signature", + "{:buffered_single_signature:}.signature", + "text", + ); + projection.add_field( + "created_at", + "{:buffered_single_signature:}.created_at", + "text", + ); + + projection + } +} + +/// Test only - strip the date from the given records to make them comparable. +#[cfg(test)] +pub(crate) fn strip_buffered_sigs_date( + records: &[BufferedSingleSignatureRecord], +) -> Vec { + records + .iter() + .map(BufferedSingleSignatureRecord::with_stripped_date) + .collect::>() +} + +#[cfg(test)] +mod tests { + use mithril_common::entities::SignedEntityTypeDiscriminants::{ + CardanoTransactions, MithrilStakeDistribution, + }; + use mithril_common::test_utils::fake_data; + + use super::*; + + #[test] + fn test_convert_single_signatures() { + let single_signature = fake_data::single_signatures(vec![1, 3, 4, 6, 7, 9]); + let single_signature_record = BufferedSingleSignatureRecord::try_from_single_signatures( + &single_signature, + CardanoTransactions, + ) + .unwrap(); + let single_signature_returned = single_signature_record.try_into().unwrap(); + + assert_eq!(single_signature, single_signature_returned); + } + + #[test] + fn building_fake_generate_different_protocol_single_signature() { + assert_eq!( + BufferedSingleSignatureRecord::fake("party_1", CardanoTransactions).signature, + BufferedSingleSignatureRecord::fake("party_1", CardanoTransactions).signature + ); + + assert_ne!( + BufferedSingleSignatureRecord::fake("party_1", CardanoTransactions).signature, + BufferedSingleSignatureRecord::fake("party_2", CardanoTransactions).signature + ); + assert_ne!( + BufferedSingleSignatureRecord::fake("party_1", CardanoTransactions).signature, + BufferedSingleSignatureRecord::fake("party_1", MithrilStakeDistribution).signature + ); + } +} diff --git a/mithril-aggregator/src/database/record/mod.rs b/mithril-aggregator/src/database/record/mod.rs index 3d3b78d8a97..7f65aedfcad 100644 --- a/mithril-aggregator/src/database/record/mod.rs +++ b/mithril-aggregator/src/database/record/mod.rs @@ -1,5 +1,6 @@ //! Aggregator related database records +mod buffered_single_signature_record; mod certificate; mod epoch_setting; mod open_message; @@ -10,6 +11,7 @@ mod signer_registration; mod single_signature; mod stake_pool; +pub use buffered_single_signature_record::*; pub use certificate::*; pub use epoch_setting::*; pub use open_message::*; diff --git a/mithril-aggregator/src/database/record/single_signature.rs b/mithril-aggregator/src/database/record/single_signature.rs index ae35a229336..c273e9dcd91 100644 --- a/mithril-aggregator/src/database/record/single_signature.rs +++ b/mithril-aggregator/src/database/record/single_signature.rs @@ -54,7 +54,7 @@ impl TryFrom for SingleSignatures { party_id: value.signer_id, won_indexes: value.lottery_indexes, signature: value.signature.try_into()?, - signed_message: None, + authentication_status: Default::default(), }; Ok(signatures) diff --git a/mithril-aggregator/src/database/repository/buffered_single_signature_repository.rs b/mithril-aggregator/src/database/repository/buffered_single_signature_repository.rs new file mode 100644 index 00000000000..e4265f1ab41 --- /dev/null +++ b/mithril-aggregator/src/database/repository/buffered_single_signature_repository.rs @@ -0,0 +1,298 @@ +use anyhow::Context; +use async_trait::async_trait; +use std::sync::Arc; + +use mithril_common::entities::{SignedEntityTypeDiscriminants, SingleSignatures}; +use mithril_common::{StdError, StdResult}; +use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection}; + +use crate::database::query::{ + DeleteBufferedSingleSignatureQuery, GetBufferedSingleSignatureQuery, + InsertOrReplaceBufferedSingleSignatureRecordQuery, +}; +use crate::database::record::BufferedSingleSignatureRecord; +use crate::services::BufferedSingleSignatureStore; + +/// An implementation of [BufferedSingleSignatureStore] that uses a SQLite database. +pub struct BufferedSingleSignatureRepository { + connection: Arc, +} + +impl BufferedSingleSignatureRepository { + /// Creates a new [BufferedSingleSignatureRepository] instance. + pub fn new(connection_pool: Arc) -> Self { + Self { + connection: connection_pool, + } + } + + #[cfg(test)] + fn get_all(&self) -> StdResult> { + self.connection + .fetch_collect(GetBufferedSingleSignatureQuery::all()) + } + + fn get_by_discriminant( + &self, + signed_entity_type_discriminant: SignedEntityTypeDiscriminants, + ) -> StdResult> + where + T: TryFrom, + StdError: From, + { + let records: Vec = + self.connection + .fetch_collect(GetBufferedSingleSignatureQuery::by_discriminant( + signed_entity_type_discriminant, + ))?; + + let mut entities: Vec = Vec::with_capacity(records.len()); + for record in records { + entities.push(record.try_into()?); + } + + Ok(entities) + } +} + +#[async_trait] +impl BufferedSingleSignatureStore for BufferedSingleSignatureRepository { + async fn buffer_signature( + &self, + signed_entity_type_discriminant: SignedEntityTypeDiscriminants, + signature: &SingleSignatures, + ) -> StdResult<()> { + let record = BufferedSingleSignatureRecord::try_from_single_signatures( + signature, + signed_entity_type_discriminant, + ) + .with_context(|| "Failed to convert SingleSignatures to BufferedSingleSignatureRecord")?; + + self.connection + .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one( + record, + ))?; + + Ok(()) + } + + async fn get_buffered_signatures( + &self, + signed_entity_type_discriminant: SignedEntityTypeDiscriminants, + ) -> StdResult> { + self.get_by_discriminant(signed_entity_type_discriminant) + } + + async fn remove_buffered_signatures( + &self, + signed_entity_type_discriminant: SignedEntityTypeDiscriminants, + single_signatures: Vec, + ) -> StdResult<()> { + let signatures_party_ids = single_signatures.into_iter().map(|s| s.party_id).collect(); + self.connection.fetch_first( + DeleteBufferedSingleSignatureQuery::by_discriminant_and_party_ids( + signed_entity_type_discriminant, + signatures_party_ids, + ), + )?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use mithril_common::entities::SignedEntityTypeDiscriminants::{ + CardanoTransactions, MithrilStakeDistribution, + }; + use mithril_common::test_utils::fake_keys; + + use crate::database::record::{strip_buffered_sigs_date, BufferedSingleSignatureRecord}; + use crate::database::test_helper::{insert_buffered_single_signatures, main_db_connection}; + + use super::*; + + #[test] + fn retrieve_all() { + let connection = main_db_connection().unwrap(); + insert_buffered_single_signatures( + &connection, + BufferedSingleSignatureRecord::fakes(&[ + ("party1", CardanoTransactions), + ("party2", CardanoTransactions), + ("party3", MithrilStakeDistribution), + ]), + ) + .unwrap(); + + let store = BufferedSingleSignatureRepository::new(Arc::new(connection)); + + let buffered_signatures_ctx = store.get_all().unwrap(); + assert_eq!( + strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[ + ("party3", MithrilStakeDistribution), + ("party2", CardanoTransactions), + ("party1", CardanoTransactions), + ])), + strip_buffered_sigs_date(&buffered_signatures_ctx) + ); + } + + #[tokio::test] + async fn retrieve_signatures_by_discriminant() { + let connection = main_db_connection().unwrap(); + insert_buffered_single_signatures( + &connection, + BufferedSingleSignatureRecord::fakes(&[ + ("party1", CardanoTransactions), + ("party2", CardanoTransactions), + ("party3", MithrilStakeDistribution), + ]), + ) + .unwrap(); + + let store = BufferedSingleSignatureRepository::new(Arc::new(connection)); + + let buffered_signatures_ctx = store + .get_by_discriminant::(CardanoTransactions) + .unwrap(); + assert_eq!( + strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[ + ("party2", CardanoTransactions), + ("party1", CardanoTransactions), + ])), + strip_buffered_sigs_date(&buffered_signatures_ctx) + ); + + let buffered_signatures_msd = store + .get_by_discriminant::(MithrilStakeDistribution) + .unwrap(); + assert_eq!( + strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[( + "party3", + MithrilStakeDistribution + ),])), + strip_buffered_sigs_date(&buffered_signatures_msd) + ); + } + + #[tokio::test] + async fn store_signatures() { + let connection = main_db_connection().unwrap(); + let store = BufferedSingleSignatureRepository::new(Arc::new(connection)); + + // Multiple signatures of the same signed entity type + { + store + .buffer_signature( + CardanoTransactions, + &SingleSignatures::new( + "party1", + fake_keys::single_signature()[0].try_into().unwrap(), + vec![1], + ), + ) + .await + .unwrap(); + store + .buffer_signature( + CardanoTransactions, + &SingleSignatures::new( + "party2", + fake_keys::single_signature()[1].try_into().unwrap(), + vec![2], + ), + ) + .await + .unwrap(); + + let buffered_signatures = store + .get_buffered_signatures(CardanoTransactions) + .await + .unwrap(); + assert_eq!( + vec![ + SingleSignatures::new( + "party2", + fake_keys::single_signature()[1].try_into().unwrap(), + vec![2], + ), + SingleSignatures::new( + "party1", + fake_keys::single_signature()[0].try_into().unwrap(), + vec![1], + ), + ], + buffered_signatures + ); + } + // Another signed entity type to test that the store is able to differentiate between them + { + store + .buffer_signature( + MithrilStakeDistribution, + &SingleSignatures::new( + "party3", + fake_keys::single_signature()[2].try_into().unwrap(), + vec![3], + ), + ) + .await + .unwrap(); + + let buffered_signatures = store + .get_buffered_signatures(MithrilStakeDistribution) + .await + .unwrap(); + assert_eq!( + vec![SingleSignatures::new( + "party3", + fake_keys::single_signature()[2].try_into().unwrap(), + vec![3], + )], + buffered_signatures + ); + } + } + + #[tokio::test] + async fn remove_buffered_signatures() { + let connection = main_db_connection().unwrap(); + insert_buffered_single_signatures( + &connection, + BufferedSingleSignatureRecord::fakes(&[ + ("party1", MithrilStakeDistribution), + ("party2", MithrilStakeDistribution), + ("party3", MithrilStakeDistribution), + ("party4", CardanoTransactions), + ]), + ) + .unwrap(); + + let store = BufferedSingleSignatureRepository::new(Arc::new(connection)); + + store + .remove_buffered_signatures( + MithrilStakeDistribution, + vec![ + BufferedSingleSignatureRecord::fake("party1", MithrilStakeDistribution) + .try_into() + .unwrap(), + BufferedSingleSignatureRecord::fake("party3", MithrilStakeDistribution) + .try_into() + .unwrap(), + ], + ) + .await + .unwrap(); + + let remaining_msd_sigs = store.get_all().unwrap(); + assert_eq!( + strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[ + ("party4", CardanoTransactions), + ("party2", MithrilStakeDistribution), + ])), + strip_buffered_sigs_date(&remaining_msd_sigs) + ); + } +} diff --git a/mithril-aggregator/src/database/repository/mod.rs b/mithril-aggregator/src/database/repository/mod.rs index ad443a26d3b..6ba624a92ac 100644 --- a/mithril-aggregator/src/database/repository/mod.rs +++ b/mithril-aggregator/src/database/repository/mod.rs @@ -1,4 +1,5 @@ //! Aggregator related database repositories +mod buffered_single_signature_repository; mod cardano_transaction_repository; mod certificate_repository; mod epoch_setting_store; @@ -9,6 +10,7 @@ mod signer_store; mod single_signature_repository; mod stake_pool_store; +pub use buffered_single_signature_repository::*; pub use certificate_repository::*; pub use epoch_setting_store::*; pub use open_message_repository::*; diff --git a/mithril-aggregator/src/database/test_helper.rs b/mithril-aggregator/src/database/test_helper.rs index ba23fb96633..13d698edaff 100644 --- a/mithril-aggregator/src/database/test_helper.rs +++ b/mithril-aggregator/src/database/test_helper.rs @@ -11,12 +11,13 @@ use mithril_persistence::sqlite::{ use crate::database::query::{ ImportSignerRecordQuery, InsertCertificateRecordQuery, + InsertOrReplaceBufferedSingleSignatureRecordQuery, InsertOrReplaceSignerRegistrationRecordQuery, InsertOrReplaceStakePoolQuery, InsertSignedEntityRecordQuery, UpdateEpochSettingQuery, UpdateSingleSignatureRecordQuery, }; use crate::database::record::{ - CertificateRecord, SignedEntityRecord, SignerRecord, SignerRegistrationRecord, - SingleSignatureRecord, + BufferedSingleSignatureRecord, CertificateRecord, SignedEntityRecord, SignerRecord, + SignerRegistrationRecord, SingleSignatureRecord, }; /// In-memory sqlite database without foreign key support with migrations applied @@ -141,6 +142,44 @@ pub fn insert_single_signatures_in_db( Ok(()) } +pub fn insert_buffered_single_signatures( + connection: &SqliteConnection, + buffered_signature_records: Vec, +) -> StdResult<()> { + if buffered_signature_records.is_empty() { + return Ok(()); + } + + let query = { + // leverage the expanded parameter from this query which is unit + // tested on its own above. + let (sql_values, _) = InsertOrReplaceBufferedSingleSignatureRecordQuery::one( + buffered_signature_records.first().unwrap().clone(), + ) + .filters() + .expand(); + format!("insert into buffered_single_signature {sql_values}") + }; + + for record in buffered_signature_records { + let mut statement = connection.prepare(&query)?; + + statement.bind::<&[(_, Value)]>(&[ + ( + 1, + Value::Integer(record.signed_entity_type_id.index() as i64), + ), + (2, record.party_id.into()), + (3, serde_json::to_string(&record.lottery_indexes)?.into()), + (4, record.signature.into()), + (5, record.created_at.to_rfc3339().into()), + ])?; + statement.next()?; + } + + Ok(()) +} + pub fn insert_certificate_records>( connection: &ConnectionThreadSafe, records: Vec, diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 2e9750a83b4..a5afeae4ef1 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -51,6 +51,7 @@ use mithril_persistence::{ store::adapter::{MemoryAdapter, SQLiteAdapter, StoreAdapter}, }; +use super::{DependenciesBuilderError, EpochServiceWrapper, Result}; use crate::{ artifact_builder::{ CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder, @@ -58,28 +59,28 @@ use crate::{ }, configuration::ExecutionEnvironment, database::repository::{ - CertificateRepository, EpochSettingStore, OpenMessageRepository, SignedEntityStore, - SignedEntityStorer, SignerRegistrationStore, SignerStore, SingleSignatureRepository, - StakePoolStore, + BufferedSingleSignatureRepository, CertificateRepository, EpochSettingStore, + OpenMessageRepository, SignedEntityStore, SignedEntityStorer, SignerRegistrationStore, + SignerStore, SingleSignatureRepository, StakePoolStore, }, event_store::{EventMessage, EventStore, TransmitterService}, http_server::routes::router, services::{ - AggregatorUpkeepService, CardanoTransactionsImporter, CertifierService, MessageService, - MithrilCertifierService, MithrilEpochService, MithrilMessageService, MithrilProverService, - MithrilSignedEntityService, MithrilStakeDistributionService, ProverService, - SignedEntityService, StakeDistributionService, UpkeepService, + AggregatorUpkeepService, BufferedCertifierService, CardanoTransactionsImporter, + CertifierService, MessageService, MithrilCertifierService, MithrilEpochService, + MithrilMessageService, MithrilProverService, MithrilSignedEntityService, + MithrilStakeDistributionService, ProverService, SignedEntityService, + StakeDistributionService, UpkeepService, }, tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter}, AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore, CompressedArchiveSnapshotter, Configuration, DependencyContainer, DumbSnapshotUploader, DumbSnapshotter, LocalSnapshotUploader, MithrilSignerRegisterer, MultiSigner, MultiSignerImpl, - ProtocolParametersStorer, RemoteSnapshotUploader, SnapshotUploader, SnapshotUploaderType, - Snapshotter, SnapshotterCompressionAlgorithm, VerificationKeyStorer, + ProtocolParametersStorer, RemoteSnapshotUploader, SingleSignatureAuthenticator, + SnapshotUploader, SnapshotUploaderType, Snapshotter, SnapshotterCompressionAlgorithm, + VerificationKeyStorer, }; -use super::{DependenciesBuilderError, EpochServiceWrapper, Result}; - const SQLITE_FILE: &str = "aggregator.sqlite3"; const SQLITE_FILE_CARDANO_TRANSACTION: &str = "cardano-transaction.sqlite3"; @@ -115,7 +116,7 @@ pub struct DependenciesBuilder { pub snapshot_uploader: Option>, /// Multisigner service. - pub multi_signer: Option>>, + pub multi_signer: Option>, /// Certificate pending store. pub certificate_pending_store: Option>, @@ -230,6 +231,9 @@ pub struct DependenciesBuilder { /// Upkeep service pub upkeep_service: Option>, + + /// Single signer authenticator + pub single_signer_authenticator: Option>, } impl DependenciesBuilder { @@ -280,6 +284,7 @@ impl DependenciesBuilder { signed_entity_type_lock: None, transactions_importer: None, upkeep_service: None, + single_signer_authenticator: None, } } @@ -449,14 +454,14 @@ impl DependenciesBuilder { Ok(self.snapshot_uploader.as_ref().cloned().unwrap()) } - async fn build_multi_signer(&mut self) -> Result>> { + async fn build_multi_signer(&mut self) -> Result> { let multi_signer = MultiSignerImpl::new(self.get_epoch_service().await?); - Ok(Arc::new(RwLock::new(multi_signer))) + Ok(Arc::new(multi_signer)) } /// Get a configured multi signer - pub async fn get_multi_signer(&mut self) -> Result>> { + pub async fn get_multi_signer(&mut self) -> Result> { if self.multi_signer.is_none() { self.multi_signer = Some(self.build_multi_signer().await?); } @@ -1263,6 +1268,27 @@ impl DependenciesBuilder { Ok(self.upkeep_service.as_ref().cloned().unwrap()) } + async fn build_single_signature_authenticator( + &mut self, + ) -> Result> { + let authenticator = + SingleSignatureAuthenticator::new(self.get_multi_signer().await?, self.get_logger()?); + + Ok(Arc::new(authenticator)) + } + + /// [SingleSignatureAuthenticator] service + pub async fn get_single_signature_authenticator( + &mut self, + ) -> Result> { + if self.single_signer_authenticator.is_none() { + self.single_signer_authenticator = + Some(self.build_single_signature_authenticator().await?); + } + + Ok(self.single_signer_authenticator.as_ref().cloned().unwrap()) + } + /// Return an unconfigured [DependencyContainer] pub async fn build_dependency_container(&mut self) -> Result { let dependency_manager = DependencyContainer { @@ -1307,6 +1333,7 @@ impl DependenciesBuilder { prover_service: self.get_prover_service().await?, signed_entity_type_lock: self.get_signed_entity_lock().await?, upkeep_service: self.get_upkeep_service().await?, + single_signer_authenticator: self.get_single_signature_authenticator().await?, }; Ok(dependency_manager) @@ -1428,10 +1455,10 @@ impl DependenciesBuilder { let cardano_network = self.configuration.get_network().with_context(|| { "Dependencies Builder can not get Cardano network while building the chain observer" })?; + let sqlite_connection = self.get_sqlite_connection().await?; let open_message_repository = self.get_open_message_repository().await?; - let single_signature_repository = Arc::new(SingleSignatureRepository::new( - self.get_sqlite_connection().await?, - )); + let single_signature_repository = + Arc::new(SingleSignatureRepository::new(sqlite_connection.clone())); let certificate_repository = self.get_certificate_repository().await?; let certificate_verifier = self.get_certificate_verifier().await?; let genesis_verifier = self.get_genesis_verifier().await?; @@ -1440,7 +1467,7 @@ impl DependenciesBuilder { let epoch_service = self.get_epoch_service().await?; let logger = self.get_logger()?; - Ok(Arc::new(MithrilCertifierService::new( + let certifier = Arc::new(MithrilCertifierService::new( cardano_network, open_message_repository, single_signature_repository, @@ -1451,6 +1478,12 @@ impl DependenciesBuilder { ticker_service, epoch_service, logger, + )); + + Ok(Arc::new(BufferedCertifierService::new( + certifier, + Arc::new(BufferedSingleSignatureRepository::new(sqlite_connection)), + self.get_logger()?, ))) } diff --git a/mithril-aggregator/src/dependency_injection/containers.rs b/mithril-aggregator/src/dependency_injection/containers.rs index e89798084c6..4be9b1766cc 100644 --- a/mithril-aggregator/src/dependency_injection/containers.rs +++ b/mithril-aggregator/src/dependency_injection/containers.rs @@ -33,12 +33,10 @@ use crate::{ signer_registerer::SignerRecorder, snapshot_uploaders::SnapshotUploader, CertificatePendingStore, ProtocolParametersStorer, SignerRegisterer, - SignerRegistrationRoundOpener, Snapshotter, VerificationKeyStorer, + SignerRegistrationRoundOpener, SingleSignatureAuthenticator, Snapshotter, + VerificationKeyStorer, }; -/// MultiSignerWrapper wraps a [MultiSigner] -pub type MultiSignerWrapper = Arc>; - /// EpochServiceWrapper wraps a [EpochService] pub type EpochServiceWrapper = Arc>; @@ -67,7 +65,7 @@ pub struct DependencyContainer { pub snapshot_uploader: Arc, /// Multisigner service. - pub multi_signer: MultiSignerWrapper, + pub multi_signer: Arc, /// Certificate pending store. pub certificate_pending_store: Arc, @@ -164,6 +162,9 @@ pub struct DependencyContainer { /// Upkeep service pub upkeep_service: Arc, + + /// Single signer authenticator + pub single_signer_authenticator: Arc, } #[doc(hidden)] diff --git a/mithril-aggregator/src/http_server/routes/middlewares.rs b/mithril-aggregator/src/http_server/routes/middlewares.rs index 65a1f1d1942..49c3c350ce3 100644 --- a/mithril-aggregator/src/http_server/routes/middlewares.rs +++ b/mithril-aggregator/src/http_server/routes/middlewares.rs @@ -12,7 +12,7 @@ use crate::event_store::{EventMessage, TransmitterService}; use crate::services::{CertifierService, MessageService, ProverService, SignedEntityService}; use crate::{ CertificatePendingStore, Configuration, DependencyContainer, SignerRegisterer, - VerificationKeyStorer, + SingleSignatureAuthenticator, VerificationKeyStorer, }; /// With certificate pending store @@ -113,6 +113,13 @@ pub fn with_prover_service( warp::any().map(move || dependency_manager.prover_service.clone()) } +/// With Single Signature Authenticator +pub fn with_single_signature_authenticator( + dependency_manager: Arc, +) -> impl Filter,), Error = Infallible> + Clone { + warp::any().map(move || dependency_manager.single_signer_authenticator.clone()) +} + pub mod validators { use crate::http_server::validators::ProverTransactionsHashValidator; diff --git a/mithril-aggregator/src/http_server/routes/signatures_routes.rs b/mithril-aggregator/src/http_server/routes/signatures_routes.rs index 8e74c54710a..029f2f758af 100644 --- a/mithril-aggregator/src/http_server/routes/signatures_routes.rs +++ b/mithril-aggregator/src/http_server/routes/signatures_routes.rs @@ -19,6 +19,9 @@ fn register_signatures( .and(middlewares::with_certifier_service( dependency_manager.clone(), )) + .and(middlewares::with_single_signature_authenticator( + dependency_manager, + )) .and_then(handlers::register_signatures) } @@ -33,20 +36,23 @@ mod handlers { use crate::{ http_server::routes::reply, message_adapters::FromRegisterSingleSignatureAdapter, - services::{CertifierService, CertifierServiceError}, + services::{CertifierService, CertifierServiceError, SignatureRegistrationStatus}, + unwrap_to_internal_server_error, SingleSignatureAuthenticator, }; /// Register Signatures pub async fn register_signatures( message: RegisterSignatureMessage, certifier_service: Arc, + single_signer_authenticator: Arc, ) -> Result { debug!("⇄ HTTP SERVER: register_signatures/{:?}", message); trace!("⇄ HTTP SERVER: register_signatures"; "complete_message" => #?message ); let signed_entity_type = message.signed_entity_type.clone(); + let signed_message = message.signed_message.clone(); - let signatures = match FromRegisterSingleSignatureAdapter::try_adapt(message) { + let mut signatures = match FromRegisterSingleSignatureAdapter::try_adapt(message) { Ok(signature) => signature, Err(err) => { warn!("register_signatures::payload decoding error"; "error" => ?err); @@ -58,6 +64,23 @@ mod handlers { } }; + if let Some(signed_message) = signed_message { + unwrap_to_internal_server_error!( + single_signer_authenticator + .authenticate(&mut signatures, &signed_message) + .await, + "single_signer_authenticator::error" + ); + + if !signatures.is_authenticated() { + debug!("register_signatures::unauthenticated_signature"); + return Ok(reply::bad_request( + "Could not authenticate signature".to_string(), + "Signature could not be authenticated".to_string(), + )); + } + } + match certifier_service .register_single_signature(&signed_entity_type, &signatures) .await @@ -76,7 +99,8 @@ mod handlers { Ok(reply::server_error(err)) } }, - Ok(()) => Ok(reply::empty(StatusCode::CREATED)), + Ok(SignatureRegistrationStatus::Registered) => Ok(reply::empty(StatusCode::CREATED)), + Ok(SignatureRegistrationStatus::Buffered) => Ok(reply::empty(StatusCode::ACCEPTED)), } } } @@ -95,7 +119,8 @@ mod tests { use crate::{ http_server::SERVER_BASE_PATH, initialize_dependencies, - services::{CertifierServiceError, MockCertifierService}, + services::{CertifierServiceError, MockCertifierService, SignatureRegistrationStatus}, + SingleSignatureAuthenticator, }; use super::*; @@ -114,11 +139,78 @@ mod tests { } #[tokio::test] - async fn test_register_signatures_post_ok() { + async fn test_register_signatures_try_to_authenticate_signature_with_signed_message() { + let mut mock_certifier_service = MockCertifierService::new(); + mock_certifier_service + .expect_register_single_signature() + .withf(|_, signature| signature.is_authenticated()) + .once() + .return_once(move |_, _| Ok(SignatureRegistrationStatus::Registered)); + let mut dependency_manager = initialize_dependencies().await; + dependency_manager.certifier_service = Arc::new(mock_certifier_service); + dependency_manager.single_signer_authenticator = + Arc::new(SingleSignatureAuthenticator::new_that_authenticate_everything()); + + let message = RegisterSignatureMessage { + signed_message: Some("message".to_string()), + ..RegisterSignatureMessage::dummy() + }; + + let method = Method::POST.as_str(); + let path = "/register-signatures"; + + request() + .method(method) + .path(&format!("/{SERVER_BASE_PATH}{path}")) + .json(&message) + .reply(&setup_router(Arc::new(dependency_manager))) + .await; + } + + #[tokio::test] + async fn test_register_signatures_return_400_if_authentication_fail() { let mut mock_certifier_service = MockCertifierService::new(); mock_certifier_service .expect_register_single_signature() - .return_once(move |_, _| Ok(())); + .never(); + let mut dependency_manager = initialize_dependencies().await; + dependency_manager.certifier_service = Arc::new(mock_certifier_service); + dependency_manager.single_signer_authenticator = + Arc::new(SingleSignatureAuthenticator::new_that_reject_everything()); + + let message = RegisterSignatureMessage { + signed_message: Some("message".to_string()), + ..RegisterSignatureMessage::dummy() + }; + + let method = Method::POST.as_str(); + let path = "/register-signatures"; + + let response = request() + .method(method) + .path(&format!("/{SERVER_BASE_PATH}{path}")) + .json(&message) + .reply(&setup_router(Arc::new(dependency_manager))) + .await; + + APISpec::verify_conformity( + APISpec::get_all_spec_files(), + method, + path, + "application/json", + &message, + &response, + &StatusCode::BAD_REQUEST, + ) + .unwrap(); + } + + #[tokio::test] + async fn test_register_signatures_post_ok_201() { + let mut mock_certifier_service = MockCertifierService::new(); + mock_certifier_service + .expect_register_single_signature() + .return_once(move |_, _| Ok(SignatureRegistrationStatus::Registered)); let mut dependency_manager = initialize_dependencies().await; dependency_manager.certifier_service = Arc::new(mock_certifier_service); @@ -146,12 +238,45 @@ mod tests { .unwrap(); } + #[tokio::test] + async fn test_register_signatures_post_ok_202() { + let mut mock_certifier_service = MockCertifierService::new(); + mock_certifier_service + .expect_register_single_signature() + .return_once(move |_, _| Ok(SignatureRegistrationStatus::Buffered)); + let mut dependency_manager = initialize_dependencies().await; + dependency_manager.certifier_service = Arc::new(mock_certifier_service); + + let message = RegisterSignatureMessage::dummy(); + + let method = Method::POST.as_str(); + let path = "/register-signatures"; + + let response = request() + .method(method) + .path(&format!("/{SERVER_BASE_PATH}{path}")) + .json(&message) + .reply(&setup_router(Arc::new(dependency_manager))) + .await; + + APISpec::verify_conformity( + APISpec::get_all_spec_files(), + method, + path, + "application/json", + &message, + &response, + &StatusCode::ACCEPTED, + ) + .unwrap(); + } + #[tokio::test] async fn test_register_signatures_post_ko_400() { let mut mock_certifier_service = MockCertifierService::new(); mock_certifier_service .expect_register_single_signature() - .return_once(move |_, _| Ok(())); + .return_once(move |_, _| Ok(SignatureRegistrationStatus::Registered)); let mut dependency_manager = initialize_dependencies().await; dependency_manager.certifier_service = Arc::new(mock_certifier_service); diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs index baa5318d921..fa955490d73 100644 --- a/mithril-aggregator/src/lib.rs +++ b/mithril-aggregator/src/lib.rs @@ -57,6 +57,7 @@ pub use store::{ }; pub use tools::{ CExplorerSignerRetriever, SignersImporter, SignersImporterPersister, SignersImporterRetriever, + SingleSignatureAuthenticator, }; #[cfg(test)] diff --git a/mithril-aggregator/src/message_adapters/from_register_signature.rs b/mithril-aggregator/src/message_adapters/from_register_signature.rs index ad1dd3a759f..0ef7f029d51 100644 --- a/mithril-aggregator/src/message_adapters/from_register_signature.rs +++ b/mithril-aggregator/src/message_adapters/from_register_signature.rs @@ -1,6 +1,6 @@ use anyhow::Context; use mithril_common::{ - entities::SingleSignatures, + entities::{SingleSignatureAuthenticationStatus, SingleSignatures}, messages::{RegisterSignatureMessage, TryFromMessageAdapter}, StdResult, }; @@ -22,7 +22,7 @@ impl TryFromMessageAdapter "'FromRegisterSingleSignatureAdapter' can not convert the single signature" })?, won_indexes: register_single_signature_message.won_indexes, - signed_message: register_single_signature_message.signed_message, + authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated, }; Ok(signatures) @@ -42,9 +42,5 @@ mod tests { .unwrap(); assert_eq!("party_id".to_string(), signatures.party_id); - assert_eq!( - Some("signed_message".to_string()), - signatures.signed_message - ); } } diff --git a/mithril-aggregator/src/multi_signer.rs b/mithril-aggregator/src/multi_signer.rs index a001b8e3dc7..b4aded9ee96 100644 --- a/mithril-aggregator/src/multi_signer.rs +++ b/mithril-aggregator/src/multi_signer.rs @@ -5,6 +5,7 @@ use slog_scope::{debug, warn}; use mithril_common::{ crypto_helper::{ProtocolAggregationError, ProtocolMultiSignature}, entities::{self}, + protocol::MultiSigner as ProtocolMultiSigner, StdResult, }; @@ -21,7 +22,14 @@ pub trait MultiSigner: Sync + Send { /// Verify a single signature async fn verify_single_signature( &self, - message: &entities::ProtocolMessage, + message: &str, + signatures: &entities::SingleSignatures, + ) -> StdResult<()>; + + /// Verify a single signature using the stake distribution of the next epoch + async fn verify_single_signature_for_next_stake_distribution( + &self, + message: &str, signatures: &entities::SingleSignatures, ) -> StdResult<()>; @@ -43,31 +51,56 @@ impl MultiSignerImpl { debug!("New MultiSignerImpl created"); Self { epoch_service } } -} -#[async_trait] -impl MultiSigner for MultiSignerImpl { - /// Verify a single signature - async fn verify_single_signature( + fn run_verify_single_signature( &self, - message: &entities::ProtocolMessage, + message: &str, single_signature: &entities::SingleSignatures, + protocol_multi_signer: &ProtocolMultiSigner, ) -> StdResult<()> { debug!( "Verify single signature from {} at indexes {:?} for message {:?}", single_signature.party_id, single_signature.won_indexes, message ); + protocol_multi_signer + .verify_single_signature(&message, single_signature) + .with_context(|| { + format!("Multi Signer can not verify single signature for message '{message:?}'") + }) + } +} + +#[async_trait] +impl MultiSigner for MultiSignerImpl { + /// Verify a single signature + async fn verify_single_signature( + &self, + message: &str, + single_signature: &entities::SingleSignatures, + ) -> StdResult<()> { let epoch_service = self.epoch_service.read().await; let protocol_multi_signer = epoch_service.protocol_multi_signer().with_context(|| { "Multi Signer could not get protocol multi-signer from epoch service" })?; - protocol_multi_signer - .verify_single_signature(message, single_signature) - .with_context(|| { - format!("Multi Signer can not verify single signature for message '{message:?}'") - }) + self.run_verify_single_signature(message, single_signature, protocol_multi_signer) + } + + async fn verify_single_signature_for_next_stake_distribution( + &self, + message: &str, + single_signature: &entities::SingleSignatures, + ) -> StdResult<()> { + let epoch_service = self.epoch_service.read().await; + let next_protocol_multi_signer = + epoch_service + .next_protocol_multi_signer() + .with_context(|| { + "Multi Signer could not get next protocol multi-signer from epoch service" + })?; + + self.run_verify_single_signature(message, single_signature, next_protocol_multi_signer) } /// Creates a multi signature from single signatures @@ -101,17 +134,18 @@ impl MultiSigner for MultiSignerImpl { #[cfg(test)] mod tests { - use super::*; - use crate::services::FakeEpochService; - use mithril_common::entities::SignerWithStake; - use mithril_common::{ - crypto_helper::tests_setup::*, - entities::{CardanoDbBeacon, Epoch, SignedEntityType}, - test_utils::{fake_data, MithrilFixtureBuilder}, - }; use std::sync::Arc; use tokio::sync::RwLock; + use mithril_common::crypto_helper::tests_setup::*; + use mithril_common::entities::{CardanoDbBeacon, Epoch, SignedEntityType, SignerWithStake}; + use mithril_common::protocol::ToMessage; + use mithril_common::test_utils::{fake_data, MithrilFixtureBuilder}; + + use crate::services::FakeEpochService; + + use super::*; + fn take_signatures_until_quorum_is_almost_reached( signatures: &mut Vec, quorum: usize, @@ -132,6 +166,52 @@ mod tests { result } + #[tokio::test] + async fn test_verify_single_signature() { + let epoch = Epoch(5); + let fixture = MithrilFixtureBuilder::default().with_signers(5).build(); + let next_fixture = MithrilFixtureBuilder::default().with_signers(4).build(); + let multi_signer = + MultiSignerImpl::new(Arc::new(RwLock::new(FakeEpochService::with_data( + epoch, + &fixture.protocol_parameters(), + &next_fixture.protocol_parameters(), + &next_fixture.protocol_parameters(), + &fixture.signers_with_stake(), + &next_fixture.signers_with_stake(), + )))); + + { + let message = setup_message(); + let signature = fixture.signers_fixture()[0].sign(&message).unwrap(); + + multi_signer + .verify_single_signature(&message.to_message(), &signature) + .await + .unwrap(); + + multi_signer.verify_single_signature_for_next_stake_distribution(&message.to_message(), &signature).await.expect_err( + "single signature issued in the current epoch should not be valid for the next epoch", + ); + } + { + let message = setup_message(); + let next_epoch_signature = next_fixture.signers_fixture()[0].sign(&message).unwrap(); + + multi_signer + .verify_single_signature_for_next_stake_distribution( + &message.to_message(), + &next_epoch_signature, + ) + .await + .unwrap(); + + multi_signer.verify_single_signature(&message.to_message(), &next_epoch_signature).await.expect_err( + "single signature issued in the next epoch should not be valid for the current epoch", + ); + } + } + #[tokio::test] async fn test_multi_signer_multi_signature_ok() { let epoch = Epoch(5); @@ -147,25 +227,15 @@ mod tests { let mut expected_certificate_signers: Vec = Vec::new(); for signer_fixture in fixture.signers_fixture() { - if let Some(signature) = signer_fixture - .protocol_signer - .sign(message.compute_hash().as_bytes()) - { - let won_indexes = signature.indexes.clone(); - - signatures.push(entities::SingleSignatures::new( - signer_fixture.signer_with_stake.party_id.to_owned(), - signature.into(), - won_indexes, - )); - + if let Some(signature) = signer_fixture.sign(&message) { + signatures.push(signature); expected_certificate_signers.push(signer_fixture.signer_with_stake.to_owned()) } } for signature in &signatures { multi_signer - .verify_single_signature(&message, signature) + .verify_single_signature(&message.to_message(), signature) .await .expect("single signature should be valid"); } diff --git a/mithril-aggregator/src/services/certifier/buffered_certifier.rs b/mithril-aggregator/src/services/certifier/buffered_certifier.rs new file mode 100644 index 00000000000..7edf5538798 --- /dev/null +++ b/mithril-aggregator/src/services/certifier/buffered_certifier.rs @@ -0,0 +1,529 @@ +use async_trait::async_trait; +use slog::{debug, trace, warn, Logger}; +use std::sync::Arc; + +use mithril_common::entities::{ + Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants, + SingleSignatures, +}; +use mithril_common::StdResult; + +use crate::entities::OpenMessage; +use crate::services::{ + BufferedSingleSignatureStore, CertifierService, CertifierServiceError, + SignatureRegistrationStatus, +}; + +/// A decorator of [CertifierService] that can buffer registration of single signatures +/// when the open message is not yet created. +/// +/// When an open message is created, buffered single signatures for the open message type are +/// registered. +pub struct BufferedCertifierService { + certifier_service: Arc, + buffered_single_signature_store: Arc, + logger: Logger, +} + +impl BufferedCertifierService { + /// Create a new instance of `BufferedCertifierService`. + pub fn new( + certifier_service: Arc, + buffered_single_signature_store: Arc, + logger: Logger, + ) -> Self { + Self { + certifier_service, + buffered_single_signature_store, + logger, + } + } + + async fn try_register_buffered_signatures_to_current_open_message( + &self, + signed_entity_type: &SignedEntityType, + ) -> StdResult<()> { + let discriminant: SignedEntityTypeDiscriminants = signed_entity_type.into(); + let buffered_signatures = self + .buffered_single_signature_store + .get_buffered_signatures(discriminant) + .await?; + let mut signatures_to_remove = vec![]; + + for signature in buffered_signatures { + match self + .certifier_service + .register_single_signature(signed_entity_type, &signature) + .await + { + Ok(..) => { + signatures_to_remove.push(signature); + } + Err(error) => match error.downcast_ref::() { + Some(CertifierServiceError::InvalidSingleSignature(..)) => { + trace!(self.logger, "Skipping invalid signature for signed entity '{signed_entity_type:?}'"; + "party_id" => &signature.party_id, + "error" => ?error, + ); + } + _ => { + anyhow::bail!(error); + } + }, + } + } + + self.buffered_single_signature_store + .remove_buffered_signatures(discriminant, signatures_to_remove) + .await?; + + Ok(()) + } +} + +#[async_trait] +impl CertifierService for BufferedCertifierService { + async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()> { + self.certifier_service.inform_epoch(epoch).await + } + + async fn register_single_signature( + &self, + signed_entity_type: &SignedEntityType, + signature: &SingleSignatures, + ) -> StdResult { + match self + .certifier_service + .register_single_signature(signed_entity_type, signature) + .await + { + Ok(res) => Ok(res), + Err(error) => match error.downcast_ref::() { + Some(CertifierServiceError::NotFound(..)) if signature.is_authenticated() => { + debug!( + self.logger, + "No OpenMessage available for signed entity - Buffering single signature"; + "signed_entity_type" => ?signed_entity_type, + "party_id" => &signature.party_id + ); + + self.buffered_single_signature_store + .buffer_signature(signed_entity_type.into(), signature) + .await?; + + Ok(SignatureRegistrationStatus::Buffered) + } + _ => Err(error), + }, + } + } + + async fn create_open_message( + &self, + signed_entity_type: &SignedEntityType, + protocol_message: &ProtocolMessage, + ) -> StdResult { + // IMPORTANT: this method should not fail if the open message creation succeeds + // Otherwise, the state machine won't aggregate signatures for this open message. + + let creation_result = self + .certifier_service + .create_open_message(signed_entity_type, protocol_message) + .await; + + if creation_result.is_ok() { + if let Err(error) = self + .try_register_buffered_signatures_to_current_open_message(signed_entity_type) + .await + { + warn!(self.logger, "Failed to register buffered signatures to the new open message"; + "signed_entity_type" => ?signed_entity_type, + "error" => ?error + ); + } + } + + creation_result + } + + async fn get_open_message( + &self, + signed_entity_type: &SignedEntityType, + ) -> StdResult> { + self.certifier_service + .get_open_message(signed_entity_type) + .await + } + + async fn mark_open_message_if_expired( + &self, + signed_entity_type: &SignedEntityType, + ) -> StdResult> { + self.certifier_service + .mark_open_message_if_expired(signed_entity_type) + .await + } + + async fn create_certificate( + &self, + signed_entity_type: &SignedEntityType, + ) -> StdResult> { + self.certifier_service + .create_certificate(signed_entity_type) + .await + } + + async fn get_certificate_by_hash(&self, hash: &str) -> StdResult> { + self.certifier_service.get_certificate_by_hash(hash).await + } + + async fn get_latest_certificates(&self, last_n: usize) -> StdResult> { + self.certifier_service.get_latest_certificates(last_n).await + } + + async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> { + self.certifier_service.verify_certificate_chain(epoch).await + } +} + +#[cfg(test)] +mod tests { + use anyhow::anyhow; + use mockall::predicate::eq; + + use mithril_common::entities::SignedEntityTypeDiscriminants::{ + CardanoTransactions, MithrilStakeDistribution, + }; + use mithril_common::entities::SingleSignatureAuthenticationStatus; + use mithril_common::test_utils::fake_data; + + use crate::database::repository::BufferedSingleSignatureRepository; + use crate::database::test_helper::main_db_connection; + use crate::services::{ + CertifierServiceError, MockBufferedSingleSignatureStore, MockCertifierService, + }; + use crate::test_tools::TestLogger; + + use super::*; + + fn mock_certifier( + certifier_mock_config: impl FnOnce(&mut MockCertifierService), + ) -> Arc { + let mut certifier = MockCertifierService::new(); + certifier_mock_config(&mut certifier); + Arc::new(certifier) + } + + fn mock_store(store_mock_config: F) -> Arc + where + F: FnOnce(&mut MockBufferedSingleSignatureStore), + { + let mut store = MockBufferedSingleSignatureStore::new(); + store_mock_config(&mut store); + Arc::new(store) + } + + /// Run a scenario where we try to register a signature (using a fixed signed entity type). + /// + /// Return the registration result and the list of buffered signatures after the registration. + async fn run_register_signature_scenario( + decorated_certifier_mock_config: impl FnOnce(&mut MockCertifierService), + signature_to_register: &SingleSignatures, + ) -> ( + StdResult, + Vec, + ) { + let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new( + main_db_connection().unwrap(), + ))); + let certifier = BufferedCertifierService::new( + mock_certifier(decorated_certifier_mock_config), + store.clone(), + TestLogger::stdout(), + ); + + let registration_result = certifier + .register_single_signature( + &SignedEntityType::MithrilStakeDistribution(Epoch(5)), + signature_to_register, + ) + .await; + + let buffered_signatures = store + .get_buffered_signatures(MithrilStakeDistribution) + .await + .unwrap(); + + (registration_result, buffered_signatures) + } + + #[tokio::test] + async fn when_registering_single_signature_dont_buffer_signature_if_decorated_certifier_succeed( + ) { + let (registration_result, buffered_signatures_after_registration) = + run_register_signature_scenario( + |mock_certifier| { + mock_certifier + .expect_register_single_signature() + .returning(|_, _| Ok(SignatureRegistrationStatus::Registered)); + }, + &SingleSignatures::fake("party_1", "a message"), + ) + .await; + + let status = registration_result.expect("Registration should have succeed"); + assert_eq!(status, SignatureRegistrationStatus::Registered); + assert_eq!( + buffered_signatures_after_registration, + Vec::::new() + ); + } + + mod when_registering_single_signature_if_decorated_certifier_as_no_opened_message { + use super::*; + + #[tokio::test] + async fn buffer_signature_if_authenticated() { + let (registration_result, buffered_signatures_after_registration) = + run_register_signature_scenario( + |mock_certifier| { + mock_certifier + .expect_register_single_signature() + .returning(|_, _| { + Err(CertifierServiceError::NotFound( + SignedEntityType::MithrilStakeDistribution(Epoch(5)), + ) + .into()) + }); + }, + &SingleSignatures { + authentication_status: SingleSignatureAuthenticationStatus::Authenticated, + ..SingleSignatures::fake("party_1", "a message") + }, + ) + .await; + + let status = registration_result.expect("Registration should have succeed"); + assert_eq!(status, SignatureRegistrationStatus::Buffered); + assert_eq!( + buffered_signatures_after_registration, + vec![SingleSignatures::fake("party_1", "a message")] + ); + } + + #[tokio::test] + async fn dont_buffer_signature_if_not_authenticated() { + let (registration_result, buffered_signatures_after_registration) = + run_register_signature_scenario( + |mock_certifier| { + mock_certifier + .expect_register_single_signature() + .returning(|_, _| { + Err(CertifierServiceError::NotFound( + SignedEntityType::MithrilStakeDistribution(Epoch(5)), + ) + .into()) + }); + }, + &SingleSignatures { + authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated, + ..SingleSignatures::fake("party_1", "a message") + }, + ) + .await; + + registration_result.expect_err("Registration should have failed"); + assert_eq!( + buffered_signatures_after_registration, + Vec::::new() + ); + } + } + + #[tokio::test] + async fn buffered_signatures_are_moved_to_newly_opened_message() { + let store = Arc::new(BufferedSingleSignatureRepository::new(Arc::new( + main_db_connection().unwrap(), + ))); + for (signed_type, signature) in [ + ( + MithrilStakeDistribution, + SingleSignatures::fake("party_1", "message 1"), + ), + ( + MithrilStakeDistribution, + SingleSignatures::fake("party_2", "message 2"), + ), + ( + CardanoTransactions, + SingleSignatures::fake("party_3", "message 3"), + ), + ] { + store + .buffer_signature(signed_type, &signature) + .await + .unwrap(); + } + + let certifier = BufferedCertifierService::new( + mock_certifier(|mock| { + mock.expect_create_open_message() + .returning(|_, _| Ok(OpenMessage::dummy())); + + // Those configuration Asserts that the buffered signatures are registered + mock.expect_register_single_signature() + .with( + eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))), + eq(SingleSignatures::fake("party_1", "message 1")), + ) + .once() + .returning(|_, _| Ok(SignatureRegistrationStatus::Registered)); + mock.expect_register_single_signature() + .with( + eq(SignedEntityType::MithrilStakeDistribution(Epoch(5))), + eq(SingleSignatures::fake("party_2", "message 2")), + ) + .once() + .returning(|_, _| Ok(SignatureRegistrationStatus::Registered)); + }), + store.clone(), + TestLogger::stdout(), + ); + + certifier + .create_open_message( + &SignedEntityType::MithrilStakeDistribution(Epoch(5)), + &ProtocolMessage::new(), + ) + .await + .unwrap(); + + let remaining_sigs = store + .get_buffered_signatures(MithrilStakeDistribution) + .await + .unwrap(); + assert!(remaining_sigs.is_empty()); + } + + mod when_failing_to_transfer_buffered_signature_to_new_open_message { + use mockall::predicate::always; + + use super::*; + + async fn run_scenario( + certifier_mock_config: impl FnOnce(&mut MockCertifierService), + store_mock_config: impl FnOnce(&mut MockBufferedSingleSignatureStore), + ) { + let store = mock_store(store_mock_config); + let certifier = BufferedCertifierService::new( + mock_certifier(certifier_mock_config), + store, + TestLogger::stdout(), + ); + + certifier + .create_open_message( + &SignedEntityType::MithrilStakeDistribution(Epoch(5)), + &ProtocolMessage::new(), + ) + .await + .expect("Transferring buffered signatures to new open message should not fail"); + } + + #[tokio::test] + async fn skip_invalid_signatures() { + run_scenario( + |mock| { + mock.expect_create_open_message() + .returning(|_, _| Ok(OpenMessage::dummy())); + + mock.expect_register_single_signature() + .with(always(), eq(fake_data::single_signatures(vec![1]))) + .returning(|_, _| Ok(SignatureRegistrationStatus::Registered)) + .once(); + mock.expect_register_single_signature() + .with(always(), eq(fake_data::single_signatures(vec![2]))) + .returning(|_, _| { + Err(CertifierServiceError::InvalidSingleSignature( + OpenMessage::dummy().signed_entity_type, + anyhow!("Invalid signature"), + ) + .into()) + }) + .once(); + mock.expect_register_single_signature() + .with(always(), eq(fake_data::single_signatures(vec![3]))) + .returning(|_, _| Ok(SignatureRegistrationStatus::Registered)) + .once(); + }, + |mock| { + mock.expect_get_buffered_signatures().returning(|_| { + Ok(vec![ + fake_data::single_signatures(vec![1]), + fake_data::single_signatures(vec![2]), + fake_data::single_signatures(vec![3]), + ]) + }); + mock.expect_remove_buffered_signatures() + // Only non-skipped signatures should be removed + .withf(|_, sig_to_remove| sig_to_remove.len() == 2) + .returning(|_, _| Ok(())); + }, + ) + .await; + } + + #[tokio::test] + async fn do_not_return_an_error_if_getting_buffer_signatures_fail() { + run_scenario( + |mock| { + mock.expect_create_open_message() + .returning(|_, _| Ok(OpenMessage::dummy())); + mock.expect_register_single_signature() + .returning(|_, _| Ok(SignatureRegistrationStatus::Registered)); + }, + |mock| { + mock.expect_get_buffered_signatures() + .returning(|_| Err(anyhow!("get_buffered_signatures error"))); + }, + ) + .await; + } + + #[tokio::test] + async fn do_not_return_an_error_if_registering_signature_fail() { + run_scenario( + |mock| { + mock.expect_create_open_message() + .returning(|_, _| Ok(OpenMessage::dummy())); + mock.expect_register_single_signature() + .returning(|_, _| Err(anyhow!("register_single_signature error"))); + }, + |mock| { + mock.expect_get_buffered_signatures() + .returning(|_| Ok(vec![fake_data::single_signatures(vec![1])])); + }, + ) + .await; + } + + #[tokio::test] + async fn do_not_return_an_error_if_removing_buffered_signatures_fail() { + run_scenario( + |mock| { + mock.expect_create_open_message() + .returning(|_, _| Ok(OpenMessage::dummy())); + mock.expect_register_single_signature() + .returning(|_, _| Ok(SignatureRegistrationStatus::Registered)); + }, + |mock| { + mock.expect_get_buffered_signatures() + .returning(|_| Ok(vec![fake_data::single_signatures(vec![1])])); + mock.expect_remove_buffered_signatures() + .returning(|_, _| Err(anyhow!("remove_buffered_signatures error"))); + }, + ) + .await; + } + } +} diff --git a/mithril-aggregator/src/services/certifier.rs b/mithril-aggregator/src/services/certifier/certifier_service.rs similarity index 85% rename from mithril-aggregator/src/services/certifier.rs rename to mithril-aggregator/src/services/certifier/certifier_service.rs index a28d3130f5e..a2c26097cfc 100644 --- a/mithril-aggregator/src/services/certifier.rs +++ b/mithril-aggregator/src/services/certifier/certifier_service.rs @@ -1,148 +1,27 @@ -//! ## Certifier Service -//! -//! This service is responsible for [OpenMessage] cycle of life. It creates open -//! messages and turn them into [Certificate]. To do so, it registers -//! single signatures and deal with the multi_signer for aggregate signature -//! creation. - use anyhow::Context; use async_trait::async_trait; use chrono::Utc; -use mithril_common::{ - certificate_chain::CertificateVerifier, - crypto_helper::{ProtocolGenesisVerifier, PROTOCOL_VERSION}, - entities::{ - Certificate, CertificateMetadata, CertificateSignature, Epoch, ProtocolMessage, - SignedEntityType, SingleSignatures, StakeDistributionParty, - }, - CardanoNetwork, StdResult, TickerService, -}; use slog::Logger; -use slog_scope::{debug, error, info, trace, warn}; +use slog_scope::{debug, info, trace, warn}; use std::sync::Arc; -use thiserror::Error; -use tokio::sync::RwLock; - -use crate::{ - database::record::{OpenMessageRecord, OpenMessageWithSingleSignaturesRecord}, - database::repository::{ - CertificateRepository, OpenMessageRepository, SingleSignatureRepository, - }, - entities::OpenMessage, - MultiSigner, + +use mithril_common::certificate_chain::CertificateVerifier; +use mithril_common::crypto_helper::{ProtocolGenesisVerifier, PROTOCOL_VERSION}; +use mithril_common::entities::{ + Certificate, CertificateMetadata, CertificateSignature, Epoch, ProtocolMessage, + SignedEntityType, SingleSignatures, StakeDistributionParty, }; +use mithril_common::protocol::ToMessage; +use mithril_common::{CardanoNetwork, StdResult, TickerService}; +use crate::database::record::{OpenMessageRecord, OpenMessageWithSingleSignaturesRecord}; +use crate::database::repository::{ + CertificateRepository, OpenMessageRepository, SingleSignatureRepository, +}; use crate::dependency_injection::EpochServiceWrapper; - -#[cfg(test)] -use mockall::automock; - -/// Errors dedicated to the CertifierService. -#[derive(Debug, Error)] -pub enum CertifierServiceError { - /// OpenMessage not found. - #[error("The open message was not found for beacon {0:?}.")] - NotFound(SignedEntityType), - - /// The open message is already certified, no more single signatures may be - /// attached to it nor be certified again. - #[error("Open message for beacon {0:?} already certified.")] - AlreadyCertified(SignedEntityType), - - /// The open message is expired, no more single signatures may be - /// attached to it nor be certified again. - #[error("Open message for beacon {0:?} is expired.")] - Expired(SignedEntityType), - - /// No parent certificate could be found, this certifier cannot create genesis certificates. - #[error( - "No parent certificate could be found, this certifier cannot create genesis certificates." - )] - NoParentCertificateFound, - - /// No certificate for this epoch - #[error("There is an epoch gap between the last certificate epoch ({certificate_epoch:?}) and current epoch ({current_epoch:?})")] - CertificateEpochGap { - /// Epoch of the last issued certificate - certificate_epoch: Epoch, - - /// Given current epoch - current_epoch: Epoch, - }, - - /// Could not verify certificate chain because could not find last certificate. - #[error("No certificate found.")] - CouldNotFindLastCertificate, -} - -/// ## CertifierService -/// -/// This service manages the open message and their beacon transitions. It can -/// ultimately transform open messages into certificates. -#[cfg_attr(test, automock)] -#[async_trait] -pub trait CertifierService: Sync + Send { - /// Inform the certifier I have detected a new epoch, it may clear its state - /// and prepare the new signature round. If the given Epoch is equal or less - /// than the previous informed Epoch, nothing is done. - async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()>; - - /// Add a new single signature for the open message at the given beacon. If - /// the open message does not exist or the open message has been certified - /// since then, an error is returned. - async fn register_single_signature( - &self, - signed_entity_type: &SignedEntityType, - signature: &SingleSignatures, - ) -> StdResult<()>; - - /// Create an open message at the given beacon. If the open message does not - /// exist or exists at an older beacon, the older open messages are cleared - /// along with their associated single signatures and the new open message - /// is created. If the message already exists, an error is returned. - async fn create_open_message( - &self, - signed_entity_type: &SignedEntityType, - protocol_message: &ProtocolMessage, - ) -> StdResult; - - /// Return the open message at the given Beacon. If the message does not - /// exist, None is returned. - async fn get_open_message( - &self, - signed_entity_type: &SignedEntityType, - ) -> StdResult>; - - /// Mark the open message if it has expired. - async fn mark_open_message_if_expired( - &self, - signed_entity_type: &SignedEntityType, - ) -> StdResult>; - - /// Create a certificate if possible. If the pointed open message does - /// not exist or has been already certified, an error is raised. If a multi - /// signature is created then the flag `is_certified` of the open - /// message is set to true. The Certificate is created. - /// If the stake quorum of the single signatures is - /// not reached for the multisignature to be created, the certificate is not - /// created and None is returned. If the certificate can be created, the - /// list of the registered signers for the given epoch is used. - async fn create_certificate( - &self, - signed_entity_type: &SignedEntityType, - ) -> StdResult>; - - /// Returns a certificate from its hash. - async fn get_certificate_by_hash(&self, hash: &str) -> StdResult>; - - /// Returns the list of the latest created certificates. - async fn get_latest_certificates(&self, last_n: usize) -> StdResult>; - - /// Verify the certificate chain and epoch gap. This will return an error if - /// there is at least an epoch between the given epoch and the most recent - /// certificate. - async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()>; -} +use crate::entities::OpenMessage; +use crate::services::{CertifierService, CertifierServiceError, SignatureRegistrationStatus}; +use crate::MultiSigner; /// Mithril CertifierService implementation pub struct MithrilCertifierService { @@ -152,7 +31,7 @@ pub struct MithrilCertifierService { certificate_repository: Arc, certificate_verifier: Arc, genesis_verifier: Arc, - multi_signer: Arc>, + multi_signer: Arc, // todo: should be removed after removing immutable file number from the certificate metadata ticker_service: Arc, epoch_service: EpochServiceWrapper, @@ -169,7 +48,7 @@ impl MithrilCertifierService { certificate_repository: Arc, certificate_verifier: Arc, genesis_verifier: Arc, - multi_signer: Arc>, + multi_signer: Arc, ticker_service: Arc, epoch_service: EpochServiceWrapper, logger: Logger, @@ -226,7 +105,7 @@ impl CertifierService for MithrilCertifierService { &self, signed_entity_type: &SignedEntityType, signature: &SingleSignatures, - ) -> StdResult<()> { + ) -> StdResult { debug!("CertifierService::register_single_signature(signed_entity_type: {signed_entity_type:?}, single_signatures: {signature:?}"); trace!("CertifierService::register_single_signature"; "complete_single_signatures" => #?signature); @@ -250,10 +129,12 @@ impl CertifierService for MithrilCertifierService { return Err(CertifierServiceError::Expired(signed_entity_type.clone()).into()); } - let multi_signer = self.multi_signer.read().await; - multi_signer - .verify_single_signature(&open_message.protocol_message, signature) - .await?; + self.multi_signer + .verify_single_signature(&open_message.protocol_message.to_message(), signature) + .await + .map_err(|err| { + CertifierServiceError::InvalidSingleSignature(signed_entity_type.clone(), err) + })?; let single_signature = self .single_signature_repository @@ -262,7 +143,7 @@ impl CertifierService for MithrilCertifierService { info!("CertifierService::register_single_signature: created pool '{}' single signature for {signed_entity_type:?}.", single_signature.signer_id); debug!("CertifierService::register_single_signature: created single signature for open message ID='{}'.", single_signature.open_message_id); - Ok(()) + Ok(SignatureRegistrationStatus::Registered) } async fn create_open_message( @@ -359,8 +240,11 @@ impl CertifierService for MithrilCertifierService { return Err(CertifierServiceError::Expired(signed_entity_type.clone()).into()); } - let multi_signer = self.multi_signer.read().await; - let multi_signature = match multi_signer.create_multi_signature(&open_message).await? { + let multi_signature = match self + .multi_signer + .create_multi_signature(&open_message) + .await? + { None => { debug!("CertifierService::create_certificate: No multi-signature could be created for open message {signed_entity_type:?}"); return Ok(None); @@ -501,6 +385,7 @@ mod tests { entities::{CardanoDbBeacon, ProtocolMessagePartKey}, test_utils::{fake_data, MithrilFixture, MithrilFixtureBuilder}, }; + use tokio::sync::RwLock; use super::*; @@ -743,10 +628,18 @@ mod tests { signatures.push(signature); } } - certifier_service + let err = certifier_service .register_single_signature(&signed_entity_type, &signatures[0]) .await .expect_err("register_single_signature should fail"); + + assert!( + matches!( + err.downcast_ref::(), + Some(CertifierServiceError::InvalidSingleSignature(..)) + ), + "Expected CertifierServiceError::InvalidSingleSignature, got: '{err:?}'" + ); } #[tokio::test] @@ -935,7 +828,7 @@ mod tests { let fixture = MithrilFixtureBuilder::default().with_signers(5).build(); let mut certifier_service = setup_certifier_service(&fixture, &epochs_with_signers, None).await; - certifier_service.multi_signer = Arc::new(RwLock::new(mock_multi_signer)); + certifier_service.multi_signer = Arc::new(mock_multi_signer); certifier_service .create_open_message(&signed_entity_type, &protocol_message) .await diff --git a/mithril-aggregator/src/services/certifier/interface.rs b/mithril-aggregator/src/services/certifier/interface.rs new file mode 100644 index 00000000000..043dc0dd158 --- /dev/null +++ b/mithril-aggregator/src/services/certifier/interface.rs @@ -0,0 +1,158 @@ +use async_trait::async_trait; +use thiserror::Error; + +use mithril_common::entities::{ + Certificate, Epoch, ProtocolMessage, SignedEntityType, SignedEntityTypeDiscriminants, + SingleSignatures, +}; +use mithril_common::{StdError, StdResult}; + +use crate::entities::OpenMessage; + +/// Errors dedicated to the CertifierService. +#[derive(Debug, Error)] +pub enum CertifierServiceError { + /// OpenMessage not found. + #[error("The open message was not found for beacon {0:?}.")] + NotFound(SignedEntityType), + + /// The open message is already certified, no more single signatures may be + /// attached to it nor be certified again. + #[error("Open message for beacon {0:?} already certified.")] + AlreadyCertified(SignedEntityType), + + /// The open message is expired, no more single signatures may be + /// attached to it nor be certified again. + #[error("Open message for beacon {0:?} is expired.")] + Expired(SignedEntityType), + + /// An invalid signature was provided. + #[error("Invalid single signature for {0:?}.")] + InvalidSingleSignature(SignedEntityType, #[source] StdError), + + /// No parent certificate could be found, this certifier cannot create genesis certificates. + #[error( + "No parent certificate could be found, this certifier cannot create genesis certificates." + )] + NoParentCertificateFound, + + /// No certificate for this epoch + #[error("There is an epoch gap between the last certificate epoch ({certificate_epoch:?}) and current epoch ({current_epoch:?})")] + CertificateEpochGap { + /// Epoch of the last issued certificate + certificate_epoch: Epoch, + + /// Given current epoch + current_epoch: Epoch, + }, + + /// Could not verify certificate chain because could not find last certificate. + #[error("No certificate found.")] + CouldNotFindLastCertificate, +} + +/// Status of a successful registration of a single signature. +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum SignatureRegistrationStatus { + /// The signature was registered and will be used for the next certificate. + Registered, + + /// The signature was buffered, it will be used later. + Buffered, +} + +/// ## CertifierService +/// +/// This service manages the open message and their beacon transitions. It can +/// ultimately transform open messages into certificates. +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait CertifierService: Sync + Send { + /// Inform the certifier I have detected a new epoch, it may clear its state + /// and prepare the new signature round. If the given Epoch is equal or less + /// than the previous informed Epoch, nothing is done. + async fn inform_epoch(&self, epoch: Epoch) -> StdResult<()>; + + /// Add a new single signature for the open message at the given beacon. If + /// the open message does not exist or the open message has been certified + /// since then, an error is returned. + async fn register_single_signature( + &self, + signed_entity_type: &SignedEntityType, + signature: &SingleSignatures, + ) -> StdResult; + + /// Create an open message at the given beacon. If the open message does not + /// exist or exists at an older beacon, the older open messages are cleared + /// along with their associated single signatures and the new open message + /// is created. If the message already exists, an error is returned. + async fn create_open_message( + &self, + signed_entity_type: &SignedEntityType, + protocol_message: &ProtocolMessage, + ) -> StdResult; + + /// Return the open message at the given Beacon. If the message does not + /// exist, None is returned. + async fn get_open_message( + &self, + signed_entity_type: &SignedEntityType, + ) -> StdResult>; + + /// Mark the open message if it has expired. + async fn mark_open_message_if_expired( + &self, + signed_entity_type: &SignedEntityType, + ) -> StdResult>; + + /// Create a certificate if possible. If the pointed open message does + /// not exist or has been already certified, an error is raised. If a multi + /// signature is created then the flag `is_certified` of the open + /// message is set to true. The Certificate is created. + /// If the stake quorum of the single signatures is + /// not reached for the multisignature to be created, the certificate is not + /// created and None is returned. If the certificate can be created, the + /// list of the registered signers for the given epoch is used. + async fn create_certificate( + &self, + signed_entity_type: &SignedEntityType, + ) -> StdResult>; + + /// Returns a certificate from its hash. + async fn get_certificate_by_hash(&self, hash: &str) -> StdResult>; + + /// Returns the list of the latest created certificates. + async fn get_latest_certificates(&self, last_n: usize) -> StdResult>; + + /// Verify the certificate chain and epoch gap. This will return an error if + /// there is at least an epoch between the given epoch and the most recent + /// certificate. + async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()>; +} + +/// ## BufferedSingleSignatureStore +/// +/// Allow to buffer single signatures for later use when an open message isn't available yet. +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait BufferedSingleSignatureStore: Sync + Send { + /// Buffer a single signature for later use. + async fn buffer_signature( + &self, + signed_entity_type_discriminant: SignedEntityTypeDiscriminants, + signature: &SingleSignatures, + ) -> StdResult<()>; + + /// Get the buffered single signatures for the given signed entity discriminant. + async fn get_buffered_signatures( + &self, + signed_entity_type_discriminant: SignedEntityTypeDiscriminants, + ) -> StdResult>; + + /// Remove the given single signatures from the buffer. + async fn remove_buffered_signatures( + &self, + signed_entity_type_discriminant: SignedEntityTypeDiscriminants, + single_signatures: Vec, + ) -> StdResult<()>; +} diff --git a/mithril-aggregator/src/services/certifier/mod.rs b/mithril-aggregator/src/services/certifier/mod.rs new file mode 100644 index 00000000000..bfc330c7679 --- /dev/null +++ b/mithril-aggregator/src/services/certifier/mod.rs @@ -0,0 +1,14 @@ +//! ## Certifier Service +//! +//! This service is responsible for [OpenMessage] cycle of life. It creates open +//! messages and turn them into [Certificate]. To do so, it registers +//! single signatures and deal with the multi_signer for aggregate signature +//! creation. + +mod buffered_certifier; +mod certifier_service; +mod interface; + +pub use buffered_certifier::*; +pub use certifier_service::*; +pub use interface::*; diff --git a/mithril-aggregator/src/services/epoch_service.rs b/mithril-aggregator/src/services/epoch_service.rs index 187c6009097..e2926963670 100644 --- a/mithril-aggregator/src/services/epoch_service.rs +++ b/mithril-aggregator/src/services/epoch_service.rs @@ -78,6 +78,9 @@ pub trait EpochService: Sync + Send { /// Get the [protocol multi signer][ProtocolMultiSigner] for the current epoch fn protocol_multi_signer(&self) -> StdResult<&ProtocolMultiSigner>; + + /// Get the [protocol multi signer][ProtocolMultiSigner] for the next epoch + fn next_protocol_multi_signer(&self) -> StdResult<&ProtocolMultiSigner>; } struct EpochData { @@ -95,6 +98,7 @@ struct ComputedEpochData { aggregate_verification_key: ProtocolAggregateVerificationKey, next_aggregate_verification_key: ProtocolAggregateVerificationKey, protocol_multi_signer: ProtocolMultiSigner, + next_protocol_multi_signer: ProtocolMultiSigner, } /// Implementation of the [epoch service][EpochService]. @@ -267,6 +271,7 @@ impl EpochService for MithrilEpochService { next_aggregate_verification_key: next_protocol_multi_signer .compute_aggregate_verification_key(), protocol_multi_signer, + next_protocol_multi_signer, }); Ok(()) @@ -315,6 +320,10 @@ impl EpochService for MithrilEpochService { fn protocol_multi_signer(&self) -> StdResult<&ProtocolMultiSigner> { Ok(&self.unwrap_computed_data()?.protocol_multi_signer) } + + fn next_protocol_multi_signer(&self) -> StdResult<&ProtocolMultiSigner> { + Ok(&self.unwrap_computed_data()?.next_protocol_multi_signer) + } } #[cfg(test)] @@ -371,6 +380,7 @@ impl FakeEpochService { next_aggregate_verification_key: next_protocol_multi_signer .compute_aggregate_verification_key(), protocol_multi_signer, + next_protocol_multi_signer, }), inform_epoch_error: false, update_protocol_parameters_error: false, @@ -497,6 +507,10 @@ impl EpochService for FakeEpochService { fn protocol_multi_signer(&self) -> StdResult<&ProtocolMultiSigner> { Ok(&self.unwrap_computed_data()?.protocol_multi_signer) } + + fn next_protocol_multi_signer(&self) -> StdResult<&ProtocolMultiSigner> { + Ok(&self.unwrap_computed_data()?.next_protocol_multi_signer) + } } #[cfg(test)] @@ -738,15 +752,16 @@ mod tests { let avk = fixture.compute_avk(); let epoch = Epoch(4); let mut service = build_service(epoch, &fixture, &[]).await; + let signer_builder = SignerBuilder::new( + &fixture.signers_with_stake(), + &fixture.protocol_parameters(), + ) + .unwrap(); service.computed_epoch_data = Some(ComputedEpochData { aggregate_verification_key: avk.clone(), next_aggregate_verification_key: avk.clone(), - protocol_multi_signer: SignerBuilder::new( - &fixture.signers_with_stake(), - &fixture.protocol_parameters(), - ) - .unwrap() - .build_multi_signer(), + protocol_multi_signer: signer_builder.build_multi_signer(), + next_protocol_multi_signer: signer_builder.build_multi_signer(), }); service @@ -841,6 +856,10 @@ mod tests { "protocol_multi_signer", service.protocol_multi_signer().err(), ), + ( + "next_protocol_multi_signer", + service.next_protocol_multi_signer().err(), + ), ] { let error = res.unwrap_or_else(|| panic!("getting {name} should have returned an error")); @@ -881,6 +900,10 @@ mod tests { "protocol_multi_signer", service.protocol_multi_signer().err(), ), + ( + "next_protocol_multi_signer", + service.next_protocol_multi_signer().err(), + ), ] { let error = res.unwrap_or_else(|| panic!("getting {name} should have returned an error")); diff --git a/mithril-aggregator/src/tools/mod.rs b/mithril-aggregator/src/tools/mod.rs index 2f6be81ddc0..1df67dc565f 100644 --- a/mithril-aggregator/src/tools/mod.rs +++ b/mithril-aggregator/src/tools/mod.rs @@ -6,6 +6,7 @@ mod genesis; pub mod mocks; mod remote_file_uploader; mod signer_importer; +mod single_signature_authenticator; pub use certificates_hash_migrator::CertificatesHashMigrator; pub use digest_helpers::extract_digest_from_path; @@ -15,6 +16,7 @@ pub use remote_file_uploader::{GcpFileUploader, RemoteFileUploader}; pub use signer_importer::{ CExplorerSignerRetriever, SignersImporter, SignersImporterPersister, SignersImporterRetriever, }; +pub use single_signature_authenticator::*; #[cfg(test)] pub use remote_file_uploader::MockRemoteFileUploader; diff --git a/mithril-aggregator/src/tools/single_signature_authenticator.rs b/mithril-aggregator/src/tools/single_signature_authenticator.rs new file mode 100644 index 00000000000..768d13705eb --- /dev/null +++ b/mithril-aggregator/src/tools/single_signature_authenticator.rs @@ -0,0 +1,264 @@ +use slog::{debug, Logger}; +use std::sync::Arc; + +use mithril_common::entities::{SingleSignatureAuthenticationStatus, SingleSignatures}; +use mithril_common::StdResult; + +use crate::MultiSigner; + +/// Authenticates single signatures against a signed message. +pub struct SingleSignatureAuthenticator { + multi_signer: Arc, + logger: Logger, +} + +impl SingleSignatureAuthenticator { + /// Creates a new `SingleSignatureAuthenticator`. + pub fn new(multi_signer: Arc, logger: Logger) -> Self { + Self { + multi_signer, + logger, + } + } + + /// Authenticates a single signature against a signed message. + pub async fn authenticate( + &self, + single_signature: &mut SingleSignatures, + signed_message: &str, + ) -> StdResult<()> { + let is_authenticated = match self + .multi_signer + .verify_single_signature(signed_message, single_signature) + .await + { + Ok(()) => { + debug!( + self.logger, + "Single signature party authenticated for current stake distribution"; + "party_id" => &single_signature.party_id, + ); + true + } + Err(_error) => { + // Signers may detect epoch changes before the aggregator and send + // new signatures using the next epoch stake distribution + if self + .multi_signer + .verify_single_signature_for_next_stake_distribution( + signed_message, + single_signature, + ) + .await + .is_ok() + { + debug!( + self.logger, + "Single signature party authenticated for next stake distribution"; + "party_id" => &single_signature.party_id, + ); + true + } else { + debug!( + self.logger, + "Single signature party not authenticated"; + "party_id" => &single_signature.party_id, + ); + false + } + } + }; + + single_signature.authentication_status = if is_authenticated { + SingleSignatureAuthenticationStatus::Authenticated + } else { + SingleSignatureAuthenticationStatus::Unauthenticated + }; + + Ok(()) + } +} + +#[cfg(test)] +impl SingleSignatureAuthenticator { + pub(crate) fn new_that_authenticate_everything() -> Self { + let mut multi_signer = crate::multi_signer::MockMultiSigner::new(); + multi_signer + .expect_verify_single_signature() + .returning(|_, _| Ok(())); + multi_signer + .expect_verify_single_signature_for_next_stake_distribution() + .returning(|_, _| Ok(())); + + Self { + multi_signer: Arc::new(multi_signer), + logger: crate::test_tools::TestLogger::stdout(), + } + } + + pub(crate) fn new_that_reject_everything() -> Self { + let mut multi_signer = crate::multi_signer::MockMultiSigner::new(); + multi_signer + .expect_verify_single_signature() + .returning(|_, _| Err(anyhow::anyhow!("error"))); + multi_signer + .expect_verify_single_signature_for_next_stake_distribution() + .returning(|_, _| Err(anyhow::anyhow!("error"))); + + Self { + multi_signer: Arc::new(multi_signer), + logger: crate::test_tools::TestLogger::stdout(), + } + } +} + +#[cfg(test)] +mod tests { + use anyhow::anyhow; + + use crate::multi_signer::MockMultiSigner; + use crate::test_tools::TestLogger; + + use super::*; + + fn mock_multi_signer( + multi_signer_mock_config: impl FnOnce(&mut MockMultiSigner), + ) -> Arc { + let mut multi_signer = MockMultiSigner::new(); + multi_signer_mock_config(&mut multi_signer); + Arc::new(multi_signer) + } + + #[tokio::test] + async fn single_signature_against_valid_signed_message_for_current_stake_distribution_is_authenticated( + ) { + let signed_message = "signed_message".to_string(); + let mut single_signature = SingleSignatures { + authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated, + ..SingleSignatures::fake("party_id", &signed_message) + }; + + let authenticator = SingleSignatureAuthenticator::new( + mock_multi_signer(|mock_config| { + mock_config + .expect_verify_single_signature() + .returning(|_, _| Ok(())); + }), + TestLogger::stdout(), + ); + + authenticator + .authenticate(&mut single_signature, &signed_message) + .await + .unwrap(); + + assert_eq!( + single_signature.authentication_status, + SingleSignatureAuthenticationStatus::Authenticated + ); + } + + #[tokio::test] + async fn single_signature_against_valid_signed_message_for_next_stake_distribution_is_authenticated( + ) { + let signed_message = "signed_message".to_string(); + let mut single_signature = SingleSignatures { + authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated, + ..SingleSignatures::fake("party_id", &signed_message) + }; + + let authenticator = SingleSignatureAuthenticator::new( + mock_multi_signer(|mock_config| { + mock_config + .expect_verify_single_signature() + .returning(|_, _| Err(anyhow!("error"))); + mock_config + .expect_verify_single_signature_for_next_stake_distribution() + .returning(|_, _| Ok(())); + }), + TestLogger::stdout(), + ); + + authenticator + .authenticate(&mut single_signature, &signed_message) + .await + .unwrap(); + + assert_eq!( + single_signature.authentication_status, + SingleSignatureAuthenticationStatus::Authenticated + ); + } + + #[tokio::test] + async fn single_signature_against_invalid_signed_message_for_current_and_next_stake_distribution_is_not_authenticated( + ) { + let signed_message = "signed_message".to_string(); + let mut single_signature = SingleSignatures { + authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated, + ..SingleSignatures::fake("party_id", &signed_message) + }; + + let authenticator = SingleSignatureAuthenticator::new( + mock_multi_signer(|mock_config| { + mock_config + .expect_verify_single_signature() + .returning(|_, _| Err(anyhow!("verify_single_signature error"))); + mock_config + .expect_verify_single_signature_for_next_stake_distribution() + .returning(|_, _| { + Err(anyhow!( + "verify_single_signature_for_next_stake_distribution error" + )) + }); + }), + TestLogger::stdout(), + ); + + authenticator + .authenticate(&mut single_signature, &signed_message) + .await + .unwrap(); + + assert_eq!( + single_signature.authentication_status, + SingleSignatureAuthenticationStatus::Unauthenticated + ); + } + + #[tokio::test] + async fn single_signature_previously_authenticated_but_fail_new_authentication_is_now_unauthenticated( + ) { + let signed_message = "signed_message".to_string(); + let mut single_signature = SingleSignatures { + authentication_status: SingleSignatureAuthenticationStatus::Authenticated, + ..SingleSignatures::fake("party_id", &signed_message) + }; + + let authenticator = SingleSignatureAuthenticator::new( + mock_multi_signer(|mock_config| { + mock_config + .expect_verify_single_signature() + .returning(|_, _| Err(anyhow!("verify_single_signature error"))); + mock_config + .expect_verify_single_signature_for_next_stake_distribution() + .returning(|_, _| { + Err(anyhow!( + "verify_single_signature_for_next_stake_distribution error" + )) + }); + }), + TestLogger::stdout(), + ); + + authenticator + .authenticate(&mut single_signature, &signed_message) + .await + .unwrap(); + + assert_eq!( + single_signature.authentication_status, + SingleSignatureAuthenticationStatus::Unauthenticated + ); + } +} diff --git a/mithril-aggregator/tests/create_certificate_with_buffered_signatures.rs b/mithril-aggregator/tests/create_certificate_with_buffered_signatures.rs new file mode 100644 index 00000000000..de60694d359 --- /dev/null +++ b/mithril-aggregator/tests/create_certificate_with_buffered_signatures.rs @@ -0,0 +1,107 @@ +mod test_extensions; + +use mithril_aggregator::Configuration; +use mithril_common::{ + entities::{ + BlockNumber, CardanoDbBeacon, CardanoTransactionsSigningConfig, ChainPoint, Epoch, + ProtocolParameters, SignedEntityType, SignedEntityTypeDiscriminants, SlotNumber, + StakeDistributionParty, TimePoint, + }, + test_utils::MithrilFixtureBuilder, +}; +use test_extensions::{utilities::get_test_dir, ExpectedCertificate, RuntimeTester}; + +#[tokio::test] +async fn create_certificate_with_buffered_signatures() { + let protocol_parameters = ProtocolParameters { + k: 5, + m: 150, + phi_f: 0.95, + }; + let configuration = Configuration { + protocol_parameters: protocol_parameters.clone(), + signed_entity_types: Some(SignedEntityTypeDiscriminants::CardanoTransactions.to_string()), + data_stores_directory: get_test_dir("create_certificate_with_buffered_signatures"), + cardano_transactions_signing_config: CardanoTransactionsSigningConfig { + security_parameter: BlockNumber(0), + step: BlockNumber(30), + }, + ..Configuration::new_sample() + }; + let mut tester = RuntimeTester::build( + TimePoint { + epoch: Epoch(1), + immutable_file_number: 1, + chain_point: ChainPoint { + slot_number: SlotNumber(10), + block_number: BlockNumber(100), + block_hash: "block_hash-100".to_string(), + }, + }, + configuration, + ) + .await; + + comment!("create signers & declare stake distribution"); + let fixture = MithrilFixtureBuilder::default() + .with_signers(10) + .with_protocol_parameters(protocol_parameters.clone()) + .build(); + + tester.init_state_from_fixture(&fixture).await.unwrap(); + + comment!("Bootstrap the genesis certificate"); + tester.register_genesis_certificate(&fixture).await.unwrap(); + + assert_last_certificate_eq!( + tester, + ExpectedCertificate::new_genesis( + CardanoDbBeacon::new("devnet".to_string(), 1, 1), + fixture.compute_and_encode_avk() + ) + ); + + comment!("Increase immutable number"); + tester.increase_immutable_number().await.unwrap(); + + comment!("start the runtime state machine"); + cycle!(tester, "ready"); + + comment!("signers send their single signature before the state machine is signing"); + tester + .send_authenticated_single_signatures( + SignedEntityTypeDiscriminants::MithrilStakeDistribution, + &fixture.signers_fixture(), + ) + .await + .unwrap(); + cycle!(tester, "signing"); + + comment!("register signers"); + tester + .register_signers(&fixture.signers_fixture()) + .await + .unwrap(); + + comment!("Using buffered signatures, the state machine should issue a certificate for the MithrilStakeDistribution"); + cycle!(tester, "ready"); + assert_last_certificate_eq!( + tester, + ExpectedCertificate::new( + CardanoDbBeacon::new("devnet".to_string(), 1, 2), + StakeDistributionParty::from_signers(fixture.signers_with_stake()).as_slice(), + fixture.compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(1)), + ExpectedCertificate::genesis_identifier(&CardanoDbBeacon::new( + "devnet".to_string(), + 1, + 1 + )), + ) + ); + + tester.increase_epoch().await.unwrap(); + cycle!(tester, "idle"); + + cycle!(tester, "ready"); +} diff --git a/mithril-aggregator/tests/test_extensions/aggregator_observer.rs b/mithril-aggregator/tests/test_extensions/aggregator_observer.rs index af879a3e031..b633f43286b 100644 --- a/mithril-aggregator/tests/test_extensions/aggregator_observer.rs +++ b/mithril-aggregator/tests/test_extensions/aggregator_observer.rs @@ -43,7 +43,7 @@ impl AggregatorObserver { self.ticker_service.get_current_time_point().await.unwrap() } - /// Get the current [open message][OpenMessageWithSingleSignatures] for the given message type + /// Get the current [open message][OpenMessage] for the given message type pub async fn get_current_open_message( &self, discriminant: SignedEntityTypeDiscriminants, @@ -53,20 +53,27 @@ impl AggregatorObserver { self.certifier_service .get_open_message(&signed_entity_type) .await - .with_context(|| "Requesting current open message of type CardanoImmutableFilesFull should be not fail") + .with_context(|| { + format!( + "Requesting current open message of type '{discriminant}' should be not fail" + ) + }) } - /// Get the [entity type][SignedEntityType::CardanoImmutableFilesFull] of the current current open message - pub async fn get_current_signed_entity_type( + /// Compute the full [SignedEntityType] for the given discriminant based on the current + /// [TimePoint]. + pub async fn build_current_signed_entity_type( &self, discriminant: SignedEntityTypeDiscriminants, ) -> StdResult { - match self.get_current_open_message(discriminant).await? { - None => Err(anyhow!( - "An open message should be available for cardano immutables" - )), - Some(message) => Ok(message.signed_entity_type), - } + let time_point = self + .ticker_service + .get_current_time_point() + .await + .with_context(|| "Querying the current beacon should not fail")?; + + self.signed_entity_config + .time_point_to_signed_entity(discriminant, &time_point) } /// Get the last certificate produced by the aggregator @@ -98,22 +105,6 @@ impl AggregatorObserver { Ok(last_tx_snapshot) } - async fn build_current_signed_entity_type( - &self, - discriminant: SignedEntityTypeDiscriminants, - ) -> StdResult { - let time_point = self - .ticker_service - .get_current_time_point() - .await - .with_context(|| "Querying the current beacon should not fail")?; - - Ok(self - .signed_entity_config - .time_point_to_signed_entity(discriminant, &time_point) - .unwrap()) - } - pub async fn is_last_signed_entity( &self, signed_entity_type_expected: &SignedEntityType, diff --git a/mithril-aggregator/tests/test_extensions/runtime_tester.rs b/mithril-aggregator/tests/test_extensions/runtime_tester.rs index fe7d8ca65bb..c2ccf35fd16 100644 --- a/mithril-aggregator/tests/test_extensions/runtime_tester.rs +++ b/mithril-aggregator/tests/test_extensions/runtime_tester.rs @@ -16,8 +16,8 @@ use mithril_common::{ digesters::{DumbImmutableDigester, DumbImmutableFileObserver}, entities::{ BlockNumber, Certificate, CertificateSignature, ChainPoint, Epoch, ImmutableFileNumber, - SignedEntityType, SignedEntityTypeDiscriminants, SlotNumber, Snapshot, StakeDistribution, - TimePoint, + ProtocolMessagePartKey, SignedEntityType, SignedEntityTypeDiscriminants, + SingleSignatureAuthenticationStatus, SlotNumber, Snapshot, StakeDistribution, TimePoint, }, era::{adapters::EraReaderDummyAdapter, EraMarker, EraReader, SupportedEra}, test_utils::{ @@ -379,28 +379,69 @@ impl RuntimeTester { Ok(()) } - /// "Send", actually register, the given single signatures in the multi-signers + pub async fn send_authenticated_single_signatures( + &mut self, + discriminant: SignedEntityTypeDiscriminants, + signers: &[SignerFixture], + ) -> StdResult<()> { + self.send_single_signatures_with_auth_status( + discriminant, + signers, + SingleSignatureAuthenticationStatus::Authenticated, + ) + .await + } + pub async fn send_single_signatures( &mut self, discriminant: SignedEntityTypeDiscriminants, signers: &[SignerFixture], + ) -> StdResult<()> { + self.send_single_signatures_with_auth_status( + discriminant, + signers, + SingleSignatureAuthenticationStatus::Unauthenticated, + ) + .await + } + + async fn send_single_signatures_with_auth_status( + &mut self, + discriminant: SignedEntityTypeDiscriminants, + signers: &[SignerFixture], + authentication_status: SingleSignatureAuthenticationStatus, ) -> StdResult<()> { let certifier_service = self.dependencies.certifier_service.clone(); let signed_entity_type = self .observer - .get_current_signed_entity_type(discriminant) + .build_current_signed_entity_type(discriminant) .await?; - let message = certifier_service - .get_open_message(&signed_entity_type) - .await - .with_context(|| { - format!("An open message should exist for signed_entity_type: {signed_entity_type}") - })? - .ok_or(anyhow!("There should be a message to be signed."))? - .protocol_message; + + // Code copied from `AggregatorRunner::compute_protocol_message` + // Todo: Refactor this code to avoid code duplication by making the signable_builder_service + // able to retrieve the next avk by itself. + let mut message = self + .dependencies + .signable_builder_service + .compute_protocol_message(signed_entity_type.clone()) + .await?; + + let epoch_service = self.dependencies.epoch_service.read().await; + message.set_message_part( + ProtocolMessagePartKey::NextAggregateVerificationKey, + epoch_service + .next_aggregate_verification_key()? + .to_json_hex() + .with_context(|| "convert next avk to json hex failure")?, + ); for signer_fixture in signers { - if let Some(single_signatures) = signer_fixture.sign(&message) { + if let Some(mut single_signatures) = signer_fixture.sign(&message) { + if authentication_status == SingleSignatureAuthenticationStatus::Authenticated { + single_signatures.authentication_status = + SingleSignatureAuthenticationStatus::Authenticated; + } + certifier_service .register_single_signature(&signed_entity_type, &single_signatures) .await @@ -493,7 +534,7 @@ impl RuntimeTester { ) -> StdResult<()> { let signed_entity_type = self .observer - .get_current_signed_entity_type(discriminant) + .build_current_signed_entity_type(discriminant) .await?; let mut open_message = self .open_message_repository diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index 3863cfe6358..0116e694bcf 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-common" -version = "0.4.51" +version = "0.4.52" description = "Common types, interfaces, and utilities for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/src/certificate_chain/certificate_genesis.rs b/mithril-common/src/certificate_chain/certificate_genesis.rs index 6774c2c1aaa..d35626e74cf 100644 --- a/mithril-common/src/certificate_chain/certificate_genesis.rs +++ b/mithril-common/src/certificate_chain/certificate_genesis.rs @@ -14,7 +14,9 @@ use crate::{ Certificate, CertificateMetadata, CertificateSignature, Epoch, ImmutableFileNumber, ProtocolMessage, ProtocolMessagePartKey, ProtocolParameters, }, - era_deprecate, StdResult, + era_deprecate, + protocol::ToMessage, + StdResult, }; /// [CertificateGenesisProducer] related errors. @@ -51,15 +53,15 @@ impl CertificateGenesisProducer { } /// Sign the Genesis protocol message (test only) - pub fn sign_genesis_protocol_message( + pub fn sign_genesis_protocol_message( &self, - genesis_protocol_message: ProtocolMessage, + genesis_message: T, ) -> Result { Ok(self .genesis_signer .as_ref() .ok_or_else(CertificateGenesisProducerError::MissingGenesisSigner)? - .sign(genesis_protocol_message.compute_hash().as_bytes())) + .sign(genesis_message.to_message().as_bytes())) } era_deprecate!("Remove immutable_file_number"); diff --git a/mithril-common/src/crypto_helper/tests_setup.rs b/mithril-common/src/crypto_helper/tests_setup.rs index 4713e0fc475..733bda9f55c 100644 --- a/mithril-common/src/crypto_helper/tests_setup.rs +++ b/mithril-common/src/crypto_helper/tests_setup.rs @@ -57,7 +57,9 @@ fn setup_protocol_initializer( stake: Stake, protocol_parameters: &ProtocolParameters, ) -> ProtocolInitializer { - let protocol_initializer_seed: [u8; 32] = party_id.as_bytes()[..32].try_into().unwrap(); + let protocol_initializer_seed: [u8; 32] = format!("{party_id:<032}").as_bytes()[..32] + .try_into() + .unwrap(); let mut protocol_initializer_rng = ChaCha20Rng::from_seed(protocol_initializer_seed); let kes_period = kes_secret_key_path.as_ref().map(|_| 0); let protocol_initializer: ProtocolInitializer = ProtocolInitializer::setup( diff --git a/mithril-common/src/entities/protocol_message.rs b/mithril-common/src/entities/protocol_message.rs index 4eb3d70b1b2..93b80111a8a 100644 --- a/mithril-common/src/entities/protocol_message.rs +++ b/mithril-common/src/entities/protocol_message.rs @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::{collections::BTreeMap, fmt::Display}; +use crate::protocol::ToMessage; + /// The key of a ProtocolMessage #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub enum ProtocolMessagePartKey { @@ -95,6 +97,12 @@ impl ProtocolMessage { } } +impl ToMessage for ProtocolMessage { + fn to_message(&self) -> String { + self.compute_hash() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/mithril-common/src/entities/single_signatures.rs b/mithril-common/src/entities/single_signatures.rs index 1cd5f66f423..a136604b5dd 100644 --- a/mithril-common/src/entities/single_signatures.rs +++ b/mithril-common/src/entities/single_signatures.rs @@ -21,41 +21,33 @@ pub struct SingleSignatures { #[serde(rename = "indexes")] pub won_indexes: Vec, - /// Message that is signed by the signer - /// - /// Used to buffer the signature for later if the aggregator has yet to create an open message - /// for the signed entity type. - #[serde(skip_serializing_if = "Option::is_none")] - pub signed_message: Option, + /// Status of the authentication of the signer that emitted the signature + #[serde(skip)] + pub authentication_status: SingleSignatureAuthenticationStatus, +} + +/// Status of the authentication of the signer that emitted the signature +#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, Serialize, Deserialize)] +pub enum SingleSignatureAuthenticationStatus { + /// The signer that emitted the signature is authenticated + Authenticated, + /// The signer that emitted the signature is not authenticated + #[default] + Unauthenticated, } impl SingleSignatures { /// `SingleSignatures` factory - pub fn new( - party_id: PartyId, + pub fn new>( + party_id: T, signature: ProtocolSingleSignature, won_indexes: Vec, ) -> SingleSignatures { SingleSignatures { - party_id, - signature, - won_indexes, - signed_message: None, - } - } - - /// `SingleSignatures` factory including the signed message - pub fn new_with_signed_message( - party_id: PartyId, - signature: ProtocolSingleSignature, - won_indexes: Vec, - signed_message: String, - ) -> SingleSignatures { - SingleSignatures { - party_id, + party_id: party_id.into(), signature, won_indexes, - signed_message: Some(signed_message), + authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated, } } @@ -63,6 +55,41 @@ impl SingleSignatures { pub fn to_protocol_signature(&self) -> StmSig { self.signature.clone().into() } + + /// Check that the signer that emitted the signature is authenticated + pub fn is_authenticated(&self) -> bool { + self.authentication_status == SingleSignatureAuthenticationStatus::Authenticated + } +} + +cfg_test_tools! { +impl SingleSignatures { + /// Create a fake [SingleSignatures] with valid cryptographic data for testing purposes. + // TODO: this method is slow due to the fixture creation, we should either make + // the fixture faster or find a faster alternative. + pub fn fake, TMessage: Into>(party_id: TPartyId, message: TMessage) -> Self { + use crate::entities::{ProtocolParameters}; + use crate::test_utils::{MithrilFixtureBuilder, StakeDistributionGenerationMethod}; + + let party_id = party_id.into(); + let message = message.into(); + + let fixture = MithrilFixtureBuilder::default() + .with_stake_distribution(StakeDistributionGenerationMethod::Custom( + std::collections::BTreeMap::from([(party_id.to_string(), 100)]), + )) + .with_protocol_parameters(ProtocolParameters::new(1, 1, 1.0)) + .build(); + let signature = fixture.signers_fixture()[0].sign(&message).unwrap(); + + Self { + party_id, + signature: signature.signature, + won_indexes: vec![10, 15], + authentication_status: SingleSignatureAuthenticationStatus::Unauthenticated, + } + } +} } impl Debug for SingleSignatures { diff --git a/mithril-common/src/protocol/mod.rs b/mithril-common/src/protocol/mod.rs index 860d3ab90f0..bc99cbae067 100644 --- a/mithril-common/src/protocol/mod.rs +++ b/mithril-common/src/protocol/mod.rs @@ -11,3 +11,21 @@ mod single_signer; pub use multi_signer::MultiSigner; pub use signer_builder::{SignerBuilder, SignerBuilderError}; pub use single_signer::SingleSigner; + +/// Trait to convert a type to a message that can be signed or verified by the Mithril protocol. +pub trait ToMessage: Sync + Send { + /// Return a String representation of the message. + fn to_message(&self) -> String; +} + +impl ToMessage for String { + fn to_message(&self) -> String { + self.clone() + } +} + +impl ToMessage for &str { + fn to_message(&self) -> String { + self.to_string() + } +} diff --git a/mithril-common/src/protocol/multi_signer.rs b/mithril-common/src/protocol/multi_signer.rs index 994b705a3db..52c52240486 100644 --- a/mithril-common/src/protocol/multi_signer.rs +++ b/mithril-common/src/protocol/multi_signer.rs @@ -6,7 +6,8 @@ use crate::{ ProtocolAggregateVerificationKey, ProtocolAggregationError, ProtocolClerk, ProtocolMultiSignature, }, - entities::{ProtocolMessage, SingleSignatures}, + entities::SingleSignatures, + protocol::ToMessage, StdResult, }; @@ -25,10 +26,10 @@ impl MultiSigner { } /// Aggregate the given single signatures into a multi-signature - pub fn aggregate_single_signatures( + pub fn aggregate_single_signatures( &self, single_signatures: &[SingleSignatures], - protocol_message: &ProtocolMessage, + message: &T, ) -> Result { let protocol_signatures: Vec<_> = single_signatures .iter() @@ -36,10 +37,7 @@ impl MultiSigner { .collect(); self.protocol_clerk - .aggregate( - &protocol_signatures, - protocol_message.compute_hash().as_bytes(), - ) + .aggregate(&protocol_signatures, message.to_message().as_bytes()) .map(|multi_sig| multi_sig.into()) } @@ -49,9 +47,9 @@ impl MultiSigner { } /// Verify a single signature - pub fn verify_single_signature( + pub fn verify_single_signature( &self, - message: &ProtocolMessage, + message: &T, single_signature: &SingleSignatures, ) -> StdResult<()> { let protocol_signature = single_signature.to_protocol_signature(); @@ -76,7 +74,7 @@ impl MultiSigner { &vk, &stake, &avk, - message.compute_hash().as_bytes(), + message.to_message().as_bytes(), ) .with_context(|| { format!( @@ -94,7 +92,7 @@ mod test { use mithril_stm::StmSignatureError; use crate::{ - entities::{ProtocolMessagePartKey, ProtocolParameters}, + entities::{ProtocolMessage, ProtocolMessagePartKey, ProtocolParameters}, protocol::SignerBuilder, test_utils::fake_keys, test_utils::{MithrilFixture, MithrilFixtureBuilder, StakeDistributionGenerationMethod}, diff --git a/mithril-common/src/protocol/single_signer.rs b/mithril-common/src/protocol/single_signer.rs index 97a430e2642..b707d6bb722 100644 --- a/mithril-common/src/protocol/single_signer.rs +++ b/mithril-common/src/protocol/single_signer.rs @@ -1,6 +1,7 @@ use crate::{ crypto_helper::ProtocolSigner, - entities::{PartyId, ProtocolMessage, SingleSignatures}, + entities::{PartyId, SingleSignatures}, + protocol::ToMessage, StdResult, }; @@ -22,17 +23,16 @@ impl SingleSigner { /// Issue a single signature for the given message. /// /// If no lottery are won None will be returned. - pub fn sign(&self, message: &ProtocolMessage) -> StdResult> { - let signed_message = message.compute_hash(); + pub fn sign(&self, message: &T) -> StdResult> { + let signed_message = message.to_message(); match self.protocol_signer.sign(signed_message.as_bytes()) { Some(signature) => { let won_indexes = signature.indexes.clone(); - Ok(Some(SingleSignatures::new_with_signed_message( + Ok(Some(SingleSignatures::new( self.party_id.to_owned(), signature.into(), won_indexes, - signed_message, ))) } None => Ok(None), @@ -74,32 +74,4 @@ mod test { assert!(signature.is_some()); } - - #[test] - fn embed_signed_message_in_issued_signature() { - let fixture = MithrilFixtureBuilder::default().with_signers(3).build(); - let signers = fixture.signers_fixture(); - let signer = signers.first().unwrap(); - - let (single_signer, _) = SignerBuilder::new( - &fixture.signers_with_stake(), - &fixture.protocol_parameters(), - ) - .unwrap() - .build_test_single_signer( - signer.signer_with_stake.clone(), - signer.kes_secret_key_path(), - ) - .unwrap(); - - let message = ProtocolMessage::default(); - let signature = single_signer - .sign(&message) - .expect("Single signer should be able to issue single signature"); - - assert_eq!( - Some(message.compute_hash()), - signature.and_then(|s| s.signed_message) - ); - } } diff --git a/mithril-common/src/test_utils/fixture_builder.rs b/mithril-common/src/test_utils/fixture_builder.rs index fd937c7900d..1ba9667f9e6 100644 --- a/mithril-common/src/test_utils/fixture_builder.rs +++ b/mithril-common/src/test_utils/fixture_builder.rs @@ -138,7 +138,7 @@ impl MithrilFixtureBuilder { &mut kes_keys_seed, ) } else { - format!("{party_index:<032}") + party_index.to_string() } }); signers_party_ids.collect::>() @@ -265,11 +265,11 @@ mod tests { #[test] fn changing_party_id_seed_change_all_builded_party_ids() { let first_signers = MithrilFixtureBuilder::default() - .with_signers(100) + .with_signers(20) .build() .signers_with_stake(); let different_party_id_seed_signers = MithrilFixtureBuilder::default() - .with_signers(100) + .with_signers(20) .with_party_id_seed([1u8; 32]) .build() .signers_with_stake(); diff --git a/mithril-common/src/test_utils/mithril_fixture.rs b/mithril-common/src/test_utils/mithril_fixture.rs index f473d7042af..4c339834e65 100644 --- a/mithril-common/src/test_utils/mithril_fixture.rs +++ b/mithril-common/src/test_utils/mithril_fixture.rs @@ -15,10 +15,10 @@ use crate::{ }, entities::{ Certificate, Epoch, HexEncodedAggregateVerificationKey, ImmutableFileNumber, PartyId, - ProtocolMessage, ProtocolParameters, Signer, SignerWithStake, SingleSignatures, Stake, - StakeDistribution, StakeDistributionParty, + ProtocolParameters, Signer, SignerWithStake, SingleSignatures, Stake, StakeDistribution, + StakeDistributionParty, }, - protocol::SignerBuilder, + protocol::{SignerBuilder, ToMessage}, }; /// A fixture of Mithril data types. @@ -199,7 +199,7 @@ impl MithrilFixture { /// Make all underlying signers sign the given message, filter the resulting list to remove /// the signers that did not sign because they loosed the lottery. - pub fn sign_all(&self, message: &ProtocolMessage) -> Vec { + pub fn sign_all(&self, message: &T) -> Vec { self.signers .par_iter() .filter_map(|s| s.sign(message)) @@ -227,9 +227,10 @@ impl From for Vec { impl SignerFixture { /// Sign the given protocol message. - pub fn sign(&self, protocol_message: &ProtocolMessage) -> Option { + pub fn sign(&self, message: &T) -> Option { + let message = message.to_message(); self.protocol_signer - .sign(protocol_message.compute_hash().as_bytes()) + .sign(message.as_bytes()) .map(|signature| { let won_indexes = signature.indexes.clone(); diff --git a/mithril-signer/Cargo.toml b/mithril-signer/Cargo.toml index 98d255f744e..6c43d149a72 100644 --- a/mithril-signer/Cargo.toml +++ b/mithril-signer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-signer" -version = "0.2.183" +version = "0.2.184" description = "A Mithril Signer" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-signer/src/message_adapters/to_register_signature_message.rs b/mithril-signer/src/message_adapters/to_register_signature_message.rs index 666c935cd5c..460f155ef4e 100644 --- a/mithril-signer/src/message_adapters/to_register_signature_message.rs +++ b/mithril-signer/src/message_adapters/to_register_signature_message.rs @@ -1,15 +1,23 @@ use anyhow::Context; -use mithril_common::entities::{SignedEntityType, SingleSignatures}; +use mithril_common::entities::{ProtocolMessage, SignedEntityType, SingleSignatures}; use mithril_common::messages::{RegisterSignatureMessage, TryToMessageAdapter}; +use mithril_common::protocol::ToMessage; use mithril_common::StdResult; pub struct ToRegisterSignatureMessageAdapter; -impl TryToMessageAdapter<(SignedEntityType, SingleSignatures), RegisterSignatureMessage> - for ToRegisterSignatureMessageAdapter +impl + TryToMessageAdapter< + (SignedEntityType, SingleSignatures, &ProtocolMessage), + RegisterSignatureMessage, + > for ToRegisterSignatureMessageAdapter { fn try_adapt( - (signed_entity_type, single_signature): (SignedEntityType, SingleSignatures), + (signed_entity_type, single_signature, protocol_message): ( + SignedEntityType, + SingleSignatures, + &ProtocolMessage, + ), ) -> StdResult { let message = RegisterSignatureMessage { signed_entity_type, @@ -18,7 +26,7 @@ impl TryToMessageAdapter<(SignedEntityType, SingleSignatures), RegisterSignature "'ToRegisterSignatureMessageAdapter' can not convert the single signature" })?, won_indexes: single_signature.won_indexes, - signed_message: single_signature.signed_message, + signed_message: Some(protocol_message.to_message()), }; Ok(message) @@ -35,14 +43,15 @@ mod tests { fn adapt_ok() { let message: RegisterSignatureMessage = ToRegisterSignatureMessageAdapter::try_adapt(( SignedEntityType::dummy(), - SingleSignatures { - signed_message: Some("signed_message".to_string()), - ..fake_data::single_signatures([1, 3].to_vec()) - }, + fake_data::single_signatures([1, 3].to_vec()), + &ProtocolMessage::default(), )) .unwrap(); assert_eq!("party_id".to_string(), message.party_id); - assert_eq!(Some("signed_message".to_string()), message.signed_message); + assert_eq!( + Some(ProtocolMessage::default().to_message()), + message.signed_message + ); } } diff --git a/mithril-signer/src/runtime/runner.rs b/mithril-signer/src/runtime/runner.rs index ed6d540ba79..17581540bb5 100644 --- a/mithril-signer/src/runtime/runner.rs +++ b/mithril-signer/src/runtime/runner.rs @@ -61,6 +61,7 @@ pub trait Runner: Send + Sync { &self, signed_entity_type: &SignedEntityType, maybe_signature: Option, + signed_message: &ProtocolMessage, ) -> StdResult<()>; /// Read the current era and update the EraChecker. @@ -390,6 +391,7 @@ impl Runner for SignerRunner { &self, signed_entity_type: &SignedEntityType, maybe_signature: Option, + protocol_message: &ProtocolMessage, ) -> StdResult<()> { debug!("RUNNER: send_single_signature"); @@ -398,7 +400,7 @@ impl Runner for SignerRunner { self.services .certificate_handler - .register_signatures(signed_entity_type, &single_signatures) + .register_signatures(signed_entity_type, &single_signatures, protocol_message) .await?; Ok(()) @@ -1047,7 +1049,7 @@ mod tests { certificate_handler .expect_register_signatures() .once() - .returning(|_, _| Ok(())); + .returning(|_, _, _| Ok(())); services.certificate_handler = Arc::new(certificate_handler); let runner = init_runner(Some(services), None).await; @@ -1055,6 +1057,7 @@ mod tests { .send_single_signature( &SignedEntityType::dummy(), Some(fake_data::single_signatures(vec![2, 5, 12])), + &ProtocolMessage::default(), ) .await .expect("send_single_signature should not fail"); diff --git a/mithril-signer/src/runtime/state_machine.rs b/mithril-signer/src/runtime/state_machine.rs index 057254d0a05..624c84cfeed 100644 --- a/mithril-signer/src/runtime/state_machine.rs +++ b/mithril-signer/src/runtime/state_machine.rs @@ -444,7 +444,7 @@ impl StateMachine { message: format!("Could not compute single signature during 'ready to sign → ready to sign' phase (current epoch {current_epoch:?})"), nested_error: Some(e) })?; - self.runner.send_single_signature(signed_entity_type, single_signatures).await + self.runner.send_single_signature(signed_entity_type, single_signatures, &message).await .map_err(|e| RuntimeError::KeepState { message: format!("Could not send single signature during 'ready to sign → ready to sign' phase (current epoch {current_epoch:?})"), nested_error: Some(e) @@ -952,7 +952,7 @@ mod tests { runner .expect_send_single_signature() .once() - .returning(|_, _| Ok(())); + .returning(|_, _, _| Ok(())); runner .expect_can_sign_signed_entity_type() .once() diff --git a/mithril-signer/src/services/aggregator_client.rs b/mithril-signer/src/services/aggregator_client.rs index ca211fb049c..e004bbc356d 100644 --- a/mithril-signer/src/services/aggregator_client.rs +++ b/mithril-signer/src/services/aggregator_client.rs @@ -8,7 +8,8 @@ use thiserror::Error; use mithril_common::{ api_version::APIVersionProvider, entities::{ - CertificatePending, Epoch, EpochSettings, SignedEntityType, Signer, SingleSignatures, + CertificatePending, Epoch, EpochSettings, ProtocolMessage, SignedEntityType, Signer, + SingleSignatures, }, messages::{ AggregatorFeaturesMessage, CertificatePendingMessage, EpochSettingsMessage, @@ -17,9 +18,6 @@ use mithril_common::{ StdError, MITHRIL_API_VERSION_HEADER, MITHRIL_SIGNER_VERSION_HEADER, }; -#[cfg(test)] -use mockall::automock; - use crate::message_adapters::{ FromEpochSettingsAdapter, FromPendingCertificateMessageAdapter, ToRegisterSignatureMessageAdapter, ToRegisterSignerMessageAdapter, @@ -74,7 +72,7 @@ impl AggregatorClientError { } /// Trait for mocking and testing a `AggregatorClient` -#[cfg_attr(test, automock)] +#[cfg_attr(test, mockall::automock)] #[async_trait] pub trait AggregatorClient: Sync + Send { /// Retrieves epoch settings from the aggregator @@ -98,6 +96,7 @@ pub trait AggregatorClient: Sync + Send { &self, signed_entity_type: &SignedEntityType, signatures: &SingleSignatures, + protocol_message: &ProtocolMessage, ) -> Result<(), AggregatorClientError>; /// Retrieves aggregator features message from the aggregator @@ -280,12 +279,14 @@ impl AggregatorClient for AggregatorHTTPClient { &self, signed_entity_type: &SignedEntityType, signatures: &SingleSignatures, + protocol_message: &ProtocolMessage, ) -> Result<(), AggregatorClientError> { debug!("Register signatures"); let url = format!("{}/register-signatures", self.aggregator_endpoint); let register_single_signature_message = ToRegisterSignatureMessageAdapter::try_adapt(( signed_entity_type.to_owned(), signatures.to_owned(), + protocol_message, )) .map_err(|e| AggregatorClientError::Adapter(anyhow!(e)))?; let response = self @@ -435,6 +436,7 @@ pub(crate) mod dumb { &self, _signed_entity_type: &SignedEntityType, _signatures: &SingleSignatures, + _protocol_message: &ProtocolMessage, ) -> Result<(), AggregatorClientError> { Ok(()) } @@ -933,7 +935,11 @@ mod tests { None, ); let register_signatures = certificate_handler - .register_signatures(&SignedEntityType::dummy(), &single_signatures) + .register_signatures( + &SignedEntityType::dummy(), + &single_signatures, + &ProtocolMessage::default(), + ) .await; register_signatures.expect("unexpected error"); } @@ -954,7 +960,11 @@ mod tests { None, ); let error = certificate_handler - .register_signatures(&SignedEntityType::dummy(), &single_signatures) + .register_signatures( + &SignedEntityType::dummy(), + &single_signatures, + &ProtocolMessage::default(), + ) .await .unwrap_err(); @@ -982,7 +992,11 @@ mod tests { None, ); match certificate_handler - .register_signatures(&SignedEntityType::dummy(), &single_signatures) + .register_signatures( + &SignedEntityType::dummy(), + &single_signatures, + &ProtocolMessage::default(), + ) .await .unwrap_err() { @@ -1006,7 +1020,11 @@ mod tests { None, ); match certificate_handler - .register_signatures(&SignedEntityType::dummy(), &single_signatures) + .register_signatures( + &SignedEntityType::dummy(), + &single_signatures, + &ProtocolMessage::default(), + ) .await .unwrap_err() { @@ -1030,7 +1048,11 @@ mod tests { None, ); match certificate_handler - .register_signatures(&SignedEntityType::dummy(), &single_signatures) + .register_signatures( + &SignedEntityType::dummy(), + &single_signatures, + &ProtocolMessage::default(), + ) .await .unwrap_err() { @@ -1055,7 +1077,11 @@ mod tests { ); let error = certificate_handler - .register_signatures(&SignedEntityType::dummy(), &single_signatures) + .register_signatures( + &SignedEntityType::dummy(), + &single_signatures, + &ProtocolMessage::default(), + ) .await .expect_err("register_signatures should fail"); diff --git a/mithril-signer/tests/test_extensions/certificate_handler.rs b/mithril-signer/tests/test_extensions/certificate_handler.rs index 6fc6611a6d0..4d6864cdcb1 100644 --- a/mithril-signer/tests/test_extensions/certificate_handler.rs +++ b/mithril-signer/tests/test_extensions/certificate_handler.rs @@ -4,8 +4,8 @@ use anyhow::anyhow; use async_trait::async_trait; use mithril_common::{ entities::{ - CertificatePending, Epoch, EpochSettings, SignedEntityConfig, SignedEntityType, - SignedEntityTypeDiscriminants, Signer, SingleSignatures, TimePoint, + CertificatePending, Epoch, EpochSettings, ProtocolMessage, SignedEntityConfig, + SignedEntityType, SignedEntityTypeDiscriminants, Signer, SingleSignatures, TimePoint, }, messages::AggregatorFeaturesMessage, test_utils::fake_data, @@ -157,6 +157,7 @@ impl AggregatorClient for FakeAggregator { &self, _signed_entity_type: &SignedEntityType, _signatures: &SingleSignatures, + _protocol_message: &ProtocolMessage, ) -> Result<(), AggregatorClientError> { Ok(()) } diff --git a/openapi.yaml b/openapi.yaml index 40822ccb130..9f96bd7080d 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -4,7 +4,7 @@ info: # `mithril-common/src/lib.rs` file. If you plan to update it # here to reflect changes in the API, please also update the constant in the # Rust file. - version: 0.1.29 + version: 0.1.30 title: Mithril Aggregator Server description: | The REST API provided by a Mithril Aggregator Node in a Mithril network. @@ -567,6 +567,8 @@ paths: responses: "201": description: signatures registration succeeded + "202": + description: signatures registration received and queued for later processing "400": description: signatures registration bad request content: