diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index db929176483..01446a7136b 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -126,32 +126,34 @@ mod tests { }; use tokio::time::Instant; - #[test] - fn one_spawn_single_tasks_works() { + #[tokio::test] + async fn one_spawn_single_tasks_works() { // Given - let number_of_pending_tasks = 1; + const NUMBER_OF_PENDING_TASKS: usize = 1; let heavy_task_processor = - AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap(); // When - let (sender, mut receiver) = tokio::sync::oneshot::channel(); + let (sender, receiver) = tokio::sync::oneshot::channel(); let result = heavy_task_processor.try_spawn(async move { sender.send(()).unwrap(); }); // Then result.expect("Expected Ok result"); - sleep(Duration::from_secs(1)); - receiver.try_recv().unwrap(); + tokio::time::timeout(Duration::from_secs(5), receiver) + .await + .unwrap() + .unwrap(); } #[tokio::test] async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() { // Given - let number_of_threads = 10; - let number_of_pending_tasks = 10000; + const MAX_NUMBER_OF_THREADS: usize = 10; + const NUMBER_OF_PENDING_TASKS: usize = 10000; let heavy_task_processor = - AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS) .unwrap(); let main_handler = tokio::spawn(async move { std::thread::current().id() }); let main_id = main_handler.await.unwrap(); @@ -159,13 +161,10 @@ mod tests { // When let futures = iter::repeat_with(|| { heavy_task_processor - .try_spawn(async move { - tokio::time::sleep(Duration::from_secs(1)).await; - std::thread::current().id() - }) + .try_spawn(async move { std::thread::current().id() }) .unwrap() }) - .take(number_of_pending_tasks) + .take(NUMBER_OF_PENDING_TASKS) .collect::>(); // Then @@ -175,16 +174,20 @@ mod tests { .map(|r| r.unwrap()) .collect::>(); + // Main thread was not used. assert!(!unique_thread_ids.contains(&main_id)); - assert_eq!(unique_thread_ids.len(), number_of_threads); + // There's been at least one worker thread used. + assert!(!unique_thread_ids.is_empty()); + // There were no more worker threads above the threshold. + assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS); } #[test] fn second_spawn_fails_when_limit_is_one_and_first_in_progress() { // Given - let number_of_pending_tasks = 1; + const NUMBER_OF_PENDING_TASKS: usize = 1; let heavy_task_processor = - AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap(); let first_spawn_result = heavy_task_processor.try_spawn(async move { sleep(Duration::from_secs(1)); }); @@ -196,15 +199,15 @@ mod tests { }); // Then - let err = second_spawn_result.expect_err("Expected Ok result"); + let err = second_spawn_result.expect_err("Should error"); assert_eq!(err, OutOfCapacity); } #[test] fn second_spawn_works_when_first_is_finished() { - let number_of_pending_tasks = 1; + const NUMBER_OF_PENDING_TASKS: usize = 1; let heavy_task_processor = - AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap(); // Given let (sender, receiver) = tokio::sync::oneshot::channel(); @@ -229,11 +232,11 @@ mod tests { #[test] fn can_spawn_10_tasks_when_limit_is_10() { // Given - let number_of_pending_tasks = 10; + const NUMBER_OF_PENDING_TASKS: usize = 10; let heavy_task_processor = - AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap(); - for _ in 0..number_of_pending_tasks { + for _ in 0..NUMBER_OF_PENDING_TASKS { // When let result = heavy_task_processor.try_spawn(async move { tokio::time::sleep(Duration::from_secs(1)).await; @@ -245,19 +248,19 @@ mod tests { } #[tokio::test] - async fn executes_10_tasks_for_10_seconds_with_one_thread() { + async fn executes_5_tasks_for_5_seconds_with_one_thread() { // Given - let number_of_pending_tasks = 10; - let number_of_threads = 1; + const NUMBER_OF_PENDING_TASKS: usize = 5; + const NUMBER_OF_THREADS: usize = 1; let heavy_task_processor = - AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS) .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = tokio::sync::broadcast::channel(1024); let instant = Instant::now(); - for _ in 0..number_of_pending_tasks { + for _ in 0..NUMBER_OF_PENDING_TASKS { let broadcast_sender = broadcast_sender.clone(); let result = heavy_task_processor.try_spawn(async move { sleep(Duration::from_secs(1)); @@ -269,29 +272,36 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} - assert!(instant.elapsed() >= Duration::from_secs(10)); + // 5 tasks running on 1 thread, each task taking 1 second, + // should complete in approximately 5 seconds overall. + // Allowing some LEEWAY to account for runtime overhead. + const LEEWAY: Duration = Duration::from_millis(300); + assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY); + // Make sure that the tasks were not executed in parallel. + assert!(instant.elapsed() >= Duration::from_secs(5)); // Wait for the metrics to be updated. tokio::time::sleep(Duration::from_secs(1)).await; let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); - assert_eq!(duration.as_secs(), 10); + assert_eq!(duration.as_secs(), 5); let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get()); assert_eq!(duration.as_secs(), 0); } #[tokio::test] - async fn executes_10_tasks_for_2_seconds_with_10_thread() { + async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time() + { // Given - let number_of_pending_tasks = 10; - let number_of_threads = 10; + const NUMBER_OF_PENDING_TASKS: usize = 10; + const NUMBER_OF_THREADS: usize = 10; let heavy_task_processor = - AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS) .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = tokio::sync::broadcast::channel(1024); let instant = Instant::now(); - for _ in 0..number_of_pending_tasks { + for _ in 0..NUMBER_OF_PENDING_TASKS { let broadcast_sender = broadcast_sender.clone(); let result = heavy_task_processor.try_spawn(async move { sleep(Duration::from_secs(1)); @@ -303,7 +313,11 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} - assert!(instant.elapsed() <= Duration::from_secs(2)); + // 10 blocking tasks running on 10 threads, each task taking 1 second, + // should complete in approximately 1 second overall. + // Allowing some LEEWAY to account for runtime overhead. + const LEEWAY: Duration = Duration::from_millis(300); + assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY); // Wait for the metrics to be updated. tokio::time::sleep(Duration::from_secs(1)).await; let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); @@ -313,19 +327,20 @@ mod tests { } #[tokio::test] - async fn executes_10_tasks_for_2_seconds_with_1_thread() { + async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time( + ) { // Given - let number_of_pending_tasks = 10; - let number_of_threads = 10; + const NUMBER_OF_PENDING_TASKS: usize = 10; + const NUMBER_OF_THREADS: usize = 10; let heavy_task_processor = - AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS) .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = tokio::sync::broadcast::channel(1024); let instant = Instant::now(); - for _ in 0..number_of_pending_tasks { + for _ in 0..NUMBER_OF_PENDING_TASKS { let broadcast_sender = broadcast_sender.clone(); let result = heavy_task_processor.try_spawn(async move { tokio::time::sleep(Duration::from_secs(1)).await; @@ -337,7 +352,11 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} - assert!(instant.elapsed() <= Duration::from_secs(2)); + // 10 non-blocking tasks running on 10 threads, each task taking 1 second, + // should complete in approximately 1 second overall. + // Allowing some LEEWAY to account for runtime overhead. + const LEEWAY: Duration = Duration::from_millis(300); + assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY); // Wait for the metrics to be updated. tokio::time::sleep(Duration::from_secs(1)).await; let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());