From b93daf8e701df55de291a5764a25aeb42ca3efc6 Mon Sep 17 00:00:00 2001
From: Klimenty Tsoutsman
Date: Sat, 16 Sep 2023 23:09:44 +1000
Subject: [PATCH 1/3] Update `test_scheduler`

The new test is significantly more robust than the old one. As of right
now, the test isn't particularly useful because we don't have task
migration, but #1042 adds implicit task migration when unblocking a
task. Hence, the test focuses on blocking and unblocking tasks.

Signed-off-by: Klimenty Tsoutsman
---
 Cargo.lock                             |   5 +-
 applications/test_scheduler/Cargo.toml |  26 ++--
 applications/test_scheduler/src/lib.rs | 176 +++++++++++++++++--------
 kernel/cpu/src/aarch64.rs              |   5 +
 kernel/cpu/src/x86_64.rs               |   5 +
 kernel/task_struct/src/lib.rs          |  24 ++++
 6 files changed, 172 insertions(+), 69 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 182e3e0153..7e8bf30935 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3887,10 +3887,13 @@ dependencies = [
 name = "test_scheduler"
 version = "0.1.0"
 dependencies = [
+ "app_io",
  "cpu",
  "log",
- "scheduler",
+ "rand",
+ "random",
  "spawn",
+ "sync_block",
  "task",
 ]
 
diff --git a/applications/test_scheduler/Cargo.toml b/applications/test_scheduler/Cargo.toml
index 35b8a30f17..2dd7040cea 100644
--- a/applications/test_scheduler/Cargo.toml
+++ b/applications/test_scheduler/Cargo.toml
@@ -2,20 +2,18 @@
 name = "test_scheduler"
 version = "0.1.0"
 authors = ["Namitha Liyanage "]
+edition = "2021"
 
 [dependencies]
+app_io = { path = "../../kernel/app_io" }
+cpu = { path = "../../kernel/cpu" }
+log = "0.4.8"
+random = { path = "../../kernel/random" }
+spawn = { path = "../../kernel/spawn" }
+sync_block = { path = "../../kernel/sync_block" }
+task = { path = "../../kernel/task" }
 
-[dependencies.log]
-version = "0.4.8"
-
-[dependencies.spawn]
-path = "../../kernel/spawn"
-
-[dependencies.scheduler]
-path = "../../kernel/scheduler"
-
-[dependencies.task]
-path = "../../kernel/task"
-
-[dependencies.cpu]
-path = "../../kernel/cpu"
+[dependencies.rand]
+version = "0.8.5"
+default-features = false
+features = ["small_rng"]
diff --git a/applications/test_scheduler/src/lib.rs b/applications/test_scheduler/src/lib.rs
index 8bc0731d06..aefa3f0a66 100644
--- a/applications/test_scheduler/src/lib.rs
+++ b/applications/test_scheduler/src/lib.rs
@@ -1,78 +1,146 @@
 #![no_std]
 
-#[macro_use] extern crate log;
 extern crate alloc;
-extern crate spawn;
-extern crate scheduler;
-extern crate task;
-extern crate cpu;
 
-use core::convert::TryFrom;
-
-use alloc::string::String;
-use alloc::vec::Vec;
-use cpu::CpuId;
+use alloc::{format, string::String, vec::Vec};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+use app_io::println;
+use cpu::{cpus, CpuId};
+use rand::seq::SliceRandom;
+use sync_block::RwLock;
+use task::TaskRef;
 
 pub fn main(_args: Vec<String>) -> isize {
-    let cpu_1 = CpuId::try_from(1).expect("CPU ID 1 did not exist");
-
-    let taskref1 = spawn::new_task_builder(worker, ())
-        .name(String::from("test1"))
-        .pin_on_cpu(cpu_1)
-        .spawn().expect("failed to initiate task");
+    println!("testing pinned");
+    test_pinned();
+    println!("testing unpinned");
+    test_unpinned();
+    0
+}
 
-    if let Err(e) = scheduler::set_priority(&taskref1, 30) {
-        error!("scheduler_eval(): Could not set priority to taskref1: {}", e);
+/// Spawn a bunch of pinned tasks, and then have each pinned task randomly
+/// block and unblock other tasks that are pinned to the same CPU.
+///
+/// The tasks must be pinned to the same CPU to avoid a deadlock where two
+/// tasks on different CPUs block each other at the same time and then yield.
+pub fn test_pinned() {
+    static TASKS: RwLock<Vec<(CpuId, Vec<TaskRef>)>> = RwLock::new(Vec::new());
+    static READY: AtomicBool = AtomicBool::new(false);
+
+    let tasks = cpus()
+        .map(|cpu| {
+            (
+                cpu.clone(),
+                (0..100)
+                    .map(move |id| {
+                        spawn::new_task_builder(pinned_worker, cpu)
+                            .name(format!("test-scheduler-pinned-{cpu}-{id}"))
+                            .pin_on_cpu(cpu)
+                            .block()
+                            .spawn()
+                            .expect("failed to spawn task")
+                    })
+                    .collect::<Vec<_>>(),
+            )
+        })
+        .collect::<Vec<_>>();
+
+    *TASKS.write() = tasks
+        .iter()
+        .map(|(cpu, task_iter)| (*cpu, task_iter.iter().map(|task| (*task).clone()).collect()))
+        .collect();
+
+    for (_, task_list) in tasks.iter() {
+        for task in task_list {
+            task.unblock().unwrap();
+        }
     }
 
-    debug!("Spawned Task 1");
-
-    let taskref2 = spawn::new_task_builder(worker, ())
-        .name(String::from("test2"))
-        .pin_on_cpu(cpu_1)
-        .spawn().expect("failed to initiate task");
+    READY.store(true, Ordering::Release);
 
-    if let Err(e) = scheduler::set_priority(&taskref2, 20) {
-        error!("scheduler_eval(): Could not set priority to taskref2: {}", e);
+    for (_, task_list) in tasks {
+        for task in task_list {
+            task.join().unwrap();
+        }
     }
 
-    debug!("Spawned Task 2");
-
-    let taskref3 = spawn::new_task_builder(worker, ())
-        .name(String::from("test3"))
-        .pin_on_cpu(cpu_1)
-        .spawn().expect("failed to initiate task");
-
-    if let Err(e) = scheduler::set_priority(&taskref3, 10) {
-        error!("scheduler_eval(): Could not set priority to taskref3: {}", e);
+    // We have to drop the tasks so that the `test-scheduler` crate can be dropped.
+    *TASKS.write() = Vec::new();
+
+    fn pinned_worker(pinned_cpu: CpuId) {
+        let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
+        while !READY.load(Ordering::Acquire) {}
+
+        let locked = TASKS.read();
+        let tasks = &locked.iter().find(|(cpu, _)| *cpu == pinned_cpu).unwrap().1;
+        for _ in 0..100 {
+            assert_eq!(
+                cpu::current_cpu(),
+                pinned_cpu,
+                "pinned worker migrated cores"
+            );
+
+            let random_task = tasks.choose(&mut rng).unwrap();
+
+            let chose_self =
+                task::with_current_task(|current_task| random_task == current_task).unwrap();
+            if chose_self {
+                continue;
+            }
+
+            let _ = random_task.block_no_log();
+            task::schedule();
+            let _ = random_task.unblock_no_log();
+        }
     }
+}
 
-    debug!("Spawned Task 3");
+/// Spawn a bunch of unpinned tasks, and then block and unblock random tasks
+/// from the main thread.
+pub fn test_unpinned() {
+    const NUM_TASKS: usize = 500;
+
+    static READY: AtomicBool = AtomicBool::new(false);
+    static NUM_RUNNING: AtomicUsize = AtomicUsize::new(NUM_TASKS);
+
+    let tasks = (0..NUM_TASKS)
+        .map(move |id| {
+            spawn::new_task_builder(unpinned_worker, ())
+                .name(format!("test-scheduler-unpinned-{id}"))
+                .block()
+                .spawn()
+                .expect("failed to spawn task")
+        })
+        .collect::<Vec<_>>();
+
+    for task in tasks.iter() {
+        task.unblock().unwrap();
+    }
 
-    debug!("Spawned all tasks");
+    READY.store(true, Ordering::Release);
 
-    let _priority1 = scheduler::get_priority(&taskref1);
-    let _priority2 = scheduler::get_priority(&taskref2);
-    let _priority3 = scheduler::get_priority(&taskref3);
+    // Cause some mayhem.
+    let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
+    while NUM_RUNNING.load(Ordering::Relaxed) != 0 {
+        let random_task = tasks.choose(&mut rng).unwrap();
+        let _ = random_task.block_no_log();
+        // Let the worker tasks on this core run.
+        task::schedule();
+        let _ = random_task.unblock_no_log();
+    }
 
-    #[cfg(epoch_scheduler)]
-    {
-        assert_eq!(_priority1,Some(30));
-        assert_eq!(_priority2,Some(20));
-        assert_eq!(_priority3,Some(10));
+    for task in tasks {
+        task.join().unwrap();
     }
 
-    taskref1.join().expect("Task 1 join failed");
-    taskref2.join().expect("Task 2 join failed");
-    taskref3.join().expect("Task 3 join failed");
+    fn unpinned_worker(_: ()) {
+        while !READY.load(Ordering::Acquire) {}
 
-    0
-}
+        for _ in 0..1000 {
+            task::schedule();
+        }
 
-fn worker(_: ()) {
-    for i in 0..1000 {
-        debug!("Task_ID : {} , Instance : {}", task::get_my_current_task_id(), i);
-        scheduler::schedule();
+        NUM_RUNNING.fetch_sub(1, Ordering::Relaxed);
     }
 }
diff --git a/kernel/cpu/src/aarch64.rs b/kernel/cpu/src/aarch64.rs
index 84865dd0ac..31ea3b1dac 100644
--- a/kernel/cpu/src/aarch64.rs
+++ b/kernel/cpu/src/aarch64.rs
@@ -40,6 +40,11 @@ pub fn register_cpu(bootstrap: bool) -> Result<(), &'static str> {
     }
 }
 
+/// Returns an iterator over the available CPUs.
+pub fn cpus() -> impl Iterator<Item = CpuId> {
+    ONLINE_CPUS.read().clone().into_iter()
+}
+
 /// Returns the number of CPUs (SMP cores) that exist and
 /// are currently initialized on this system.
 pub fn cpu_count() -> u32 {
diff --git a/kernel/cpu/src/x86_64.rs b/kernel/cpu/src/x86_64.rs
index de158ef812..5fc1776524 100644
--- a/kernel/cpu/src/x86_64.rs
+++ b/kernel/cpu/src/x86_64.rs
@@ -26,6 +26,11 @@ impl TryFrom<u32> for CpuId {
     }
 }
 
+/// Returns an iterator over the available CPUs.
+pub fn cpus() -> impl Iterator<Item = CpuId> {
+    apic::get_lapics().iter().map(|(apic_id, _)| (*apic_id).into())
+}
+
 /// Returns the number of CPUs (SMP cores) that exist and
 /// are currently initialized on this system.
 
diff --git a/kernel/task_struct/src/lib.rs b/kernel/task_struct/src/lib.rs
index 7f0e47b82c..a85d098970 100755
--- a/kernel/task_struct/src/lib.rs
+++ b/kernel/task_struct/src/lib.rs
@@ -485,6 +485,18 @@ impl Task {
         }
     }
 
+    /// Same as [`Self::block`], but doesn't log a warning if the task is
+    /// already blocked.
+    ///
+    /// This method is useful when blocking potentially already-blocked tasks
+    /// in a loop (e.g. in `test_scheduler`). Logging is very slow, so
+    /// skipping it can lead to a `100x` performance improvement there.
+    pub fn block_no_log(&self) {
+        let _ = self
+            .runstate
+            .compare_exchange(RunState::Runnable, RunState::Blocked);
+    }
+
     /// Blocks this `Task` if it is a newly-spawned task currently being initialized.
     ///
     /// This is a special case only to be used when spawning a new task that
@@ -517,6 +529,18 @@ impl Task {
         }
     }
 
+    /// Same as [`Self::unblock`], but doesn't log a warning if the task is
+    /// already unblocked.
+    ///
+    /// This method is useful when unblocking potentially already-unblocked
+    /// tasks in a loop (e.g. in `test_scheduler`). Logging is very slow, so
+    /// skipping it can lead to a `100x` performance improvement there.
+    pub fn unblock_no_log(&self) {
+        let _ = self
+            .runstate
+            .compare_exchange(RunState::Blocked, RunState::Runnable);
+    }
+
     /// Makes this `Task` `Runnable` if it is a newly-spawned and fully initialized task.
     ///
     /// This is a special case only to be used when spawning a new task that

From 86939c827b6d9e6175f77f76c89abdde0944437c Mon Sep 17 00:00:00 2001
From: Klimenty Tsoutsman
Date: Sat, 16 Sep 2023 23:17:18 +1000
Subject: [PATCH 2/3] Remove whitespace

Signed-off-by: Klimenty Tsoutsman
---
 kernel/cpu/src/x86_64.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kernel/cpu/src/x86_64.rs b/kernel/cpu/src/x86_64.rs
index 5fc1776524..2f85d5ddd0 100644
--- a/kernel/cpu/src/x86_64.rs
+++ b/kernel/cpu/src/x86_64.rs
@@ -31,7 +31,6 @@ pub fn cpus() -> impl Iterator<Item = CpuId> {
     apic::get_lapics().iter().map(|(apic_id, _)| (*apic_id).into())
 }
 
-
 /// Returns the number of CPUs (SMP cores) that exist and
 /// are currently initialized on this system.
 pub fn cpu_count() -> u32 {

From 730278d7a2a92cc87e3f1a661948f4c15d12b437 Mon Sep 17 00:00:00 2001
From: Klimenty Tsoutsman
Date: Sat, 16 Sep 2023 23:30:27 +1000
Subject: [PATCH 3/3] Clippy

Signed-off-by: Klimenty Tsoutsman
---
 applications/test_scheduler/src/lib.rs | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/applications/test_scheduler/src/lib.rs b/applications/test_scheduler/src/lib.rs
index aefa3f0a66..d4e227158c 100644
--- a/applications/test_scheduler/src/lib.rs
+++ b/applications/test_scheduler/src/lib.rs
@@ -31,6 +31,7 @@ pub fn test_pinned() {
     let tasks = cpus()
         .map(|cpu| {
             (
+                #[allow(clippy::clone_on_copy)]
                 cpu.clone(),
                 (0..100)
                     .map(move |id| {
@@ -70,7 +71,9 @@ pub fn test_pinned() {
 
     fn pinned_worker(pinned_cpu: CpuId) {
         let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
-        while !READY.load(Ordering::Acquire) {}
+        while !READY.load(Ordering::Acquire) {
+            core::hint::spin_loop();
+        }
 
         let locked = TASKS.read();
         let tasks = &locked.iter().find(|(cpu, _)| *cpu == pinned_cpu).unwrap().1;
@@ -89,9 +92,9 @@ pub fn test_pinned() {
                 continue;
             }
 
-            let _ = random_task.block_no_log();
+            random_task.block_no_log();
             task::schedule();
-            let _ = random_task.unblock_no_log();
+            random_task.unblock_no_log();
         }
     }
 }
@@ -124,10 +127,10 @@ pub fn test_unpinned() {
     // Cause some mayhem.
     let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
     while NUM_RUNNING.load(Ordering::Relaxed) != 0 {
        let random_task = tasks.choose(&mut rng).unwrap();
-        let _ = random_task.block_no_log();
+        random_task.block_no_log();
         // Let the worker tasks on this core run.
         task::schedule();
-        let _ = random_task.unblock_no_log();
+        random_task.unblock_no_log();
     }
 
     for task in tasks {
@@ -135,7 +138,9 @@ pub fn test_unpinned() {
     fn unpinned_worker(_: ()) {
-        while !READY.load(Ordering::Acquire) {}
+        while !READY.load(Ordering::Acquire) {
+            core::hint::spin_loop();
+        }
 
         for _ in 0..1000 {
             task::schedule();
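
Editor's note: the `block_no_log`/`unblock_no_log` methods added in patch 1 hinge on
flipping the runstate with a single compare-and-swap and silently discarding the
failure case instead of logging it. The snippet below is a minimal, standalone
sketch of that pattern only; `MiniTask`, `RUNNABLE`, and `BLOCKED` are illustrative
stand-ins, not Theseus's actual `Task` and `RunState` types (Theseus's `runstate`
field is a custom atomic wrapper whose `compare_exchange` takes two arguments,
unlike core's four-argument version used here).

    use core::sync::atomic::{AtomicU8, Ordering};

    const RUNNABLE: u8 = 0;
    const BLOCKED: u8 = 1;

    struct MiniTask {
        runstate: AtomicU8,
    }

    impl MiniTask {
        // Flip Runnable -> Blocked in one compare-and-swap. If the task is
        // already blocked, the exchange fails and the result is discarded,
        // so nothing is logged on the hot path.
        fn block_no_log(&self) {
            let _ = self.runstate.compare_exchange(
                RUNNABLE,
                BLOCKED,
                Ordering::AcqRel,
                Ordering::Acquire,
            );
        }

        // The mirror image: Blocked -> Runnable, ignoring failure.
        fn unblock_no_log(&self) {
            let _ = self.runstate.compare_exchange(
                BLOCKED,
                RUNNABLE,
                Ordering::AcqRel,
                Ordering::Acquire,
            );
        }
    }

    fn main() {
        let task = MiniTask { runstate: AtomicU8::new(RUNNABLE) };
        task.block_no_log();   // succeeds: Runnable -> Blocked
        task.block_no_log();   // already blocked: fails silently, no log
        task.unblock_no_log(); // succeeds: Blocked -> Runnable
        assert_eq!(task.runstate.load(Ordering::Acquire), RUNNABLE);
    }

Because the failed exchange is simply ignored, calling these methods on a task in
the "wrong" state is harmless, which is what lets the test hammer random tasks in
a tight loop without flooding the log.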
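Editor's note: both tests also rely on a start-barrier idiom that patch 3 tightens
with `core::hint::spin_loop()`: each worker spins on an `Acquire` load of `READY`
until the coordinator publishes `true` with a `Release` store, and an atomic
counter tracks how many workers are still running. Below is a minimal sketch of
the same idiom using `std::thread` (illustrative only; Theseus is `no_std` and
uses its own `spawn`/`task` APIs, not std threads).

    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::{hint, thread};

    static READY: AtomicBool = AtomicBool::new(false);
    static NUM_RUNNING: AtomicUsize = AtomicUsize::new(4);

    fn main() {
        let workers: Vec<_> = (0..4)
            .map(|id| {
                thread::spawn(move || {
                    // The Acquire load pairs with the coordinator's Release
                    // store below; spin_loop() hints to the CPU that this is
                    // a busy-wait rather than useful work.
                    while !READY.load(Ordering::Acquire) {
                        hint::spin_loop();
                    }
                    println!("worker {id} released");
                    NUM_RUNNING.fetch_sub(1, Ordering::Relaxed);
                })
            })
            .collect();

        // Release all workers at once, then wait for them to finish.
        READY.store(true, Ordering::Release);
        for worker in workers {
            worker.join().unwrap();
        }
        assert_eq!(NUM_RUNNING.load(Ordering::Relaxed), 0);
    }

Blocking all workers until a single flag flips is what makes the test's mayhem
phase meaningful: every worker enters the block/unblock loop at roughly the same
time instead of trickling in as they are spawned.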