Skip to content

Commit

Permalink
Merge pull request #72 from ourzora/background-sync
Browse files Browse the repository at this point in the history
Parallel sync
  • Loading branch information
ligustah authored May 3, 2024
2 parents c4fd1f6 + 468b582 commit f4c1e27
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::ops::Sub;
use std::time::{Duration, SystemTime};
use std::time::Duration;

use chrono::{DateTime, Utc};
use eyre::WrapErr;
use futures_ticker::Ticker;
use futures_util::StreamExt;
use libp2p::PeerId;
use sqlx::SqlitePool;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, Semaphore};

use crate::chain::inclusion_claim_correct;
use crate::config::{ChainInclusionMode, Config};
Expand Down Expand Up @@ -129,13 +126,13 @@ impl Controller {
self.handle_event(event).await;
}
_ = self.sync_ticker.next() => {
self.do_sync().await;
self.request_sync().await;
}
}
}
}

async fn do_sync(&self) {
async fn request_sync(&self) {
let from = Some(
chrono::Utc::now() - Duration::from_secs(60 * 60 * self.config.sync_lookback_hours),
);
Expand Down Expand Up @@ -176,9 +173,13 @@ impl Controller {
tracing::info!(histogram.sync_request_processed = 1);
}
P2PEvent::SyncResponse { premints } => {
for premint in premints {
let _ = self.validate_and_insert(premint).await;
}
let sem = Semaphore::new(10);
futures_util::future::join_all(premints.into_iter().map(|p| async {
let permit = sem.acquire().await.unwrap();
let _ = self.validate_and_insert(p).await;
drop(permit);
}))
.await;
}
}
}
Expand Down Expand Up @@ -269,7 +270,7 @@ impl Controller {
}
}
ControllerCommands::Sync => {
self.do_sync().await;
self.request_sync().await;
}
}
Ok(())
Expand Down

0 comments on commit f4c1e27

Please sign in to comment.