Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue: support priority queue #72

Merged
merged 10 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ repository = "https://github.com/tikv/yatp/"

[dependencies]
crossbeam-deque = "0.8"
crossbeam-skiplist = "0.1"
crossbeam-utils = "0.8"
dashmap = "5.1"
fail = "0.5"
lazy_static = "1"
Expand Down
20 changes: 19 additions & 1 deletion benches/chained_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ mod yatp_callback {

mod yatp_future {
use criterion::*;
use std::sync::mpsc;
use std::sync::{mpsc, Arc};
use yatp::queue::priority::TaskPriorityProvider;
use yatp::task::future::TaskCell;
use yatp::Remote;

Expand Down Expand Up @@ -73,6 +74,20 @@ mod yatp_future {
let pool = yatp::Builder::new("chained_spawn").build_multilevel_future_pool();
chained_spawn(b, pool, iter_count)
}

pub fn chained_spawn_priority(b: &mut Bencher<'_>, iter_count: usize) {
struct ConstantPriorityPrivider;

impl TaskPriorityProvider for ConstantPriorityPrivider {
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
// return a constant value so the queue workes the same as FIFO queue.
0
}
}
let pool = yatp::Builder::new("chained_spawn")
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
chained_spawn(b, pool, iter_count)
}
}

mod tokio {
Expand Down Expand Up @@ -153,6 +168,9 @@ pub fn chained_spawn(b: &mut Criterion) {
i,
|b, i| yatp_future::chained_spawn_multilevel(b, *i),
);
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
yatp_future::chained_spawn_priority(b, *i)
});
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
tokio::chained_spawn(b, *i)
});
Expand Down
17 changes: 17 additions & 0 deletions benches/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ mod yatp_future {
use std::sync::atomic::*;
use std::sync::*;
use tokio::sync::oneshot;
use yatp::queue::priority::TaskPriorityProvider;
use yatp::task::future::TaskCell;

fn ping_pong(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, ping_count: usize) {
Expand Down Expand Up @@ -105,6 +106,19 @@ mod yatp_future {
let pool = yatp::Builder::new("ping_pong").build_multilevel_future_pool();
ping_pong(b, pool, ping_count)
}

pub fn ping_pong_priority(b: &mut Bencher<'_>, ping_count: usize) {
struct ConstantPriorityPrivider;
impl TaskPriorityProvider for ConstantPriorityPrivider {
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
// return a constant value so the queue workes the same as FIFO queue.
0
}
}
let pool = yatp::Builder::new("ping_pong")
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
ping_pong(b, pool, ping_count)
}
}

mod tokio {
Expand Down Expand Up @@ -219,6 +233,9 @@ pub fn ping_pong(b: &mut Criterion) {
i,
|b, i| yatp_future::ping_pong_multilevel(b, *i),
);
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
yatp_future::ping_pong_priority(b, *i)
});
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
tokio::ping_pong(b, *i)
});
Expand Down
17 changes: 17 additions & 0 deletions benches/spawn_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod yatp_future {
use criterion::*;
use std::sync::atomic::*;
use std::sync::*;
use yatp::queue::priority::TaskPriorityProvider;
use yatp::task::future::TaskCell;

fn spawn_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, spawn_count: usize) {
Expand Down Expand Up @@ -68,6 +69,19 @@ mod yatp_future {
let pool = yatp::Builder::new("spawn_many").build_multilevel_future_pool();
spawn_many(b, pool, spawn_count)
}

pub fn spawn_many_priority(b: &mut Bencher<'_>, spawn_count: usize) {
struct ConstantPriorityPrivider;
impl TaskPriorityProvider for ConstantPriorityPrivider {
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
// return a constant value so the queue workes the same as FIFO queue.
0
}
}
let pool = yatp::Builder::new("spawn_many")
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
spawn_many(b, pool, spawn_count)
}
}

mod threadpool {
Expand Down Expand Up @@ -174,6 +188,9 @@ pub fn spawn_many(b: &mut Criterion) {
i,
|b, i| yatp_future::spawn_many_multilevel(b, *i),
);
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
yatp_future::spawn_many_priority(b, *i)
});
group.bench_with_input(BenchmarkId::new("threadpool", i), i, |b, i| {
threadpool::spawn_many(b, *i)
});
Expand Down
19 changes: 18 additions & 1 deletion benches/yield_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ mod yatp_callback {

mod yatp_future {
use criterion::*;
use std::sync::mpsc;
use std::sync::{mpsc, Arc};
use yatp::queue::priority::TaskPriorityProvider;
use yatp::task::future::TaskCell;

fn yield_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, yield_count: usize) {
Expand Down Expand Up @@ -94,6 +95,19 @@ mod yatp_future {
let pool = yatp::Builder::new("yield_many").build_multilevel_future_pool();
yield_many(b, pool, yield_count)
}

pub fn yield_many_priority(b: &mut Bencher<'_>, yield_count: usize) {
struct ConstantPriorityPrivider;
impl TaskPriorityProvider for ConstantPriorityPrivider {
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
// return a constant value so the queue workes the same as FIFO queue.
0
}
}
let pool = yatp::Builder::new("yield_many")
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
yield_many(b, pool, yield_count)
}
}

mod tokio {
Expand Down Expand Up @@ -167,6 +181,9 @@ pub fn yield_many(b: &mut Criterion) {
i,
|b, i| yatp_future::yield_many_multilevel(b, *i),
);
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
yatp_future::yield_many_priority(b, *i)
});
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
tokio::yield_many(b, *i)
});
Expand Down
2 changes: 1 addition & 1 deletion src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

mod builder;
mod runner;
mod spawn;
pub(crate) mod spawn;
mod worker;

pub use self::builder::{Builder, SchedConfig};
Expand Down
15 changes: 14 additions & 1 deletion src/pool/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::pool::spawn::QueueCore;
use crate::pool::worker::WorkerThread;
use crate::pool::{CloneRunnerBuilder, Local, Remote, Runner, RunnerBuilder, ThreadPool};
use crate::queue::{self, multilevel, LocalQueue, QueueType, TaskCell};
use crate::queue::{self, multilevel, priority, LocalQueue, QueueType, TaskCell};
use crate::task::{callback, future};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -290,6 +290,19 @@ impl Builder {
self.build_with_queue_and_runner(QueueType::Multilevel(queue_builder), runner_builder)
}

/// Spawn a priority future pool.
///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment

/// It setups the pool with priority queue. Caller must provide a `TaskPriorityProvider` implementation to generate the proper priority value for each spawned task.
pub fn build_priority_future_pool(
&self,
priority_provider: Arc<dyn priority::TaskPriorityProvider>,
) -> ThreadPool<future::TaskCell> {
let fb = CloneRunnerBuilder(future::Runner::default());
let queue_builder = priority::Builder::new(priority::Config::default(), priority_provider);
let runner_builder = queue_builder.runner_builder(fb);
self.build_with_queue_and_runner(QueueType::Priority(queue_builder), runner_builder)
}

/// Spawns the thread pool immediately.
///
/// `queue_builder` is a closure that creates a task queue. It accepts the
Expand Down
20 changes: 19 additions & 1 deletion src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! data structs.

pub mod multilevel;
pub mod priority;

mod extras;
mod single_level;
Expand All @@ -18,7 +19,7 @@ pub use self::extras::Extras;
use std::time::Instant;

/// A cell containing a task and needed extra information.
pub trait TaskCell {
pub trait TaskCell: 'static {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The skiplist's value must be 'static. I think it's fair that a task spawned in the thread pool should live long enough.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why skiplist's value should be 'static. Technically, as long as the value outlive skiplist, it should be OK. If that's an API shortcoming, then the constraint should be put into priority queue instead of here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like an API shortcoming. I tried making the constraint priority-queue-only, but because we use enum to dispatch instead of trait, the constraint propagates up.

/// Gets mutable extra information.
fn mut_extras(&mut self) -> &mut Extras;
}
Expand All @@ -42,6 +43,7 @@ pub(crate) struct TaskInjector<T>(InjectorInner<T>);
enum InjectorInner<T> {
SingleLevel(single_level::TaskInjector<T>),
Multilevel(multilevel::TaskInjector<T>),
Priority(priority::TaskInjector<T>),
}

impl<T: TaskCell + Send> TaskInjector<T> {
Expand All @@ -50,13 +52,15 @@ impl<T: TaskCell + Send> TaskInjector<T> {
match &self.0 {
InjectorInner::SingleLevel(q) => q.push(task_cell),
InjectorInner::Multilevel(q) => q.push(task_cell),
InjectorInner::Priority(q) => q.push(task_cell),
}
}

pub fn default_extras(&self) -> Extras {
match self.0 {
InjectorInner::SingleLevel(_) => Extras::single_level(),
InjectorInner::Multilevel(_) => Extras::multilevel_default(),
InjectorInner::Priority(_) => Extras::single_level(),
}
}
}
Expand All @@ -80,6 +84,7 @@ pub(crate) struct LocalQueue<T>(LocalQueueInner<T>);
enum LocalQueueInner<T> {
SingleLevel(single_level::LocalQueue<T>),
Multilevel(multilevel::LocalQueue<T>),
Priority(priority::LocalQueue<T>),
}

impl<T: TaskCell + Send> LocalQueue<T> {
Expand All @@ -88,6 +93,7 @@ impl<T: TaskCell + Send> LocalQueue<T> {
match &mut self.0 {
LocalQueueInner::SingleLevel(q) => q.push(task_cell),
LocalQueueInner::Multilevel(q) => q.push(task_cell),
LocalQueueInner::Priority(q) => q.push(task_cell),
}
}

Expand All @@ -97,13 +103,15 @@ impl<T: TaskCell + Send> LocalQueue<T> {
match &mut self.0 {
LocalQueueInner::SingleLevel(q) => q.pop(),
LocalQueueInner::Multilevel(q) => q.pop(),
LocalQueueInner::Priority(q) => q.pop(),
}
}

pub fn default_extras(&self) -> Extras {
match self.0 {
LocalQueueInner::SingleLevel(_) => Extras::single_level(),
LocalQueueInner::Multilevel(_) => Extras::multilevel_default(),
LocalQueueInner::Priority(_) => Extras::single_level(),
}
}

Expand All @@ -113,6 +121,7 @@ impl<T: TaskCell + Send> LocalQueue<T> {
match &mut self.0 {
LocalQueueInner::SingleLevel(q) => q.has_tasks_or_pull(),
LocalQueueInner::Multilevel(q) => q.has_tasks_or_pull(),
LocalQueueInner::Priority(q) => q.has_tasks_or_pull(),
}
}
}
Expand All @@ -125,6 +134,8 @@ pub enum QueueType {
///
/// More to see: https://en.wikipedia.org/wiki/Multilevel_feedback_queue.
Multilevel(multilevel::Builder),
/// A concurrent prioirty queue.
Priority(priority::Builder),
}

impl Default for QueueType {
Expand All @@ -139,10 +150,17 @@ impl From<multilevel::Builder> for QueueType {
}
}

impl From<priority::Builder> for QueueType {
fn from(b: priority::Builder) -> QueueType {
QueueType::Priority(b)
}
}

pub(crate) fn build<T>(ty: QueueType, local_num: usize) -> (TaskInjector<T>, Vec<LocalQueue<T>>) {
match ty {
QueueType::SingleLevel => single_level(local_num),
QueueType::Multilevel(b) => b.build(local_num),
QueueType::Priority(b) => b.build(local_num),
}
}

Expand Down
22 changes: 21 additions & 1 deletion src/queue/extras.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub struct Extras {
pub(crate) fixed_level: Option<u8>,
/// Number of execute times
pub(crate) exec_times: u32,
/// Extra metadata of this task. User can use this field to store arbitrary data. It is useful
/// in some case to implement more complext `TaskPriorityProvider` in the priority task queue.
pub(crate) metadata: Vec<u8>,
}

impl Extras {
Expand All @@ -43,6 +46,7 @@ impl Extras {
current_level: 0,
fixed_level: None,
exec_times: 0,
metadata: Vec::new(),
}
}

Expand All @@ -62,9 +66,10 @@ impl Extras {
task_id,
running_time: Some(Arc::new(ElapsedTime::default())),
total_running_time: None,
current_level: 0,
current_level: fixed_level.unwrap_or(0),
fixed_level,
exec_times: 0,
metadata: Vec::new(),
}
}

Expand All @@ -89,4 +94,19 @@ impl Extras {
pub fn current_level(&self) -> u8 {
self.current_level
}

/// Gets the metadata of this task.
pub fn metadata(&self) -> &[u8] {
&self.metadata
}

/// Gets the mutable metadata of this task.
pub fn metadata_mut(&mut self) -> &mut Vec<u8> {
&mut self.metadata
}

/// Set the metadata of this task.
pub fn set_metadata(&mut self, metadata: Vec<u8>) {
self.metadata = metadata;
}
}
Loading