Skip to content

Commit

Permalink
fix comments and add benchmark
Browse files Browse the repository at this point in the history
Signed-off-by: glorv <glorvs@163.com>
  • Loading branch information
glorv committed Dec 26, 2022
1 parent c9c1050 commit 9dcf235
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 17 deletions.
20 changes: 19 additions & 1 deletion benches/chained_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ mod yatp_callback {

mod yatp_future {
use criterion::*;
use std::sync::mpsc;
use std::sync::{mpsc, Arc};
use yatp::queue::priority::TaskPriorityProvider;
use yatp::task::future::TaskCell;
use yatp::Remote;

Expand Down Expand Up @@ -73,6 +74,20 @@ mod yatp_future {
let pool = yatp::Builder::new("chained_spawn").build_multilevel_future_pool();
chained_spawn(b, pool, iter_count)
}

pub fn chained_spawn_priority(b: &mut Bencher<'_>, iter_count: usize) {
struct ConstantPriorityPrivider;

impl TaskPriorityProvider for ConstantPriorityPrivider {
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
// return a constant value so the queue workes the same as FIFO queue.
0
}
}
let pool = yatp::Builder::new("chained_spawn")
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
chained_spawn(b, pool, iter_count)
}
}

mod tokio {
Expand Down Expand Up @@ -153,6 +168,9 @@ pub fn chained_spawn(b: &mut Criterion) {
i,
|b, i| yatp_future::chained_spawn_multilevel(b, *i),
);
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
yatp_future::chained_spawn_priority(b, *i)
});
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
tokio::chained_spawn(b, *i)
});
Expand Down
17 changes: 17 additions & 0 deletions benches/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ mod yatp_future {
use std::sync::atomic::*;
use std::sync::*;
use tokio::sync::oneshot;
use yatp::queue::priority::TaskPriorityProvider;
use yatp::task::future::TaskCell;

fn ping_pong(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, ping_count: usize) {
Expand Down Expand Up @@ -105,6 +106,19 @@ mod yatp_future {
let pool = yatp::Builder::new("ping_pong").build_multilevel_future_pool();
ping_pong(b, pool, ping_count)
}

pub fn ping_pong_priority(b: &mut Bencher<'_>, ping_count: usize) {
struct ConstantPriorityPrivider;
impl TaskPriorityProvider for ConstantPriorityPrivider {
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
// return a constant value so the queue workes the same as FIFO queue.
0
}
}
let pool = yatp::Builder::new("ping_pong")
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
ping_pong(b, pool, ping_count)
}
}

mod tokio {
Expand Down Expand Up @@ -219,6 +233,9 @@ pub fn ping_pong(b: &mut Criterion) {
i,
|b, i| yatp_future::ping_pong_multilevel(b, *i),
);
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
yatp_future::ping_pong_priority(b, *i)
});
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
tokio::ping_pong(b, *i)
});
Expand Down
17 changes: 17 additions & 0 deletions benches/spawn_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod yatp_future {
use criterion::*;
use std::sync::atomic::*;
use std::sync::*;
use yatp::queue::priority::TaskPriorityProvider;
use yatp::task::future::TaskCell;

fn spawn_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, spawn_count: usize) {
Expand Down Expand Up @@ -68,6 +69,19 @@ mod yatp_future {
let pool = yatp::Builder::new("spawn_many").build_multilevel_future_pool();
spawn_many(b, pool, spawn_count)
}

pub fn spawn_many_priority(b: &mut Bencher<'_>, spawn_count: usize) {
struct ConstantPriorityPrivider;
impl TaskPriorityProvider for ConstantPriorityPrivider {
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
// return a constant value so the queue workes the same as FIFO queue.
0
}
}
let pool = yatp::Builder::new("spawn_many")
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
spawn_many(b, pool, spawn_count)
}
}

mod threadpool {
Expand Down Expand Up @@ -174,6 +188,9 @@ pub fn spawn_many(b: &mut Criterion) {
i,
|b, i| yatp_future::spawn_many_multilevel(b, *i),
);
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
yatp_future::spawn_many_priority(b, *i)
});
group.bench_with_input(BenchmarkId::new("threadpool", i), i, |b, i| {
threadpool::spawn_many(b, *i)
});
Expand Down
19 changes: 18 additions & 1 deletion benches/yield_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ mod yatp_callback {

mod yatp_future {
use criterion::*;
use std::sync::mpsc;
use std::sync::{mpsc, Arc};
use yatp::queue::priority::TaskPriorityProvider;
use yatp::task::future::TaskCell;

fn yield_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, yield_count: usize) {
Expand Down Expand Up @@ -94,6 +95,19 @@ mod yatp_future {
let pool = yatp::Builder::new("yield_many").build_multilevel_future_pool();
yield_many(b, pool, yield_count)
}

pub fn yield_many_priority(b: &mut Bencher<'_>, yield_count: usize) {
struct ConstantPriorityPrivider;
impl TaskPriorityProvider for ConstantPriorityPrivider {
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
// return a constant value so the queue workes the same as FIFO queue.
0
}
}
let pool = yatp::Builder::new("yield_many")
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
yield_many(b, pool, yield_count)
}
}

mod tokio {
Expand Down Expand Up @@ -167,6 +181,9 @@ pub fn yield_many(b: &mut Criterion) {
i,
|b, i| yatp_future::yield_many_multilevel(b, *i),
);
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
yatp_future::yield_many_priority(b, *i)
});
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
tokio::yield_many(b, *i)
});
Expand Down
2 changes: 1 addition & 1 deletion src/queue/extras.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct Extras {
pub(crate) fixed_level: Option<u8>,
/// Number of execute times
pub(crate) exec_times: u32,
/// Extra metadata of this task. User can use this field to store arbitrary data. It is useful
/// Extra metadata of this task. User can use this field to store arbitrary data. It is useful
/// in some case to implement more complext `TaskPriorityProvider` in the priority task queue.
pub(crate) metadata: Vec<u8>,
}
Expand Down
19 changes: 5 additions & 14 deletions src/queue/priority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,13 @@ where
}
}

/// The local queue is just a proxy of the global queue.
pub(crate) struct LocalQueue<T> {
queue: Arc<QueueCore<T>>,
task_manager: PriorityTaskManager,
}
/// priority queue does not have local queue, all tasks are always put in the global queue.
pub(crate) type LocalQueue<T> = TaskInjector<T>;

impl<T> LocalQueue<T>
where
T: TaskCell + Send,
{
pub(super) fn push(&mut self, mut task_cell: T) {
let priority = self.task_manager.prepare_before_push(&mut task_cell);
set_schedule_time(&mut task_cell);
self.queue.push(task_cell, priority);
}

pub(super) fn pop(&mut self) -> Option<Pop<T>> {
self.queue.pop()
}
Expand All @@ -88,7 +79,7 @@ where
pub trait TaskPriorityProvider: Send + Sync + 'static {
/// Return a priority value of this task, all tasks in the priority
/// queue is ordered by this value.
fn get_priority(&self, extras: &Extras) -> u64;
fn priority_of(&self, extras: &Extras) -> u64;
}

#[derive(Clone)]
Expand All @@ -103,7 +94,7 @@ impl PriorityTaskManager {
T: TaskCell,
{
self.level_manager.adjust_task_level(task_cell);
self.priority_manager.get_priority(task_cell.mut_extras())
self.priority_manager.priority_of(task_cell.mut_extras())
}
}

Expand Down Expand Up @@ -415,7 +406,7 @@ mod tests {
struct OrderByIdProvider;

impl TaskPriorityProvider for OrderByIdProvider {
fn get_priority(&self, extras: &Extras) -> u64 {
fn priority_of(&self, extras: &Extras) -> u64 {
return extras.task_id();
}
}
Expand Down

0 comments on commit 9dcf235

Please sign in to comment.