fix: pool timeout bug (#38)
pete-eiger authored Mar 15, 2024
1 parent 4abffbd commit 92e0f0a
Showing 7 changed files with 101 additions and 58 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
@@ -30,7 +30,7 @@ jobs:
run: git submodule update --init --recursive
- uses: actions/setup-go@v4
with:
go-version: '1.19'
go-version: '1.20'
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@@ -77,7 +77,7 @@ jobs:
run: git submodule update --init --recursive
- uses: actions/setup-go@v4
with:
go-version: '1.19'
go-version: '1.20'
- uses: actions-rs/toolchain@v1
with:
profile: minimal

This file was deleted.

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/config.rs
@@ -323,7 +323,7 @@ impl Config {
self.filter_protocol,
self.discv5_enrs.clone(),
self.discv5_port,
self.discv5_enrs().clone().unwrap_or(vec![]),
self.discv5_enrs().clone().unwrap_or_default(),
Some(cf_nameserver().to_string()),
)
.await
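The src/config.rs change is a small idiomatic cleanup: unwrap_or(vec![]) and unwrap_or_default() both yield an empty Vec when the option is None, but the latter avoids spelling out the fallback and only constructs it in the None case (for an empty Vec, which never allocates, the difference is purely stylistic). A tiny illustrative sketch of the equivalence, unrelated to the real Config type:

// Illustrative only; mirrors the one-line change above.
fn main() {
    let enrs: Option<Vec<String>> = None;

    let old_style = enrs.clone().unwrap_or(vec![]); // fallback written out explicitly
    let new_style = enrs.unwrap_or_default();       // Default::default() used when None

    assert_eq!(old_style, new_style);
    assert!(new_style.is_empty());
}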
76 changes: 60 additions & 16 deletions src/db/resolver.rs
@@ -1,7 +1,7 @@
use async_graphql::{OutputType, SimpleObject};
use chrono::Utc;
use serde::{de::DeserializeOwned, Serialize};
use sqlx::{types::Json, FromRow, PgPool, Row as SqliteRow};
use sqlx::{postgres::PgQueryResult, types::Json, FromRow, PgPool, Row as SqliteRow};
use std::ops::Deref;
use tracing::trace;

@@ -83,6 +83,23 @@ ORDER BY id
Ok(rows)
}

pub async fn count_messages(pool: &PgPool) -> anyhow::Result<i64> {
let result = sqlx::query!(
r#"
SELECT COUNT(*) as "count!: i64"
FROM messages
"#
)
.fetch_one(pool)
.await
.map_err(|e| {
trace!("Database query error: {:#?}", e);
anyhow::Error::new(e)
})?;

Ok(result.count)
}

pub async fn list_rows<T>(pool: &PgPool) -> Result<Vec<GraphQLRow<T>>, anyhow::Error>
where
T: Clone + Serialize + DeserializeOwned + OutputType + std::marker::Unpin,
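A note on the count_messages helper added above: inside sqlx::query!, the alias as "count!: i64" overrides the inferred column type and asserts the value is non-null, so the generated row exposes count as a plain i64. A minimal sketch of a caller, assuming the helper is imported from this crate's db::resolver module (as the operator code further down does) and that DATABASE_URL points at a reachable Postgres instance:

use sqlx::PgPool;

// Hypothetical caller of the helper added in this diff.
async fn log_message_count(pool: &PgPool) -> anyhow::Result<()> {
    let total = count_messages(pool).await?;
    println!("messages currently cached: {total}");
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connection string is illustrative only.
    let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
    log_message_count(&pool).await
}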
@@ -201,24 +218,51 @@ RETURNING id
Ok(deleted_ids.try_into().unwrap())
}

/// Function to delete messages older than `retention` minutes
/// Returns the number of messages deleted
pub async fn prune_old_messages(pool: &PgPool, retention: i32) -> Result<i64, anyhow::Error> {
/// Function to delete messages older than `retention` minutes in batches
/// Returns the total number of messages deleted
/// Arguments:
/// - `pool`: &PgPool - A reference to the PostgreSQL connection pool
/// - `retention`: i32 - The retention time in minutes
/// - `batch_size`: i64 - The number of messages to delete in each batch
pub async fn prune_old_messages(
pool: &PgPool,
retention: i32,
batch_size: i64,
) -> Result<i64, anyhow::Error> {
let cutoff_nonce = Utc::now().timestamp() - (retention as i64 * 60);
let mut total_deleted = 0i64;

loop {
let delete_query = sqlx::query(
r#"
WITH deleted AS (
SELECT id
FROM messages
WHERE (message->>'nonce')::bigint < $1
ORDER BY id ASC
LIMIT $2
FOR UPDATE SKIP LOCKED
)
DELETE FROM messages
WHERE id IN (SELECT id FROM deleted)
RETURNING id
"#,
)
.bind(cutoff_nonce)
.bind(batch_size);

let deleted_count = sqlx::query!(
r#"
DELETE FROM messages
WHERE (message->>'nonce')::bigint < $1
RETURNING id
"#,
cutoff_nonce
)
.fetch_all(pool)
.await?
.len() as i64;
let result: PgQueryResult = delete_query.execute(pool).await?;
let deleted_count = result.rows_affected() as i64;

total_deleted += deleted_count;

// Break the loop if we deleted fewer rows than the batch size, indicating we've processed all eligible messages.
if deleted_count < batch_size {
break;
}
}

Ok(deleted_count)
Ok(total_deleted)
}

pub async fn list_active_indexers(
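The rewritten prune_old_messages above deletes in bounded batches: the CTE selects at most batch_size matching ids with FOR UPDATE SKIP LOCKED (locking only those rows and skipping any already locked by another transaction), deletes exactly that set, and the loop repeats until a batch comes back smaller than batch_size. Each DELETE therefore stays short instead of holding a connection for one huge statement. A sketch of how such a routine might be driven on a schedule; the interval and retention values are illustrative, and prune_old_messages is assumed to be imported from this crate's db::resolver module:

use std::time::Duration;

use sqlx::PgPool;
use tokio::time::interval;

// Hypothetical maintenance loop; the numbers are placeholders, not project defaults.
async fn run_pruner(pool: PgPool) -> anyhow::Result<()> {
    let retention_minutes = 60 * 24; // keep roughly one day of messages
    let batch_size = 1000;           // cap rows deleted per statement
    let mut tick = interval(Duration::from_secs(300));

    loop {
        tick.tick().await;
        let pruned = prune_old_messages(&pool, retention_minutes, batch_size).await?;
        tracing::info!(pruned, "pruned old messages");
    }
}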
23 changes: 12 additions & 11 deletions src/operator/mod.rs
@@ -13,11 +13,11 @@ use tracing::{debug, info, trace, warn};

use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent};

use crate::db::resolver::{prune_old_messages, retain_max_storage};
use crate::db::resolver::{count_messages, prune_old_messages, retain_max_storage};
use crate::metrics::{CONNECTED_PEERS, GOSSIP_PEERS, PRUNED_MESSAGES, RECEIVED_MESSAGES};
use crate::{
config::Config,
db::resolver::{add_message, list_messages},
db::resolver::add_message,
message_types::{PublicPoiMessage, SimpleMessage, UpgradeIntentMessage},
metrics::{handle_serve_metrics, ACTIVE_PEERS, CACHED_MESSAGES},
server::run_server,
@@ -109,7 +109,7 @@ impl RadioOperator {
let skip_iteration_clone = skip_iteration.clone();

let mut network_update_interval = interval(Duration::from_secs(600));
let mut comparison_interval = interval(Duration::from_secs(30));
let mut summary_interval = interval(Duration::from_secs(180));

let iteration_timeout = Duration::from_secs(180);
let update_timeout = Duration::from_secs(5);
@@ -160,7 +160,7 @@ impl RadioOperator {
.set(self.graphcast_agent.number_of_peers().try_into().unwrap());
}
},
_ = comparison_interval.tick() => {
_ = summary_interval.tick() => {
trace!("Local summary update");
if skip_iteration.load(Ordering::SeqCst) {
skip_iteration.store(false, Ordering::SeqCst);
@@ -185,10 +185,12 @@
};
}

let batch_size = 1000;

// Always prune old messages based on RETENTION
match timeout(
update_timeout,
prune_old_messages(&self.db, self.config.retention)
prune_old_messages(&self.db, self.config.retention, batch_size)
).await {
Err(e) => debug!(err = tracing::field::debug(e), "Pruning by retention timed out"),
Ok(Ok(num_pruned)) => {
@@ -199,14 +201,13 @@
};

// List the remaining messages
let result = timeout(update_timeout, list_messages::<GraphcastMessage<PublicPoiMessage>>(&self.db)).await;
let result = timeout(update_timeout, count_messages(&self.db)).await.expect("could not count messages");

match result {
Err(e) => warn!(err = tracing::field::debug(e), "Public PoI messages summary timed out"),
Ok(msgs) => {
let msg_num = msgs.map_or(0, |m| m.len());
CACHED_MESSAGES.set(msg_num as i64);
info!(total_messages = msg_num,
Err(e) => warn!(err = tracing::field::debug(e), "Database query for message count timed out"),
Ok(count) => {
CACHED_MESSAGES.set(count);
info!(total_messages = count,
total_num_pruned,
"Monitoring summary"
)
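The summary branch above wraps each database call in tokio::time::timeout, which produces a nested result: the outer Result reports whether the future finished before the deadline, the inner one is the query's own Result. A sketch of one way to unpack both layers without panicking; the five-second deadline and the function name are illustrative, and count_messages is again assumed to come from this crate's db::resolver module:

use std::time::Duration;

use sqlx::PgPool;
use tokio::time::timeout;
use tracing::{info, warn};

// Illustrative: treat the elapsed deadline and the query error as separate cases.
async fn report_cached_messages(pool: &PgPool) {
    match timeout(Duration::from_secs(5), count_messages(pool)).await {
        Err(elapsed) => warn!(%elapsed, "message count query timed out"),
        Ok(Err(e)) => warn!(err = ?e, "message count query failed"),
        Ok(Ok(count)) => info!(total_messages = count, "monitoring summary"),
    }
}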
12 changes: 6 additions & 6 deletions src/server/model/mod.rs
@@ -1,9 +1,9 @@
use async_graphql::{Context, EmptySubscription, Object, OutputType, Schema, SimpleObject};

use chrono::{Duration, Utc};
use chrono::Utc;
use serde::{de::DeserializeOwned, Serialize};
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use thiserror::Error;

use crate::{
@@ -61,13 +61,13 @@ impl QueryRoot {
&self,
ctx: &Context<'_>,
indexers: Option<Vec<String>>,
minutes_ago: Option<i64>,
minutes_ago: Option<u64>,
) -> Result<Vec<String>, HttpServiceError> {
let pool = ctx.data_unchecked::<Pool<Postgres>>();
// Use a default time window if not specified
// Default to 1440 minutes (24 hours) if not provided
let minutes_ago = minutes_ago.unwrap_or(1440);
let from_timestamp = (Utc::now() - Duration::minutes(minutes_ago)).timestamp();
let from_timestamp = (Utc::now() - Duration::from_secs(minutes_ago * 60)).timestamp();

let active_indexers = list_active_indexers(pool, indexers, from_timestamp).await?;
Ok(active_indexers)
@@ -77,11 +77,11 @@ impl QueryRoot {
&self,
ctx: &Context<'_>,
indexers: Option<Vec<String>>,
minutes_ago: Option<i64>,
minutes_ago: Option<u64>,
) -> Result<Vec<IndexerStats>, HttpServiceError> {
let pool = ctx.data_unchecked::<Pool<Postgres>>();
let minutes_ago = minutes_ago.unwrap_or(1440);
let from_timestamp = (Utc::now() - Duration::minutes(minutes_ago)).timestamp();
let from_timestamp = (Utc::now() - Duration::from_secs(minutes_ago * 60)).timestamp();

let stats = get_indexer_stats(pool, indexers, from_timestamp).await?;
Ok(stats)
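The last two hunks replace chrono::Duration::minutes(minutes_ago) with std::time::Duration::from_secs(minutes_ago * 60) and make minutes_ago unsigned, so callers can no longer pass a negative window. Subtracting a std::time::Duration from a DateTime<Utc> relies on chrono providing that Sub impl (available in recent 0.4.x releases). A small sketch of the cutoff computation under that assumption, with the same 1440-minute default as the resolvers above:

use std::time::Duration;

use chrono::Utc;

// Illustrative: Unix timestamp for "N minutes ago", defaulting to 24 hours.
fn cutoff_timestamp(minutes_ago: Option<u64>) -> i64 {
    let minutes_ago = minutes_ago.unwrap_or(1440);
    (Utc::now() - Duration::from_secs(minutes_ago * 60)).timestamp()
}

fn main() {
    println!("from_timestamp = {}", cutoff_timestamp(None));
}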
