Skip to content

Commit

Permalink
feature: periodically remove stale gateway messages (nymtech#5312)
Browse files Browse the repository at this point in the history
* add timestamp to stored client messages

* removed dead code

* starting node task to remove old messages

* added log for number of removed messages

* debug log on task finishing
  • Loading branch information
jstuczyn authored Jan 9, 2025
1 parent 3d2914b commit 226c040
Show file tree
Hide file tree
Showing 19 changed files with 199 additions and 477 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion common/gateway-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ license.workspace = true
[dependencies]
bincode = { workspace = true }
defguard_wireguard_rs = { workspace = true }
log = { workspace = true }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: GPL-3.0-only
*/

ALTER TABLE message_store
ADD COLUMN timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP;
67 changes: 28 additions & 39 deletions common/gateway-storage/src/inboxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// SPDX-License-Identifier: GPL-3.0-only

use crate::models::StoredMessage;
use time::OffsetDateTime;
use tracing::debug;

#[derive(Clone)]
pub(crate) struct InboxManager {
pub struct InboxManager {
connection_pool: sqlx::SqlitePool,
/// Maximum number of messages that can be obtained from the database per operation.
/// It is used to prevent out of memory errors in the case of client receiving a lot of data while
Expand Down Expand Up @@ -71,44 +73,22 @@ impl InboxManager {
// get 1 additional message to check whether there will be more to grab
// next time
let limit = self.retrieval_limit + 1;
let mut res = if let Some(start_after) = start_after {
sqlx::query_as!(
StoredMessage,
r#"
SELECT
id as "id!",
client_address_bs58 as "client_address_bs58!",
content as "content!"
FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
client_address_bs58,
start_after,
limit
)
.fetch_all(&self.connection_pool)
.await?
} else {
sqlx::query_as!(
StoredMessage,
r#"
SELECT
id as "id!",
client_address_bs58 as "client_address_bs58!",
content as "content!"
FROM message_store
WHERE client_address_bs58 = ?
ORDER BY id ASC
LIMIT ?;
"#,
client_address_bs58,
limit
)
.fetch_all(&self.connection_pool)
.await?
};
let start_after = start_after.unwrap_or(-1);

let mut res: Vec<StoredMessage> = sqlx::query_as(
r#"
SELECT id, client_address_bs58, content, timestamp
FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
)
.bind(client_address_bs58)
.bind(start_after)
.bind(limit)
.fetch_all(&self.connection_pool)
.await?;

if res.len() > self.retrieval_limit as usize {
res.truncate(self.retrieval_limit as usize);
Expand Down Expand Up @@ -146,4 +126,13 @@ impl InboxManager {
.await?;
Ok(())
}

pub async fn remove_stale(&self, cutoff: OffsetDateTime) -> Result<(), sqlx::Error> {
let affected = sqlx::query!("DELETE FROM message_store WHERE timestamp < ?", cutoff)
.execute(&self.connection_pool)
.await?
.rows_affected();
debug!("Removed {affected} stale messages");
Ok(())
}
}
4 changes: 2 additions & 2 deletions common/gateway-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use bandwidth::BandwidthManager;
use clients::{ClientManager, ClientType};
use inboxes::InboxManager;
use models::{
Client, PersistedBandwidth, PersistedSharedKeys, RedemptionProposal, StoredMessage,
VerifiedTicket, WireguardPeer,
Expand Down Expand Up @@ -31,6 +30,7 @@ mod tickets;
mod wireguard_peers;

pub use error::GatewayStorageError;
pub use inboxes::InboxManager;

// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
Expand All @@ -53,7 +53,7 @@ impl GatewayStorage {
&self.shared_key_manager
}

pub(crate) fn inbox_manager(&self) -> &InboxManager {
pub fn inbox_manager(&self) -> &InboxManager {
&self.inbox_manager
}

Expand Down
2 changes: 2 additions & 0 deletions common/gateway-storage/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ impl TryFrom<PersistedSharedKeys> for SharedGatewayKey {
}
}

#[derive(FromRow)]
pub struct StoredMessage {
pub id: i64,
#[allow(dead_code)]
pub client_address_bs58: String,
pub content: Vec<u8>,
pub timestamp: OffsetDateTime,
}

#[derive(Debug, Clone, FromRow)]
Expand Down
63 changes: 6 additions & 57 deletions gateway/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only

use nym_network_defaults::TICKETBOOK_VALIDITY_DAYS;
use std::net::SocketAddr;
use std::time::Duration;
use url::Url;

// TODO: can we move those away?
pub const DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE: Duration = Duration::from_millis(5);
pub const DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT: i64 = 512 * 1024; // 512kB

#[derive(Debug)]
pub struct Config {
pub gateway: Gateway,
Expand Down Expand Up @@ -96,18 +91,13 @@ pub struct Debug {
/// Defines a maximum change in client bandwidth before it gets flushed to the persistent storage.
pub client_bandwidth_max_delta_flushing_amount: i64,

pub zk_nym_tickets: ZkNymTicketHandlerDebug,
}
/// Specifies how often the clean-up task should check for stale data.
pub stale_messages_cleaner_run_interval: Duration,

impl Default for Debug {
fn default() -> Self {
Debug {
client_bandwidth_max_flushing_rate: DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE,
client_bandwidth_max_delta_flushing_amount:
DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT,
zk_nym_tickets: Default::default(),
}
}
/// Specifies maximum age of stored messages before they are removed from the storage
pub stale_messages_max_age: Duration,

pub zk_nym_tickets: ZkNymTicketHandlerDebug,
}

#[derive(Debug, Clone)]
Expand All @@ -131,44 +121,3 @@ pub struct ZkNymTicketHandlerDebug {
/// That's required as nym-apis will purge all ticket information for tickets older than maximum validity.
pub maximum_time_between_redemption: Duration,
}

impl ZkNymTicketHandlerDebug {
pub const DEFAULT_REVOCATION_BANDWIDTH_PENALTY: f32 = 10.0;
pub const DEFAULT_PENDING_POLLER: Duration = Duration::from_secs(300);
pub const DEFAULT_MINIMUM_API_QUORUM: f32 = 0.8;
pub const DEFAULT_MINIMUM_REDEMPTION_TICKETS: usize = 100;

// use min(4/5 of max validity, validity - 1), but making sure it's no greater than 1 day
// ASSUMPTION: our validity period is AT LEAST 2 days
//
// this could have been a constant, but it's more readable as a function
pub const fn default_maximum_time_between_redemption() -> Duration {
let desired_secs = TICKETBOOK_VALIDITY_DAYS * (86400 * 4) / 5;
let desired_secs_alt = (TICKETBOOK_VALIDITY_DAYS - 1) * 86400;

// can't use `min` in const context
let target_secs = if desired_secs < desired_secs_alt {
desired_secs
} else {
desired_secs_alt
};

assert!(
target_secs > 86400,
"the maximum time between redemption can't be lower than 1 day!"
);
Duration::from_secs(target_secs as u64)
}
}

impl Default for ZkNymTicketHandlerDebug {
fn default() -> Self {
ZkNymTicketHandlerDebug {
revocation_bandwidth_penalty: Self::DEFAULT_REVOCATION_BANDWIDTH_PENALTY,
pending_poller: Self::DEFAULT_PENDING_POLLER,
minimum_api_quorum: Self::DEFAULT_MINIMUM_API_QUORUM,
minimum_redemption_tickets: Self::DEFAULT_MINIMUM_REDEMPTION_TICKETS,
maximum_time_between_redemption: Self::default_maximum_time_between_redemption(),
}
}
}
7 changes: 0 additions & 7 deletions gateway/src/node/mixnet_handling/mod.rs

This file was deleted.

Loading

0 comments on commit 226c040

Please sign in to comment.