From f049f809faf1950fe8b50f0ebe3626c815ad3730 Mon Sep 17 00:00:00 2001 From: Michael van Niekerk Date: Fri, 9 Aug 2024 22:05:43 +0200 Subject: [PATCH] Channels with lists --- examples/simple_job.rs | 2 +- src/context.rs | 36 ++++++++++++++++++++++++++++++++++++ src/job/runner.rs | 6 +++++- src/job_scheduler.rs | 20 ++++++++++++++++++-- src/notification/runner.rs | 6 +++++- 5 files changed, 65 insertions(+), 5 deletions(-) diff --git a/examples/simple_job.rs b/examples/simple_job.rs index 6310722..17229cd 100644 --- a/examples/simple_job.rs +++ b/examples/simple_job.rs @@ -11,7 +11,7 @@ async fn main() { .with_max_level(Level::TRACE) .finish(); tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed"); - let sched = JobScheduler::new().await; + let sched = JobScheduler::new_with_channel_size(1000).await; let mut sched = sched.unwrap(); let jobs = run_example(&mut sched) .await diff --git a/src/context.rs b/src/context.rs index fe66b45..9a0b6a9 100644 --- a/src/context.rs +++ b/src/context.rs @@ -67,6 +67,42 @@ impl Context { notification_code, } } + + pub fn new_with_channel_size( + metadata_storage: Arc>>, + notification_storage: Arc>>, + job_code: Arc>>, + notification_code: Arc>>, + channel_size: usize, + ) -> Self { + let (job_activation_tx, _job_activation_rx) = tokio::sync::broadcast::channel(channel_size); + let (notify_tx, _notify_rx) = tokio::sync::broadcast::channel(channel_size); + let (job_create_tx, _job_create_rx) = tokio::sync::broadcast::channel(channel_size); + let (job_created_tx, _job_created_rx) = tokio::sync::broadcast::channel(channel_size); + let (job_delete_tx, _job_delete_rx) = tokio::sync::broadcast::channel(channel_size); + let (job_deleted_tx, _job_deleted_rx) = tokio::sync::broadcast::channel(channel_size); + let (notify_create_tx, _notify_create_rx) = tokio::sync::broadcast::channel(channel_size); + let (notify_created_tx, _notify_created_rx) = tokio::sync::broadcast::channel(channel_size); + let 
(notify_delete_tx, _notify_delete_rx) = tokio::sync::broadcast::channel(channel_size); + let (notify_deleted_tx, _notify_deleted_rx) = tokio::sync::broadcast::channel(channel_size); + + Self { + job_activation_tx, + notify_tx, + job_create_tx, + job_created_tx, + job_delete_tx, + job_deleted_tx, + notify_create_tx, + notify_created_tx, + notify_delete_tx, + notify_deleted_tx, + metadata_storage, + notification_storage, + job_code, + notification_code, + } + } } impl Clone for Context { diff --git a/src/job/runner.rs b/src/job/runner.rs index 8e6bc8f..2bef567 100644 --- a/src/job/runner.rs +++ b/src/job/runner.rs @@ -9,6 +9,7 @@ use crate::JobSchedulerError; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::RwLock; use tracing::error; @@ -28,7 +29,10 @@ impl JobRunner { let val = rx.recv().await; if let Err(e) = val { error!("Error receiving {:?}", e); - break; + if matches!(e, RecvError::Closed) { + break; + } + continue; } let uuid = val.unwrap(); { diff --git a/src/job_scheduler.rs b/src/job_scheduler.rs index b8238c0..c252872 100644 --- a/src/job_scheduler.rs +++ b/src/job_scheduler.rs @@ -59,6 +59,7 @@ impl JobsSchedulerLocked { notification_storage: Arc>>, job_code: Arc>>, notify_code: Arc>>, + channel_size: usize, ) -> Result, JobSchedulerError> { { let mut metadata_storage = metadata_storage.write().await; @@ -68,11 +69,12 @@ impl JobsSchedulerLocked { let mut notification_storage = notification_storage.write().await; notification_storage.init().await?; } - let context = Context::new( + let context = Context::new_with_channel_size( metadata_storage, notification_storage, job_code.clone(), notify_code.clone(), + channel_size, ); { let mut job_code = job_code.write().await; @@ -158,8 +160,19 @@ impl JobsSchedulerLocked { /// /// Create a new `MetaDataStorage` and `NotificationStore` using the `SimpleMetadataStore`, 
`SimpleNotificationStore`, - /// `SimpleJobCode` and `SimpleNotificationCode` implementation + /// `SimpleJobCode` and `SimpleNotificationCode` implementation with channel size of 200 pub async fn new() -> Result { + Self::new_with_channel_size(200).await + } + + /// + /// Create a new `MetaDataStorage` and `NotificationStore` using the `SimpleMetadataStore`, `SimpleNotificationStore`, + /// `SimpleJobCode` and `SimpleNotificationCode` implementation + /// + /// The channel_size parameter is used to set the size of the channels used to communicate between the actors. + /// In short, it determines how many messages each broadcast channel can buffer; senders never block. + /// When a channel is full, the oldest message is dropped and slow receivers observe a `Lagged` error, skipping the missed messages. + pub async fn new_with_channel_size(channel_size: usize) -> Result { let metadata_storage = SimpleMetadataStore::default(); let metadata_storage: Arc>> = Arc::new(RwLock::new(Box::new(metadata_storage))); @@ -181,6 +194,7 @@ impl JobsSchedulerLocked { notification_storage, job_code, notify_code, + channel_size, ) .await .map_err(|_| JobSchedulerError::CantInit)?; @@ -209,6 +223,7 @@ impl JobsSchedulerLocked { notification_storage: Box, job_code: Box, notification_code: Box, + channel_size: usize, ) -> Result { let metadata_storage = Arc::new(RwLock::new(metadata_storage)); let notification_storage = Arc::new(RwLock::new(notification_storage)); @@ -220,6 +235,7 @@ impl JobsSchedulerLocked { notification_storage, job_code, notification_code, + channel_size, ) .await?; diff --git a/src/notification/runner.rs b/src/notification/runner.rs index d45540b..754179d 100644 --- a/src/notification/runner.rs +++ b/src/notification/runner.rs @@ -9,6 +9,7 @@ use crate::JobSchedulerError; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::sync::RwLock; use tracing::error; @@ -27,7 +28,10 @@ impl NotificationRunner { let val = rx.recv().await; if let
Err(e) = val { error!("Error receiving value {:?}", e); - break; + if matches!(e, RecvError::Closed) { + break; + } + continue; } let (job_id, state) = val.unwrap(); let mut storage = storage.write().await;