Update test_scheduler #1043

Closed · wants to merge 3 commits

Changes from all commits
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

26 changes: 12 additions & 14 deletions applications/test_scheduler/Cargo.toml
@@ -2,20 +2,18 @@
 name = "test_scheduler"
 version = "0.1.0"
 authors = ["Namitha Liyanage <namithaliyanage@gmail.com>"]
 edition = "2021"
 
-[dependencies.log]
-version = "0.4.8"
-
-[dependencies.spawn]
-path = "../../kernel/spawn"
-
-[dependencies.scheduler]
-path = "../../kernel/scheduler"
-
-[dependencies.task]
-path = "../../kernel/task"
-
-[dependencies.cpu]
-path = "../../kernel/cpu"
+[dependencies]
+app_io = { path = "../../kernel/app_io" }
+cpu = { path = "../../kernel/cpu" }
+log = "0.4.8"
+random = { path = "../../kernel/random" }
+spawn = { path = "../../kernel/spawn" }
+sync_block = { path = "../../kernel/sync_block" }
+task = { path = "../../kernel/task" }
 
+[dependencies.rand]
+version = "0.8.5"
+default-features = false
+features = ["small_rng"]
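The new manifest collapses the one-table-per-dependency style into a single inline [dependencies] table, and pulls in `rand` with only the `small_rng` feature so the crate stays `no_std`-compatible. As a rough illustration of the `SmallRng` + `SliceRandom` pattern the test relies on (a sketch only; the constant seed is a placeholder, and in Theseus the RNG actually comes from the `random` kernel crate):

use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};

fn pick_one(items: &[u32]) -> Option<u32> {
    // SmallRng is a small, fast, non-cryptographic PRNG usable without std.
    // Seeding from a constant is for illustration only.
    let mut rng = SmallRng::seed_from_u64(0xDEAD_BEEF);
    items.choose(&mut rng).copied()
}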
181 changes: 127 additions & 54 deletions applications/test_scheduler/src/lib.rs
@@ -1,78 +1,151 @@
 #![no_std]
 
 #[macro_use] extern crate log;
 extern crate alloc;
-extern crate spawn;
-extern crate scheduler;
-extern crate task;
-extern crate cpu;
 
-use core::convert::TryFrom;
-
-use alloc::string::String;
-use alloc::vec::Vec;
-use cpu::CpuId;
+use alloc::{format, string::String, vec::Vec};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+use app_io::println;
+use cpu::{cpus, CpuId};
+use rand::seq::SliceRandom;
+use sync_block::RwLock;
+use task::TaskRef;
 
 pub fn main(_args: Vec<String>) -> isize {
-    let cpu_1 = CpuId::try_from(1).expect("CPU ID 1 did not exist");
-
-    let taskref1 = spawn::new_task_builder(worker, ())
-        .name(String::from("test1"))
-        .pin_on_cpu(cpu_1)
-        .spawn().expect("failed to initiate task");
-
-    if let Err(e) = scheduler::set_priority(&taskref1, 30) {
-        error!("scheduler_eval(): Could not set priority to taskref1: {}", e);
-    }
-
-    debug!("Spawned Task 1");
-
-    let taskref2 = spawn::new_task_builder(worker, ())
-        .name(String::from("test2"))
-        .pin_on_cpu(cpu_1)
-        .spawn().expect("failed to initiate task");
-
-    if let Err(e) = scheduler::set_priority(&taskref2, 20) {
-        error!("scheduler_eval(): Could not set priority to taskref2: {}", e);
-    }
-
-    debug!("Spawned Task 2");
-
-    let taskref3 = spawn::new_task_builder(worker, ())
-        .name(String::from("test3"))
-        .pin_on_cpu(cpu_1)
-        .spawn().expect("failed to initiate task");
-
-    if let Err(e) = scheduler::set_priority(&taskref3, 10) {
-        error!("scheduler_eval(): Could not set priority to taskref3: {}", e);
-    }
-
-    debug!("Spawned Task 3");
-
-    let _priority1 = scheduler::get_priority(&taskref1);
-    let _priority2 = scheduler::get_priority(&taskref2);
-    let _priority3 = scheduler::get_priority(&taskref3);
-
-    #[cfg(epoch_scheduler)]
-    {
-        assert_eq!(_priority1, Some(30));
-        assert_eq!(_priority2, Some(20));
-        assert_eq!(_priority3, Some(10));
-    }
-
-    taskref1.join().expect("Task 1 join failed");
-    taskref2.join().expect("Task 2 join failed");
-    taskref3.join().expect("Task 3 join failed");
-
+    println!("testing pinned");
+    test_pinned();
+    println!("testing unpinned");
+    test_unpinned();
     0
 }
 
-fn worker(_: ()) {
-    for i in 0..1000 {
-        debug!("Task_ID : {} , Instance : {}", task::get_my_current_task_id(), i);
-        scheduler::schedule();
-    }
+/// Spawns a bunch of pinned tasks; each pinned task then randomly blocks and
+/// unblocks other tasks that are pinned to the same CPU.
+///
+/// The tasks must be pinned to the same CPU to avoid a deadlock where two
+/// tasks on different CPUs block each other at the same time and then yield.
+pub fn test_pinned() {
+    static TASKS: RwLock<Vec<(CpuId, Vec<TaskRef>)>> = RwLock::new(Vec::new());
+    static READY: AtomicBool = AtomicBool::new(false);
+
+    let tasks = cpus()
+        .map(|cpu| {
+            (
+                #[allow(clippy::clone_on_copy)]
+                cpu.clone(),
+                (0..100)
+                    .map(move |id| {
+                        spawn::new_task_builder(pinned_worker, cpu)
+                            .name(format!("test-scheduler-pinned-{cpu}-{id}"))
+                            .pin_on_cpu(cpu)
+                            .block()
+                            .spawn()
+                            .expect("failed to spawn task")
+                    })
+                    .collect::<Vec<_>>(),
+            )
+        })
+        .collect::<Vec<_>>();
+
+    *TASKS.write() = tasks
+        .iter()
+        .map(|(cpu, task_iter)| (*cpu, task_iter.iter().map(|task| (*task).clone()).collect()))
+        .collect();
+
+    for (_, task_list) in tasks.iter() {
+        for task in task_list {
+            task.unblock().unwrap();
+        }
+    }
+
+    READY.store(true, Ordering::Release);
+
+    for (_, task_list) in tasks {
+        for task in task_list {
+            task.join().unwrap();
+        }
+    }
+
+    // We have to drop the task references so that the `test_scheduler` crate
+    // can be dropped.
+    *TASKS.write() = Vec::new();
+
+    fn pinned_worker(pinned_cpu: CpuId) {
+        let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
+        while !READY.load(Ordering::Acquire) {
+            core::hint::spin_loop();
+        }
+
+        let locked = TASKS.read();
+        let tasks = &locked.iter().find(|(cpu, _)| *cpu == pinned_cpu).unwrap().1;
+        for _ in 0..100 {
+            assert_eq!(
+                cpu::current_cpu(),
+                pinned_cpu,
+                "pinned worker migrated cores"
+            );
+
+            let random_task = tasks.choose(&mut rng).unwrap();
+
+            let chose_self =
+                task::with_current_task(|current_task| random_task == current_task).unwrap();
+            if chose_self {
+                continue;
+            }
+
+            random_task.block_no_log();
+            task::schedule();
+            random_task.unblock_no_log();
+        }
+    }
+}
+
+/// Spawns a bunch of unpinned tasks, and then blocks and unblocks random
+/// tasks from the main thread.
+pub fn test_unpinned() {
+    const NUM_TASKS: usize = 500;
+
+    static READY: AtomicBool = AtomicBool::new(false);
+    static NUM_RUNNING: AtomicUsize = AtomicUsize::new(NUM_TASKS);
+
+    let tasks = (0..NUM_TASKS)
+        .map(move |id| {
+            spawn::new_task_builder(unpinned_worker, ())
+                .name(format!("test-scheduler-unpinned-{id}"))
+                .block()
+                .spawn()
+                .expect("failed to spawn task")
+        })
+        .collect::<Vec<_>>();
+
+    for task in tasks.iter() {
+        task.unblock().unwrap();
+    }
+
+    debug!("Spawned all tasks");
+    READY.store(true, Ordering::Release);
+
+    // Cause some mayhem.
+    let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
+    while NUM_RUNNING.load(Ordering::Relaxed) != 0 {
+        let random_task = tasks.choose(&mut rng).unwrap();
+        random_task.block_no_log();
+        // Let the worker tasks on this core run.
+        task::schedule();
+        random_task.unblock_no_log();
+    }
+
+    for task in tasks {
+        task.join().unwrap();
+    }
+
+    fn unpinned_worker(_: ()) {
+        while !READY.load(Ordering::Acquire) {
+            core::hint::spin_loop();
+        }
+
+        for _ in 0..1000 {
+            task::schedule();
+        }
+
+        NUM_RUNNING.fetch_sub(1, Ordering::Relaxed);
+    }
 }
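A note on the READY gate used by both tests: workers are spawned blocked, then unblocked, yet still spin on READY until the main task has finished populating shared state (the TASKS list in the pinned test), so no worker can start blocking peers before every task is registered. A portable sketch of the same gate, using std threads as a stand-in for Theseus tasks (illustrative only, not the Theseus task API):

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

static READY: AtomicBool = AtomicBool::new(false);

fn main() {
    let workers: Vec<_> = (0..4)
        .map(|id| {
            thread::spawn(move || {
                // Spin until the main thread finishes all shared setup.
                while !READY.load(Ordering::Acquire) {
                    std::hint::spin_loop();
                }
                println!("worker {id} running");
            })
        })
        .collect();

    // ...populate shared state here, then open the gate...
    READY.store(true, Ordering::Release);

    for worker in workers {
        worker.join().unwrap();
    }
}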
5 changes: 5 additions & 0 deletions kernel/cpu/src/aarch64.rs
@@ -40,6 +40,11 @@ pub fn register_cpu(bootstrap: bool) -> Result<(), &'static str> {
     }
 }
 
+/// Returns an iterator over the available CPUs.
+pub fn cpus() -> impl Iterator<Item = CpuId> {
+    ONLINE_CPUS.read().clone().into_iter()
+}
+
 /// Returns the number of CPUs (SMP cores) that exist and
 /// are currently initialized on this system.
 pub fn cpu_count() -> u32 {
4 changes: 4 additions & 0 deletions kernel/cpu/src/x86_64.rs
@@ -26,6 +26,10 @@ impl TryFrom<u32> for CpuId {
     }
 }
 
+/// Returns an iterator over the available CPUs.
+pub fn cpus() -> impl Iterator<Item = CpuId> {
+    apic::get_lapics().iter().map(|(apic_id, _)| (*apic_id).into())
+}
 /// Returns the number of CPUs (SMP cores) that exist and
 /// are currently initialized on this system.
 pub fn cpu_count() -> u32 {
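Both ports now expose the same cpus() iterator, which is what lets test_scheduler stay architecture-agnostic. A quick sanity check one might write against it (hypothetical test code, assuming cpus and cpu_count are both re-exported from the `cpu` crate as the diff suggests):

// Hypothetical: the iterator should visit exactly `cpu_count()` CPUs
// on both aarch64 and x86_64.
fn check_cpus_matches_count() {
    let n = cpu::cpus().count() as u32;
    assert_eq!(n, cpu::cpu_count(), "cpus() and cpu_count() disagree");
}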
24 changes: 24 additions & 0 deletions kernel/task_struct/src/lib.rs
@@ -485,6 +485,18 @@ impl Task {
         }
     }
 
+    /// Same as [`block`], but doesn't print a warning if the task is already
+    /// blocked.
+    ///
+    /// This method can be useful when blocking potentially already-blocked
+    /// tasks in a loop (e.g., in `test_scheduler`). Logging is very slow, so
+    /// skipping it can yield up to a `100x` performance improvement.
+    pub fn block_no_log(&self) {
+        let _ = self
+            .runstate
+            .compare_exchange(RunState::Runnable, RunState::Blocked);
+    }
+
     /// Blocks this `Task` if it is a newly-spawned task currently being initialized.
     ///
     /// This is a special case only to be used when spawning a new task that
@@ -517,6 +529,18 @@ impl Task {
         }
     }
 
+    /// Same as [`unblock`], but doesn't print a warning if the task is already
+    /// unblocked.
+    ///
+    /// This method can be useful when unblocking potentially already-unblocked
+    /// tasks in a loop (e.g., in `test_scheduler`). Logging is very slow, so
+    /// skipping it can yield up to a `100x` performance improvement.
+    pub fn unblock_no_log(&self) {
+        let _ = self
+            .runstate
+            .compare_exchange(RunState::Blocked, RunState::Runnable);
+    }
+
     /// Makes this `Task` `Runnable` if it is a newly-spawned and fully initialized task.
     ///
     /// This is a special case only to be used when spawning a new task that
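The _no_log variants succeed only on the exact Runnable -> Blocked (or Blocked -> Runnable) transition and are silent no-ops otherwise, which is what makes them safe to hammer in a loop. A self-contained model of that compare-and-exchange state machine (illustrative: RunState is reduced to an AtomicU8 here, and the orderings are assumptions; the real type and its atomic wrapper live in `task_struct`):

use std::sync::atomic::{AtomicU8, Ordering};

const RUNNABLE: u8 = 0;
const BLOCKED: u8 = 1;

struct MiniTask {
    runstate: AtomicU8,
}

impl MiniTask {
    /// Blocks the task only if it is currently runnable; otherwise a no-op.
    fn block_no_log(&self) {
        let _ = self.runstate.compare_exchange(
            RUNNABLE,
            BLOCKED,
            Ordering::AcqRel,
            Ordering::Acquire,
        );
    }

    /// Unblocks the task only if it is currently blocked; otherwise a no-op.
    fn unblock_no_log(&self) {
        let _ = self.runstate.compare_exchange(
            BLOCKED,
            RUNNABLE,
            Ordering::AcqRel,
            Ordering::Acquire,
        );
    }
}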