Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement open_message timeout in aggregator #1401

Merged
merged 9 commits into from
Dec 18, 2023
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.4.20"
version = "0.4.21"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
9 changes: 9 additions & 0 deletions mithril-aggregator/src/database/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,5 +601,14 @@ alter table signer add column last_registered_at text null;
update signer set last_registered_at = created_at;
"#,
),
// Migration 20
// Alter `open_message` table to add `expires_at` and 'is_expired' fields
SqlMigration::new(
20,
r#"
alter table open_message add column is_expired bool not null default false;
alter table open_message add column expires_at text null;
"#,
),
]
}
107 changes: 94 additions & 13 deletions mithril-aggregator/src/database/provider/open_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ pub struct OpenMessageRecord {
/// Has this open message been converted into a certificate?
pub is_certified: bool,

/// Has this open message expired
pub is_expired: bool,

/// Message creation datetime, it is set by the database.
pub created_at: DateTime<Utc>,

/// Message expiration datetime, if it exists.
pub expires_at: Option<DateTime<Utc>>,
}

impl OpenMessageRecord {
Expand All @@ -54,7 +60,9 @@ impl OpenMessageRecord {
signed_entity_type,
protocol_message: ProtocolMessage::new(),
is_certified: false,
is_expired: false,
created_at: Utc::now(),
expires_at: None,
}
}
}
Expand All @@ -67,7 +75,9 @@ impl From<OpenMessageWithSingleSignaturesRecord> for OpenMessageRecord {
signed_entity_type: value.signed_entity_type,
protocol_message: value.protocol_message,
is_certified: value.is_certified,
is_expired: value.is_expired,
created_at: value.created_at,
expires_at: value.expires_at,
}
}
}
Expand Down Expand Up @@ -109,21 +119,29 @@ impl SqLiteEntity for OpenMessageRecord {
})?;
let signed_entity_type = SignedEntityType::hydrate(signed_entity_type_id, &beacon_str)?;
let is_certified = row.read::<i64, _>(5) != 0;
let datetime = &row.read::<&str, _>(6);
let datetime = &row.read::<&str, _>(7);
let created_at =
DateTime::parse_from_rfc3339(datetime).map_err(|e| {
HydrationError::InvalidData(format!(
"Could not turn open_message.created_at field value '{datetime}' to rfc3339 Datetime. Error: {e}"
))
})?.with_timezone(&Utc);

let is_expired = row.read::<i64, _>(6) != 0;
let datetime = &row.read::<Option<&str>, _>(8);
let expires_at = datetime.map(|datetime| DateTime::parse_from_rfc3339(datetime).map_err(|e| {
HydrationError::InvalidData(format!(
"Could not turn open_message.expires_at field value '{datetime}' to rfc3339 Datetime. Error: {e}"
))
})).transpose()?.map(|datetime| datetime.with_timezone(&Utc));
let open_message = Self {
open_message_id,
epoch: Epoch(epoch_val),
signed_entity_type,
protocol_message,
is_certified,
is_expired,
created_at,
expires_at,
};

Ok(open_message)
Expand Down Expand Up @@ -153,7 +171,9 @@ impl SqLiteEntity for OpenMessageRecord {
"text",
),
("is_certified", "{:open_message:}.is_certified", "bool"),
("is_expired", "{:open_message:}.is_expired", "bool"),
("created_at", "{:open_message:}.created_at", "text"),
("expires_at", "{:open_message:}.expires_at", "text"),
])
}
}
Expand Down Expand Up @@ -184,6 +204,10 @@ impl<'client> OpenMessageProvider<'client> {
))
}

fn get_expired_entity_type_condition(&self, now: &str) -> WhereCondition {
WhereCondition::new("expires_at < ?*", vec![Value::String(now.to_string())])
}

// Useful in test and probably in the future.
#[allow(dead_code)]
fn get_open_message_id_condition(&self, open_message_id: &str) -> WhereCondition {
Expand Down Expand Up @@ -227,14 +251,18 @@ impl<'client> InsertOpenMessageProvider<'client> {
signed_entity_type: &SignedEntityType,
protocol_message: &ProtocolMessage,
) -> StdResult<WhereCondition> {
let expression = "(open_message_id, epoch_setting_id, beacon, signed_entity_type_id, protocol_message, created_at) values (?*, ?*, ?*, ?*, ?*, ?*)";
let expression = "(open_message_id, epoch_setting_id, beacon, signed_entity_type_id, protocol_message, expires_at, created_at) values (?*, ?*, ?*, ?*, ?*, ?*, ?*)";
let beacon_str = signed_entity_type.get_json_beacon()?;
let parameters = vec![
Value::String(Uuid::new_v4().to_string()),
Value::Integer(epoch.try_into()?),
Value::String(beacon_str),
Value::Integer(signed_entity_type.index() as i64),
Value::String(serde_json::to_string(protocol_message)?),
signed_entity_type
.get_open_message_timeout()
.map(|t| Value::String((Utc::now() + t).to_rfc3339()))
.unwrap_or(Value::Null),
Value::String(Utc::now().to_rfc3339()),
];

Expand Down Expand Up @@ -267,8 +295,8 @@ impl<'client> UpdateOpenMessageProvider<'client> {

fn get_update_condition(&self, open_message: &OpenMessageRecord) -> StdResult<WhereCondition> {
let expression = "epoch_setting_id = ?*, beacon = ?*, \
signed_entity_type_id = ?*, protocol_message = ?*, is_certified = ?* \
where open_message_id = ?*";
signed_entity_type_id = ?*, protocol_message = ?*, is_certified = ?*, \
is_expired = ?*, expires_at = ?* where open_message_id = ?*";
let beacon_str = open_message.signed_entity_type.get_json_beacon()?;
let parameters = vec![
Value::Integer(
Expand All @@ -281,6 +309,11 @@ where open_message_id = ?*";
Value::Integer(open_message.signed_entity_type.index() as i64),
Value::String(serde_json::to_string(&open_message.protocol_message)?),
Value::Integer(open_message.is_certified as i64),
Value::Integer(open_message.is_expired as i64),
open_message
.expires_at
.map(|d| Value::String(d.to_rfc3339()))
.unwrap_or(Value::Null),
Value::String(open_message.open_message_id.to_string()),
];

Expand Down Expand Up @@ -350,19 +383,25 @@ pub struct OpenMessageWithSingleSignaturesRecord {
/// Has this message been converted into a Certificate?
pub is_certified: bool,

/// Has this open message expired
pub is_expired: bool,

/// associated single signatures
pub single_signatures: Vec<SingleSignatures>,

/// Message creation datetime, it is set by the database.
pub created_at: DateTime<Utc>,

/// Message expiration datetime, if it exists.
pub expires_at: Option<DateTime<Utc>>,
}

impl SqLiteEntity for OpenMessageWithSingleSignaturesRecord {
fn hydrate(row: Row) -> Result<Self, HydrationError>
where
Self: Sized,
{
let single_signatures = &row.read::<&str, _>(7);
let single_signatures = &row.read::<&str, _>(9);
let single_signatures: Vec<SingleSignatures> = serde_json::from_str(single_signatures)
.map_err(|e| {
HydrationError::InvalidData(format!(
Expand All @@ -378,8 +417,10 @@ impl SqLiteEntity for OpenMessageWithSingleSignaturesRecord {
signed_entity_type: open_message.signed_entity_type,
protocol_message: open_message.protocol_message,
is_certified: open_message.is_certified,
is_expired: open_message.is_expired,
single_signatures,
created_at: open_message.created_at,
expires_at: open_message.expires_at,
};

Ok(open_message)
Expand Down Expand Up @@ -409,7 +450,9 @@ impl SqLiteEntity for OpenMessageWithSingleSignaturesRecord {
"text",
),
("is_certified", "{:open_message:}.is_certified", "bool"),
("is_expired", "{:open_message:}.is_expired", "bool"),
("created_at", "{:open_message:}.created_at", "text"),
("expires_at", "{:open_message:}.expires_at", "text"),
(
"single_signatures",
"case when {:single_signature:}.signer_id is null then json('[]') \
Expand Down Expand Up @@ -523,6 +566,21 @@ impl OpenMessageRepository {
Ok(messages.next())
}

/// Return the expired [OpenMessageRecord] for the given Epoch and [SignedEntityType] if it exists
pub async fn get_expired_open_message(
&self,
signed_entity_type: &SignedEntityType,
) -> StdResult<Option<OpenMessageRecord>> {
let provider = OpenMessageProvider::new(&self.connection);
let now = Utc::now().to_rfc3339();
let filters = provider
.get_expired_entity_type_condition(&now)
.and_where(provider.get_signed_entity_type_condition(signed_entity_type)?);
let mut messages = provider.find(filters)?;

Ok(messages.next())
}

/// Create a new [OpenMessageRecord] in the database.
pub async fn create_open_message(
&self,
Expand Down Expand Up @@ -609,7 +667,9 @@ values (1, '{"k": 100, "m": 5, "phi": 0.65 }'), (2, '{"k": 100, "m": 5, "phi": 0
'{ "message_parts": {
"next_aggregate_verification_key":"7b226d745f636f6d6d69746d656e74223a7b22726f6f74223a5b3131312c3230352c3133392c3131322c32382c392c3233382c3134382c3133342c302c3230372c3233302c3234312c3130352c3135372c3131302c3232362c3131342c32362c35332c3136362c3235342c3230382c3132372c3231362c3230362c3230302c34382c35352c32312c3231372c31335d2c226e725f6c6561766573223a332c22686173686572223a6e756c6c7d2c22746f74616c5f7374616b65223a32383439323639303636317d"
}}',
1
1,
0,
'2021-07-27T01:02:44.505640275+00:00'
);

insert into single_signature values(
Expand Down Expand Up @@ -638,7 +698,7 @@ values (1, '{"k": 100, "m": 5, "phi": 0.65 }'), (2, '{"k": 100, "m": 5, "phi": 0
.get_open_message(&SignedEntityType::MithrilStakeDistribution(Epoch(275)))
.await
.expect("Getting Golden open message should not fail")
.expect("A open message should exist for this signed entity type");
.expect("An open message should exist for this signed entity type");

repository
.get_open_message_with_single_signatures(&SignedEntityType::MithrilStakeDistribution(
Expand All @@ -647,7 +707,7 @@ values (1, '{"k": 100, "m": 5, "phi": 0.65 }'), (2, '{"k": 100, "m": 5, "phi": 0
.await
.expect("Getting Golden open message should not fail")
.expect(
"A open message with single signatures should exist for this signed entity type",
"An open message with single signatures should exist for this signed entity type",
);
}

Expand All @@ -663,8 +723,11 @@ values (1, '{"k": 100, "m": 5, "phi": 0.65 }'), (2, '{"k": 100, "m": 5, "phi": 0
"open_message.open_message_id as open_message_id, \
open_message.epoch_setting_id as epoch_setting_id, open_message.beacon as beacon, \
open_message.signed_entity_type_id as signed_entity_type_id, \
open_message.protocol_message as protocol_message, open_message.is_certified as is_certified, \
open_message.protocol_message as protocol_message, \
open_message.is_certified as is_certified, \
open_message.is_expired as is_expired, \
open_message.created_at as created_at, \
open_message.expires_at as expires_at, \
case when single_signature.signer_id is null then json('[]') \
else json_group_array( \
json_object( \
Expand All @@ -684,7 +747,7 @@ else json_group_array( \
let aliases = SourceAlias::new(&[("{:open_message:}", "open_message")]);

assert_eq!(
"open_message.open_message_id as open_message_id, open_message.epoch_setting_id as epoch_setting_id, open_message.beacon as beacon, open_message.signed_entity_type_id as signed_entity_type_id, open_message.protocol_message as protocol_message, open_message.is_certified as is_certified, open_message.created_at as created_at".to_string(),
"open_message.open_message_id as open_message_id, open_message.epoch_setting_id as epoch_setting_id, open_message.beacon as beacon, open_message.signed_entity_type_id as signed_entity_type_id, open_message.protocol_message as protocol_message, open_message.is_certified as is_certified, open_message.is_expired as is_expired, open_message.created_at as created_at, open_message.expires_at as expires_at".to_string(),
projection.expand(aliases)
)
}
Expand Down Expand Up @@ -745,6 +808,17 @@ else json_group_array( \
);
}

#[test]
fn provider_expired_entity_type_condition() {
let connection = Connection::open_thread_safe(":memory:").unwrap();
let provider = OpenMessageProvider::new(&connection);
let now = Utc::now().to_rfc3339();
let (expr, params) = provider.get_expired_entity_type_condition(&now).expand();

assert_eq!("expires_at < ?1".to_string(), expr);
assert_eq!(vec![Value::String(now)], params);
}

#[test]
fn insert_provider_condition() {
let connection = Connection::open_thread_safe(":memory:").unwrap();
Expand All @@ -763,7 +837,7 @@ else json_group_array( \
.unwrap()
.expand();

assert_eq!("(open_message_id, epoch_setting_id, beacon, signed_entity_type_id, protocol_message, created_at) values (?1, ?2, ?3, ?4, ?5, ?6)".to_string(), expr);
assert_eq!("(open_message_id, epoch_setting_id, beacon, signed_entity_type_id, protocol_message, expires_at, created_at) values (?1, ?2, ?3, ?4, ?5, ?6, ?7)".to_string(), expr);
assert_eq!(Value::Integer(12), params[1]);
assert_eq!(
Value::String(
Expand All @@ -788,15 +862,17 @@ else json_group_array( \
signed_entity_type: SignedEntityType::dummy(),
protocol_message: ProtocolMessage::new(),
is_certified: true,
is_expired: false,
created_at: DateTime::<Utc>::default(),
expires_at: None,
};
let (expr, params) = provider
.get_update_condition(&open_message)
.unwrap()
.expand();

assert_eq!(
"epoch_setting_id = ?1, beacon = ?2, signed_entity_type_id = ?3, protocol_message = ?4, is_certified = ?5 where open_message_id = ?6"
"epoch_setting_id = ?1, beacon = ?2, signed_entity_type_id = ?3, protocol_message = ?4, is_certified = ?5, is_expired = ?6, expires_at = ?7 where open_message_id = ?8"
.to_string(),
expr
);
Expand All @@ -807,6 +883,11 @@ else json_group_array( \
Value::Integer(open_message.signed_entity_type.index() as i64),
Value::String(serde_json::to_string(&open_message.protocol_message).unwrap()),
Value::Integer(open_message.is_certified as i64),
Value::Integer(open_message.is_expired as i64),
open_message
.expires_at
.map(|d| Value::String(d.to_rfc3339()))
.unwrap_or(Value::Null),
Value::String(open_message.open_message_id.to_string()),
],
params
Expand Down
Loading