diff --git a/Cargo.toml b/Cargo.toml index 9f21b27..8fb06a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,8 @@ repository = "https://github.com/tikv/yatp/" [dependencies] crossbeam-deque = "0.8" +crossbeam-skiplist = "0.1" +crossbeam-utils = "0.8" dashmap = "5.1" fail = "0.5" lazy_static = "1" diff --git a/benches/chained_spawn.rs b/benches/chained_spawn.rs index ad8a311..6dece48 100644 --- a/benches/chained_spawn.rs +++ b/benches/chained_spawn.rs @@ -35,7 +35,8 @@ mod yatp_callback { mod yatp_future { use criterion::*; - use std::sync::mpsc; + use std::sync::{mpsc, Arc}; + use yatp::queue::priority::TaskPriorityProvider; use yatp::task::future::TaskCell; use yatp::Remote; @@ -73,6 +74,20 @@ mod yatp_future { let pool = yatp::Builder::new("chained_spawn").build_multilevel_future_pool(); chained_spawn(b, pool, iter_count) } + + pub fn chained_spawn_priority(b: &mut Bencher<'_>, iter_count: usize) { + struct ConstantPriorityPrivider; + + impl TaskPriorityProvider for ConstantPriorityPrivider { + fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 { + // return a constant value so the queue workes the same as FIFO queue. + 0 + } + } + let pool = yatp::Builder::new("chained_spawn") + .build_priority_future_pool(Arc::new(ConstantPriorityPrivider)); + chained_spawn(b, pool, iter_count) + } } mod tokio { @@ -153,6 +168,9 @@ pub fn chained_spawn(b: &mut Criterion) { i, |b, i| yatp_future::chained_spawn_multilevel(b, *i), ); + group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| { + yatp_future::chained_spawn_priority(b, *i) + }); group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| { tokio::chained_spawn(b, *i) }); diff --git a/benches/ping_pong.rs b/benches/ping_pong.rs index 8df2027..503878a 100644 --- a/benches/ping_pong.rs +++ b/benches/ping_pong.rs @@ -53,6 +53,7 @@ mod yatp_future { use std::sync::atomic::*; use std::sync::*; use tokio::sync::oneshot; + use yatp::queue::priority::TaskPriorityProvider; use yatp::task::future::TaskCell; fn ping_pong(b: &mut Bencher<'_>, pool: yatp::ThreadPool, ping_count: usize) { @@ -105,6 +106,19 @@ mod yatp_future { let pool = yatp::Builder::new("ping_pong").build_multilevel_future_pool(); ping_pong(b, pool, ping_count) } + + pub fn ping_pong_priority(b: &mut Bencher<'_>, ping_count: usize) { + struct ConstantPriorityPrivider; + impl TaskPriorityProvider for ConstantPriorityPrivider { + fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 { + // return a constant value so the queue workes the same as FIFO queue. + 0 + } + } + let pool = yatp::Builder::new("ping_pong") + .build_priority_future_pool(Arc::new(ConstantPriorityPrivider)); + ping_pong(b, pool, ping_count) + } } mod tokio { @@ -219,6 +233,9 @@ pub fn ping_pong(b: &mut Criterion) { i, |b, i| yatp_future::ping_pong_multilevel(b, *i), ); + group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| { + yatp_future::ping_pong_priority(b, *i) + }); group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| { tokio::ping_pong(b, *i) }); diff --git a/benches/spawn_many.rs b/benches/spawn_many.rs index e407723..e1c1aaa 100644 --- a/benches/spawn_many.rs +++ b/benches/spawn_many.rs @@ -35,6 +35,7 @@ mod yatp_future { use criterion::*; use std::sync::atomic::*; use std::sync::*; + use yatp::queue::priority::TaskPriorityProvider; use yatp::task::future::TaskCell; fn spawn_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool, spawn_count: usize) { @@ -68,6 +69,19 @@ mod yatp_future { let pool = yatp::Builder::new("spawn_many").build_multilevel_future_pool(); spawn_many(b, pool, spawn_count) } + + pub fn spawn_many_priority(b: &mut Bencher<'_>, spawn_count: usize) { + struct ConstantPriorityPrivider; + impl TaskPriorityProvider for ConstantPriorityPrivider { + fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 { + // return a constant value so the queue workes the same as FIFO queue. + 0 + } + } + let pool = yatp::Builder::new("spawn_many") + .build_priority_future_pool(Arc::new(ConstantPriorityPrivider)); + spawn_many(b, pool, spawn_count) + } } mod threadpool { @@ -174,6 +188,9 @@ pub fn spawn_many(b: &mut Criterion) { i, |b, i| yatp_future::spawn_many_multilevel(b, *i), ); + group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| { + yatp_future::spawn_many_priority(b, *i) + }); group.bench_with_input(BenchmarkId::new("threadpool", i), i, |b, i| { threadpool::spawn_many(b, *i) }); diff --git a/benches/yield_many.rs b/benches/yield_many.rs index 77aae2f..1e76f13 100644 --- a/benches/yield_many.rs +++ b/benches/yield_many.rs @@ -61,7 +61,8 @@ mod yatp_callback { mod yatp_future { use criterion::*; - use std::sync::mpsc; + use std::sync::{mpsc, Arc}; + use yatp::queue::priority::TaskPriorityProvider; use yatp::task::future::TaskCell; fn yield_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool, yield_count: usize) { @@ -94,6 +95,19 @@ mod yatp_future { let pool = yatp::Builder::new("yield_many").build_multilevel_future_pool(); yield_many(b, pool, yield_count) } + + pub fn yield_many_priority(b: &mut Bencher<'_>, yield_count: usize) { + struct ConstantPriorityPrivider; + impl TaskPriorityProvider for ConstantPriorityPrivider { + fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 { + // return a constant value so the queue workes the same as FIFO queue. + 0 + } + } + let pool = yatp::Builder::new("yield_many") + .build_priority_future_pool(Arc::new(ConstantPriorityPrivider)); + yield_many(b, pool, yield_count) + } } mod tokio { @@ -167,6 +181,9 @@ pub fn yield_many(b: &mut Criterion) { i, |b, i| yatp_future::yield_many_multilevel(b, *i), ); + group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| { + yatp_future::yield_many_priority(b, *i) + }); group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| { tokio::yield_many(b, *i) }); diff --git a/src/pool.rs b/src/pool.rs index 932ad0b..1b4f684 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -7,7 +7,7 @@ mod builder; mod runner; -mod spawn; +pub(crate) mod spawn; mod worker; pub use self::builder::{Builder, SchedConfig}; diff --git a/src/pool/builder.rs b/src/pool/builder.rs index 3e431bd..29e3be3 100644 --- a/src/pool/builder.rs +++ b/src/pool/builder.rs @@ -3,7 +3,7 @@ use crate::pool::spawn::QueueCore; use crate::pool::worker::WorkerThread; use crate::pool::{CloneRunnerBuilder, Local, Remote, Runner, RunnerBuilder, ThreadPool}; -use crate::queue::{self, multilevel, LocalQueue, QueueType, TaskCell}; +use crate::queue::{self, multilevel, priority, LocalQueue, QueueType, TaskCell}; use crate::task::{callback, future}; use std::sync::{ atomic::{AtomicUsize, Ordering}, @@ -290,6 +290,19 @@ impl Builder { self.build_with_queue_and_runner(QueueType::Multilevel(queue_builder), runner_builder) } + /// Spawn a priority future pool. + /// + /// It setups the pool with priority queue. Caller must provide a `TaskPriorityProvider` implementation to generate the proper priority value for each spawned task. + pub fn build_priority_future_pool( + &self, + priority_provider: Arc, + ) -> ThreadPool { + let fb = CloneRunnerBuilder(future::Runner::default()); + let queue_builder = priority::Builder::new(priority::Config::default(), priority_provider); + let runner_builder = queue_builder.runner_builder(fb); + self.build_with_queue_and_runner(QueueType::Priority(queue_builder), runner_builder) + } + /// Spawns the thread pool immediately. /// /// `queue_builder` is a closure that creates a task queue. It accepts the diff --git a/src/queue.rs b/src/queue.rs index 2109e2c..9b06cc8 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -9,6 +9,7 @@ //! data structs. pub mod multilevel; +pub mod priority; mod extras; mod single_level; @@ -18,7 +19,7 @@ pub use self::extras::Extras; use std::time::Instant; /// A cell containing a task and needed extra information. -pub trait TaskCell { +pub trait TaskCell: 'static { /// Gets mutable extra information. fn mut_extras(&mut self) -> &mut Extras; } @@ -42,6 +43,7 @@ pub(crate) struct TaskInjector(InjectorInner); enum InjectorInner { SingleLevel(single_level::TaskInjector), Multilevel(multilevel::TaskInjector), + Priority(priority::TaskInjector), } impl TaskInjector { @@ -50,6 +52,7 @@ impl TaskInjector { match &self.0 { InjectorInner::SingleLevel(q) => q.push(task_cell), InjectorInner::Multilevel(q) => q.push(task_cell), + InjectorInner::Priority(q) => q.push(task_cell), } } @@ -57,6 +60,7 @@ impl TaskInjector { match self.0 { InjectorInner::SingleLevel(_) => Extras::single_level(), InjectorInner::Multilevel(_) => Extras::multilevel_default(), + InjectorInner::Priority(_) => Extras::single_level(), } } } @@ -80,6 +84,7 @@ pub(crate) struct LocalQueue(LocalQueueInner); enum LocalQueueInner { SingleLevel(single_level::LocalQueue), Multilevel(multilevel::LocalQueue), + Priority(priority::LocalQueue), } impl LocalQueue { @@ -88,6 +93,7 @@ impl LocalQueue { match &mut self.0 { LocalQueueInner::SingleLevel(q) => q.push(task_cell), LocalQueueInner::Multilevel(q) => q.push(task_cell), + LocalQueueInner::Priority(q) => q.push(task_cell), } } @@ -97,6 +103,7 @@ impl LocalQueue { match &mut self.0 { LocalQueueInner::SingleLevel(q) => q.pop(), LocalQueueInner::Multilevel(q) => q.pop(), + LocalQueueInner::Priority(q) => q.pop(), } } @@ -104,6 +111,7 @@ impl LocalQueue { match self.0 { LocalQueueInner::SingleLevel(_) => Extras::single_level(), LocalQueueInner::Multilevel(_) => Extras::multilevel_default(), + LocalQueueInner::Priority(_) => Extras::single_level(), } } @@ -113,6 +121,7 @@ impl LocalQueue { match &mut self.0 { LocalQueueInner::SingleLevel(q) => q.has_tasks_or_pull(), LocalQueueInner::Multilevel(q) => q.has_tasks_or_pull(), + LocalQueueInner::Priority(q) => q.has_tasks_or_pull(), } } } @@ -125,6 +134,8 @@ pub enum QueueType { /// /// More to see: https://en.wikipedia.org/wiki/Multilevel_feedback_queue. Multilevel(multilevel::Builder), + /// A concurrent prioirty queue. + Priority(priority::Builder), } impl Default for QueueType { @@ -139,10 +150,17 @@ impl From for QueueType { } } +impl From for QueueType { + fn from(b: priority::Builder) -> QueueType { + QueueType::Priority(b) + } +} + pub(crate) fn build(ty: QueueType, local_num: usize) -> (TaskInjector, Vec>) { match ty { QueueType::SingleLevel => single_level(local_num), QueueType::Multilevel(b) => b.build(local_num), + QueueType::Priority(b) => b.build(local_num), } } diff --git a/src/queue/extras.rs b/src/queue/extras.rs index 1c8a6b1..9fd5b27 100644 --- a/src/queue/extras.rs +++ b/src/queue/extras.rs @@ -28,6 +28,9 @@ pub struct Extras { pub(crate) fixed_level: Option, /// Number of execute times pub(crate) exec_times: u32, + /// Extra metadata of this task. User can use this field to store arbitrary data. It is useful + /// in some case to implement more complext `TaskPriorityProvider` in the priority task queue. + pub(crate) metadata: Vec, } impl Extras { @@ -43,6 +46,7 @@ impl Extras { current_level: 0, fixed_level: None, exec_times: 0, + metadata: Vec::new(), } } @@ -62,9 +66,10 @@ impl Extras { task_id, running_time: Some(Arc::new(ElapsedTime::default())), total_running_time: None, - current_level: 0, + current_level: fixed_level.unwrap_or(0), fixed_level, exec_times: 0, + metadata: Vec::new(), } } @@ -89,4 +94,19 @@ impl Extras { pub fn current_level(&self) -> u8 { self.current_level } + + /// Gets the metadata of this task. + pub fn metadata(&self) -> &[u8] { + &self.metadata + } + + /// Gets the mutable metadata of this task. + pub fn metadata_mut(&mut self) -> &mut Vec { + &mut self.metadata + } + + /// Set the metadata of this task. + pub fn set_metadata(&mut self, metadata: Vec) { + self.metadata = metadata; + } } diff --git a/src/queue/multilevel.rs b/src/queue/multilevel.rs index 27b9439..a676149 100644 --- a/src/queue/multilevel.rs +++ b/src/queue/multilevel.rs @@ -23,7 +23,8 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::{f64, fmt, iter}; -const LEVEL_NUM: usize = 3; +/// Number of levels +pub const LEVEL_NUM: usize = 3; /// The chance ratio of level 1 and level 2 tasks. const CHANCE_RATIO: u32 = 4; @@ -33,7 +34,7 @@ const DEFAULT_CLEANUP_OLD_MAP_INTERVAL: Duration = Duration::from_secs(10); /// When local total elapsed time exceeds this value in microseconds, the local /// metrics is flushed to the global atomic metrics and try to trigger chance /// adjustment. -const FLUSH_LOCAL_THRESHOLD_US: u64 = 100_000; +pub(super) const FLUSH_LOCAL_THRESHOLD_US: u64 = 100_000; /// When the incremental total elapsed time exceeds this value, it will try to /// adjust level chances and reset the total elapsed time. @@ -322,8 +323,7 @@ where struct LevelManager { level0_elapsed_us: IntCounter, total_elapsed_us: IntCounter, - task_elapsed_map: TaskElapsedMap, - level_time_threshold: [Duration; LEVEL_NUM - 1], + task_level_mgr: TaskLevelManager, level0_chance: Gauge, level0_proportion_target: f64, adjusting: AtomicBool, @@ -347,25 +347,8 @@ impl LevelManager { where T: TaskCell, { - let extras = task_cell.mut_extras(); - let task_id = extras.task_id; - let current_level = match extras.fixed_level { - Some(level) => level, - None => { - let running_time = extras - .total_running_time - .get_or_insert_with(|| self.task_elapsed_map.get_elapsed(task_id)); - let running_time = running_time.as_duration(); - self.level_time_threshold - .iter() - .enumerate() - .find(|(_, &threshold)| running_time < threshold) - .map(|(level, _)| level) - .unwrap_or(LEVEL_NUM - 1) as u8 - } - }; - extras.current_level = current_level; - extras.schedule_time = Some(now()); + self.task_level_mgr.adjust_task_level(task_cell); + task_cell.mut_extras().schedule_time = Some(now()); } fn maybe_adjust_chance(&self) { @@ -422,7 +405,7 @@ impl LevelManager { std::cmp::min(total_tasks / level_0_tasks, LEVEL_MAX_QUEUE_MAX_STEAL_SIZE) }; self.max_level_queue_steal_size - .store(new_steal_count as usize, SeqCst); + .store(new_steal_count, SeqCst); for (i, c) in self.last_exec_tasks_per_level.iter().enumerate() { c.set(cur_total_tasks_per_level[i]); } @@ -432,14 +415,52 @@ impl LevelManager { } } +pub(super) struct TaskLevelManager { + task_elapsed_map: TaskElapsedMap, + level_time_threshold: [Duration; LEVEL_NUM - 1], +} + +impl TaskLevelManager { + pub fn new(level_time_threshold: [Duration; LEVEL_NUM - 1]) -> Self { + Self { + task_elapsed_map: TaskElapsedMap::default(), + level_time_threshold, + } + } + + pub fn adjust_task_level(&self, task_cell: &mut T) + where + T: TaskCell, + { + let extras = task_cell.mut_extras(); + let task_id = extras.task_id; + let current_level = match extras.fixed_level { + Some(level) => level, + None => { + let running_time = extras + .total_running_time + .get_or_insert_with(|| self.task_elapsed_map.get_elapsed(task_id)); + let running_time = running_time.as_duration(); + self.level_time_threshold + .iter() + .enumerate() + .find(|(_, &threshold)| running_time < threshold) + .map(|(level, _)| level) + .unwrap_or(LEVEL_NUM - 1) as u8 + } + }; + extras.current_level = current_level; + } +} + pub(crate) struct ElapsedTime(AtomicU64); impl ElapsedTime { pub(crate) fn as_duration(&self) -> Duration { - Duration::from_micros(self.0.load(Relaxed) as u64) + Duration::from_micros(self.0.load(Relaxed)) } - fn inc_by(&self, t: Duration) { + pub(super) fn inc_by(&self, t: Duration) { self.0.fetch_add(t.as_micros() as u64, Relaxed); } @@ -650,8 +671,7 @@ impl Builder { let manager = Arc::new(LevelManager { level0_elapsed_us, total_elapsed_us, - task_elapsed_map: Default::default(), - level_time_threshold: config.level_time_threshold, + task_level_mgr: TaskLevelManager::new(config.level_time_threshold), level0_chance, level0_proportion_target: config.level0_proportion_target, adjusting: AtomicBool::new(false), @@ -998,7 +1018,12 @@ mod tests { assert!(runner.handle(&mut locals[0], task_cell)); } assert!( - manager.task_elapsed_map.get_elapsed(1).as_duration() >= Duration::from_millis(100) + manager + .task_level_mgr + .task_elapsed_map + .get_elapsed(1) + .as_duration() + >= Duration::from_millis(100) ); } diff --git a/src/queue/priority.rs b/src/queue/priority.rs new file mode 100644 index 0000000..48056ef --- /dev/null +++ b/src/queue/priority.rs @@ -0,0 +1,482 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +//! A priority task queue. Tasks are scheduled based on its priority, tasks with small priority +//! value will be scheduler earlier than bigger onces. User should implement The [`TaskPriorityProvider`] +//! to provide the priority value for each task. The priority value is fetched from the +//! [`TaskPriorityProvider`] at each time the task is scheduled. +//! +//! The task queue requires that the accompanying [`PriorityRunner`] must beused to collect necessary +//! information. + +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; + +use crossbeam_skiplist::SkipMap; +use crossbeam_utils::atomic::AtomicCell; +use prometheus::local::LocalIntCounter; +use prometheus::IntCounter; + +use crate::metrics::*; +use crate::pool::{Local, Runner, RunnerBuilder}; +use crate::queue::{ + multilevel::{TaskLevelManager, FLUSH_LOCAL_THRESHOLD_US, LEVEL_NUM}, + Extras, Pop, TaskCell, +}; + +// a wrapper of u64 with an extra sequence number to avoid duplicate value. +#[derive(Eq, PartialEq, Ord, PartialOrd)] +struct MapKey(u64, u64); + +/// The injector of a single level work stealing task queue. +#[derive(Clone)] +pub struct TaskInjector { + queue: Arc>, + task_manager: PriorityTaskManager, +} + +fn set_schedule_time(task_cell: &mut T) +where + T: TaskCell, +{ + task_cell.mut_extras().schedule_time = Some(Instant::now()); +} + +impl TaskInjector +where + T: TaskCell + Send, +{ + /// Pushes the task cell to the queue. The schedule time in the extras is + /// assigned to be now. + pub fn push(&self, mut task_cell: T) { + let priority = self.task_manager.prepare_before_push(&mut task_cell); + set_schedule_time(&mut task_cell); + self.queue.push(task_cell, priority); + } +} + +/// priority queue does not have local queue, all tasks are always put in the global queue. +pub(crate) type LocalQueue = TaskInjector; + +impl LocalQueue +where + T: TaskCell + Send, +{ + pub(super) fn pop(&mut self) -> Option> { + self.queue.pop() + } + + pub(super) fn has_tasks_or_pull(&mut self) -> bool { + !self.queue.is_empty() + } +} + +/// A trait used to generate priority value for each task. +pub trait TaskPriorityProvider: Send + Sync + 'static { + /// Return a priority value of this task, all tasks in the priority + /// queue is ordered by this value. + fn priority_of(&self, extras: &Extras) -> u64; +} + +#[derive(Clone)] +struct PriorityTaskManager { + level_manager: Arc, + priority_manager: Arc, +} + +impl PriorityTaskManager { + fn prepare_before_push(&self, task_cell: &mut T) -> u64 + where + T: TaskCell, + { + self.level_manager.adjust_task_level(task_cell); + self.priority_manager.priority_of(task_cell.mut_extras()) + } +} + +/// The global priority queue. We use a `SkipMap` as a priority queue, +/// The key is the priority and value is the task. +struct QueueCore { + pq: SkipMap>, + /// a global sequence generator to ensure all task keys are unique. + sequence: AtomicU64, +} + +impl QueueCore { + fn new() -> Self { + Self { + pq: SkipMap::new(), + sequence: AtomicU64::new(0), + } + } + + fn is_empty(&self) -> bool { + self.pq.is_empty() + } +} + +impl QueueCore { + fn push(&self, msg: T, priority: u64) { + self.pq.insert(self.gen_key(priority), Slot::new(msg)); + } + + pub fn pop(&self) -> Option> { + fn into_pop(mut t: T) -> Pop + where + T: TaskCell, + { + let schedule_time = t.mut_extras().schedule_time.unwrap(); + Pop { + task_cell: t, + schedule_time, + from_local: false, + } + } + + self.pq + .pop_front() + .map(|e| into_pop(e.value().take().unwrap())) + } + + #[inline] + fn gen_key(&self, priority: u64) -> MapKey { + MapKey(priority, self.sequence.fetch_add(1, Ordering::Relaxed)) + } +} + +/// A holder to store task. Wrap the task in a AtomicCell becuase crossbeam-skip only provide +/// readonly acess to a popped Entry. +struct Slot { + cell: AtomicCell>, +} + +impl Slot { + fn new(value: T) -> Self { + Self { + cell: AtomicCell::new(Some(value)), + } + } + + fn take(&self) -> Option { + self.cell.take() + } +} + +/// The runner for priority queues. +/// +/// The runner helps collect additional information to support auto-adjust task level. +/// [`PriorityRunnerBuiler`] is the [`RunnerBuilder`] for this runner. +pub struct PriorityRunner { + inner: R, + local_level0_elapsed_us: LocalIntCounter, + local_total_elapsed_us: LocalIntCounter, +} + +impl Runner for PriorityRunner +where + R: Runner, + T: TaskCell, +{ + type TaskCell = T; + + fn start(&mut self, local: &mut Local) { + self.inner.start(local) + } + + fn handle(&mut self, local: &mut Local, mut task_cell: T) -> bool { + let extras = task_cell.mut_extras(); + let level = extras.current_level(); + let total_running_time = extras.total_running_time.clone(); + let begin = Instant::now(); + let res = self.inner.handle(local, task_cell); + let elapsed = begin.elapsed(); + let elapsed_us = elapsed.as_micros() as u64; + if let Some(ref running_time) = total_running_time { + running_time.inc_by(elapsed); + } + if level == 0 { + self.local_level0_elapsed_us.inc_by(elapsed_us); + } + self.local_total_elapsed_us.inc_by(elapsed_us); + let local_total = self.local_total_elapsed_us.get(); + if local_total > FLUSH_LOCAL_THRESHOLD_US { + self.local_level0_elapsed_us.flush(); + self.local_total_elapsed_us.flush(); + } + res + } + + fn pause(&mut self, local: &mut Local) -> bool { + self.inner.pause(local) + } + + fn resume(&mut self, local: &mut Local) { + self.inner.resume(local) + } + + fn end(&mut self, local: &mut Local) { + self.inner.end(local) + } +} + +/// The runner builder for priority task queues. +/// +/// It can be created by [`Builder::runner_builder`]. +pub struct PriorityRunnerBuiler { + inner: B, + level0_elapsed_us: IntCounter, + total_elapsed_us: IntCounter, +} + +impl RunnerBuilder for PriorityRunnerBuiler +where + B: RunnerBuilder, + R: Runner, + T: TaskCell, +{ + type Runner = PriorityRunner; + + fn build(&mut self) -> Self::Runner { + PriorityRunner { + inner: self.inner.build(), + local_level0_elapsed_us: self.level0_elapsed_us.local(), + local_total_elapsed_us: self.total_elapsed_us.local(), + } + } +} + +/// The configurations of priority task queues. +pub struct Config { + name: Option, + level_time_threshold: [Duration; LEVEL_NUM - 1], +} + +impl Config { + /// Sets the name of the priority task queue. Metrics are available if name is provided. + pub fn name(mut self, name: Option>) -> Self { + self.name = name.map(Into::into); + self + } +} + +impl Default for Config { + fn default() -> Config { + Config { + name: None, + level_time_threshold: [Duration::from_millis(5), Duration::from_millis(100)], + } + } +} + +/// The builder of a priority task queue. +pub struct Builder { + manager: PriorityTaskManager, + level0_elapsed_us: IntCounter, + total_elapsed_us: IntCounter, +} + +impl Builder { + /// Creates a priority task queue builder with specified config and [`TaskPriorityProvider`]. + pub fn new(config: Config, priority_manager: Arc) -> Builder { + let Config { + name, + level_time_threshold, + } = config; + let (level0_elapsed_us, total_elapsed_us) = if let Some(name) = name { + ( + MULTILEVEL_LEVEL_ELAPSED + .get_metric_with_label_values(&[&name, "0"]) + .unwrap(), + MULTILEVEL_LEVEL_ELAPSED + .get_metric_with_label_values(&[&name, "total"]) + .unwrap(), + ) + } else { + ( + IntCounter::new("_", "_").unwrap(), + IntCounter::new("_", "_").unwrap(), + ) + }; + Self { + manager: PriorityTaskManager { + level_manager: Arc::new(TaskLevelManager::new(level_time_threshold)), + priority_manager, + }, + level0_elapsed_us, + total_elapsed_us, + } + } + + /// Creates a runner builder for the multilevel task queue with a normal runner builder. + pub fn runner_builder(&self, inner_runner_builder: B) -> PriorityRunnerBuiler { + PriorityRunnerBuiler { + inner: inner_runner_builder, + level0_elapsed_us: self.level0_elapsed_us.clone(), + total_elapsed_us: self.total_elapsed_us.clone(), + } + } + + pub(crate) fn build_raw(self, local_num: usize) -> (TaskInjector, Vec>) { + let queue = Arc::new(QueueCore::new()); + let local_queue = std::iter::repeat_with(|| LocalQueue { + queue: queue.clone(), + task_manager: self.manager.clone(), + }) + .take(local_num) + .collect(); + + let injector = TaskInjector { + queue, + task_manager: self.manager, + }; + + (injector, local_queue) + } + + pub(crate) fn build( + self, + local_num: usize, + ) -> (super::TaskInjector, Vec>) { + let (injector, locals) = self.build_raw(local_num); + ( + super::TaskInjector(super::InjectorInner::Priority(injector)), + locals + .into_iter() + .map(|i| super::LocalQueue(super::LocalQueueInner::Priority(i))) + .collect(), + ) + } +} + +#[cfg(test)] +mod tests { + use std::thread; + + use super::*; + use crate::queue::{Extras, InjectorInner}; + use rand::RngCore; + #[derive(Debug)] + struct MockTask { + sleep_ms: u64, + extras: Extras, + } + + impl MockTask { + fn new(sleep_ms: u64, task_id: u64) -> Self { + MockTask { + sleep_ms, + extras: Extras::new_multilevel(task_id, None), + } + } + } + + impl TaskCell for MockTask { + fn mut_extras(&mut self) -> &mut Extras { + &mut self.extras + } + } + + struct MockRunner; + + impl Runner for MockRunner { + type TaskCell = MockTask; + + fn handle(&mut self, _local: &mut Local, task_cell: MockTask) -> bool { + thread::sleep(Duration::from_millis(task_cell.sleep_ms)); + true + } + } + + struct MockRunnerBuilder; + + impl RunnerBuilder for MockRunnerBuilder { + type Runner = MockRunner; + + fn build(&mut self) -> MockRunner { + MockRunner + } + } + + #[test] + fn test_priority_queue() { + struct OrderByIdProvider; + + impl TaskPriorityProvider for OrderByIdProvider { + fn priority_of(&self, extras: &Extras) -> u64 { + return extras.task_id(); + } + } + + let local_count = 5usize; + let builder = Builder::new(Config::default(), Arc::new(OrderByIdProvider)); + let mut runner = builder.runner_builder(MockRunnerBuilder).build(); + let (injecter, locals) = builder.build(local_count); + let pq = match &injecter.0 { + InjectorInner::Priority(p) => p.queue.clone(), + _ => unreachable!(), + }; + let core = Arc::new(crate::pool::spawn::QueueCore::new( + injecter, + crate::pool::SchedConfig::default(), + )); + let mut locals: Vec<_> = locals + .into_iter() + .enumerate() + .map(|(i, l)| Local::new(i, l, core.clone())) + .collect(); + + let mut local = locals.pop().unwrap(); + let remote = crate::Remote::new(core.clone()); + for i in (0..5).rev() { + remote.spawn(MockTask::new(0, i)); + } + + for i in 0..5 { + let mut task = pq.pop().unwrap().task_cell; + assert_eq!(task.mut_extras().task_id(), i); + } + + // test multiple threads + let task_per_thread = 10usize; + let mut handlers = Vec::with_capacity(local_count); + for mut local in locals { + let h = std::thread::spawn(move || { + let mut rng = rand::thread_rng(); + for _i in 0..task_per_thread { + let task = MockTask::new(0, rng.next_u64()); + local.spawn(task); + } + }); + handlers.push(h); + } + for h in handlers { + h.join().unwrap(); + } + let mut last_id = 0u64; + for _ in 0..task_per_thread * (local_count - 1) { + let mut task = pq.pop().unwrap().task_cell; + assert!(task.mut_extras().task_id() >= last_id); + last_id = task.mut_extras().task_id(); + } + assert!(pq.is_empty()); + + // test dynamic level + + let mut run_task = |sleep_ms, level| { + remote.spawn(MockTask::new(sleep_ms, 1)); + let mut task = pq.pop().unwrap().task_cell; + assert_eq!(task.mut_extras().current_level(), level); + runner.handle(&mut local, task); + }; + + run_task(5, 0); + // after 5ms, the task should be put to level1 + run_task(95, 1); + // after 100ms, the task should be put to level2 + run_task(1, 2); + } +}