queue: support priority queue #72

Merged · merged 10 commits · Dec 29, 2022

Changes from 6 commits
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -10,6 +10,8 @@ repository = "https://github.com/tikv/yatp/"

[dependencies]
crossbeam-deque = "0.8"
crossbeam-skiplist = "0.1"
crossbeam-utils = "0.8"
dashmap = "5.1"
fail = "0.5"
lazy_static = "1"
2 changes: 1 addition & 1 deletion src/pool.rs
@@ -7,7 +7,7 @@

mod builder;
mod runner;
mod spawn;
pub(crate) mod spawn;
mod worker;

pub use self::builder::{Builder, SchedConfig};
13 changes: 12 additions & 1 deletion src/pool/builder.rs
@@ -3,7 +3,7 @@
use crate::pool::spawn::QueueCore;
use crate::pool::worker::WorkerThread;
use crate::pool::{CloneRunnerBuilder, Local, Remote, Runner, RunnerBuilder, ThreadPool};
use crate::queue::{self, multilevel, LocalQueue, QueueType, TaskCell};
use crate::queue::{self, multilevel, priority, LocalQueue, QueueType, TaskCell};
use crate::task::{callback, future};
use std::sync::{
atomic::{AtomicUsize, Ordering},
@@ -290,6 +290,17 @@ impl Builder {
self.build_with_queue_and_runner(QueueType::Multilevel(queue_builder), runner_builder)
}

///
Member: comment

pub fn build_priority_future_pool(
&self,
priority_priovider: Arc<dyn priority::TaskPriorityProvider>,
Member: priovider -> provider

) -> ThreadPool<future::TaskCell> {
let fb = CloneRunnerBuilder(future::Runner::default());
let queue_builder = priority::Builder::new(priority::Config::default(), priority_priovider);
let runner_builder = queue_builder.runner_builder(fb);
self.build_with_queue_and_runner(QueueType::Priority(queue_builder), runner_builder)
}

/// Spawns the thread pool immediately.
///
/// `queue_builder` is a closure that creates a task queue. It accepts the
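For orientation, a minimal usage sketch of the new API (editorial, not part of this diff; the `TaskPriorityProvider` trait's shape is an assumption, since its definition is not shown here):

```rust
use std::sync::Arc;
use yatp::queue::{priority, Extras};
use yatp::Builder;

// Hypothetical provider: order tasks by their group id. `priority_of` is an
// assumed method name; the trait definition is not part of this diff.
struct GroupIdPriority;

impl priority::TaskPriorityProvider for GroupIdPriority {
    fn priority_of(&self, extras: &Extras) -> u64 {
        extras.group_id()
    }
}

fn main() {
    let pool = Builder::new("priority-pool")
        .build_priority_future_pool(Arc::new(GroupIdPriority));
    pool.spawn(async {
        println!("scheduled according to its priority");
    });
    pool.shutdown();
}
```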
20 changes: 19 additions & 1 deletion src/queue.rs
@@ -9,6 +9,7 @@
//! data structs.

pub mod multilevel;
pub mod priority;

mod extras;
mod single_level;
@@ -18,7 +19,7 @@ pub use self::extras::Extras;
use std::time::Instant;

/// A cell containing a task and needed extra information.
pub trait TaskCell {
pub trait TaskCell: 'static {
Member: Why?

Contributor (author): The skiplist's value must be 'static. I think it's fair that a task spawned in the thread pool should live long enough.

Member: I don't see why the skiplist's value should be 'static. Technically, as long as the value outlives the skiplist, it should be OK. If that's an API shortcoming, then the constraint should be put on the priority queue instead of here.

Contributor: It looks like an API shortcoming. I tried making the constraint priority-queue-only, but because we use an enum to dispatch instead of a trait, the constraint propagates up.
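To illustrate that last point, a minimal sketch (editorial, not yatp's actual code; names are hypothetical) of how enum dispatch propagates the bound:

```rust
use crossbeam_skiplist::SkipMap;

// The skiplist-backed variant needs `T: Send + 'static` because SkipMap's
// insert API requires its keys and values to be Send + 'static.
struct PriorityQueue<T: Send + 'static> {
    map: SkipMap<u64, T>, // priority -> task
}

// Stand-in for the deque-based single-level queue, which on its own would
// not need 'static.
struct SingleLevelQueue<T> {
    tasks: Vec<T>,
}

// Because dispatch is an enum (one shared `T`) rather than a trait object,
// the Priority variant's bound constrains the whole enum, and from there
// the common `TaskCell` trait.
enum QueueInner<T: Send + 'static> {
    SingleLevel(SingleLevelQueue<T>),
    Priority(PriorityQueue<T>),
}
```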

/// Gets mutable extra information.
fn mut_extras(&mut self) -> &mut Extras;
}
@@ -42,6 +43,7 @@ pub(crate) struct TaskInjector<T>(InjectorInner<T>);
enum InjectorInner<T> {
SingleLevel(single_level::TaskInjector<T>),
Multilevel(multilevel::TaskInjector<T>),
Priority(priority::TaskInjector<T>),
}

impl<T: TaskCell + Send> TaskInjector<T> {
@@ -50,13 +52,15 @@ impl<T: TaskCell + Send> TaskInjector<T> {
match &self.0 {
InjectorInner::SingleLevel(q) => q.push(task_cell),
InjectorInner::Multilevel(q) => q.push(task_cell),
InjectorInner::Priority(q) => q.push(task_cell),
}
}

pub fn default_extras(&self) -> Extras {
match self.0 {
InjectorInner::SingleLevel(_) => Extras::single_level(),
InjectorInner::Multilevel(_) => Extras::multilevel_default(),
InjectorInner::Priority(_) => Extras::single_level(),
}
}
}
@@ -80,6 +84,7 @@ pub(crate) struct LocalQueue<T>(LocalQueueInner<T>);
enum LocalQueueInner<T> {
SingleLevel(single_level::LocalQueue<T>),
Multilevel(multilevel::LocalQueue<T>),
Priority(priority::LocalQueue<T>),
}

impl<T: TaskCell + Send> LocalQueue<T> {
@@ -88,6 +93,7 @@ impl<T: TaskCell + Send> LocalQueue<T> {
match &mut self.0 {
LocalQueueInner::SingleLevel(q) => q.push(task_cell),
LocalQueueInner::Multilevel(q) => q.push(task_cell),
LocalQueueInner::Priority(q) => q.push(task_cell),
}
}

@@ -97,13 +103,15 @@ impl<T: TaskCell + Send> LocalQueue<T> {
match &mut self.0 {
LocalQueueInner::SingleLevel(q) => q.pop(),
LocalQueueInner::Multilevel(q) => q.pop(),
LocalQueueInner::Priority(q) => q.pop(),
}
}

pub fn default_extras(&self) -> Extras {
match self.0 {
LocalQueueInner::SingleLevel(_) => Extras::single_level(),
LocalQueueInner::Multilevel(_) => Extras::multilevel_default(),
LocalQueueInner::Priority(_) => Extras::single_level(),
}
}

@@ -113,6 +121,7 @@ impl<T: TaskCell + Send> LocalQueue<T> {
match &mut self.0 {
LocalQueueInner::SingleLevel(q) => q.has_tasks_or_pull(),
LocalQueueInner::Multilevel(q) => q.has_tasks_or_pull(),
LocalQueueInner::Priority(q) => q.has_tasks_or_pull(),
}
}
}
@@ -125,6 +134,8 @@ pub enum QueueType {
///
/// More to see: https://en.wikipedia.org/wiki/Multilevel_feedback_queue.
Multilevel(multilevel::Builder),
/// A concurrent priority queue.
Priority(priority::Builder),
}

impl Default for QueueType {
@@ -139,10 +150,17 @@ impl From<multilevel::Builder> for QueueType {
}
}

impl From<priority::Builder> for QueueType {
fn from(b: priority::Builder) -> QueueType {
QueueType::Priority(b)
}
}

pub(crate) fn build<T>(ty: QueueType, local_num: usize) -> (TaskInjector<T>, Vec<LocalQueue<T>>) {
match ty {
QueueType::SingleLevel => single_level(local_num),
QueueType::Multilevel(b) => b.build(local_num),
QueueType::Priority(b) => b.build(local_num),
}
}

27 changes: 26 additions & 1 deletion src/queue/extras.rs
@@ -28,6 +28,8 @@ pub struct Extras {
pub(crate) fixed_level: Option<u8>,
/// Number of execute times
pub(crate) exec_times: u32,
/// The task group id. Used in priority queue.
pub(crate) group_id: u64,
Member: why not group name?

Contributor (author): Done. But I think it's a bit weird here, because the group id/name only fits some specific cases (e.g. in TiKV) and may not fit others. I'm not sure if it would be better to define it as extra_data: Vec<u8>, so the user can put arbitrary data in this field as they need.
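To make the author's alternative concrete, a hypothetical sketch (none of this exists in the diff) of an opaque extra_data field decoded by the provider:

```rust
// Hypothetical: an opaque payload instead of the fixed `group_id: u64`.
struct Extras {
    extra_data: Vec<u8>, // arbitrary user data, interpreted by the provider
}

// Hypothetical provider trait mirroring the one this PR introduces.
trait TaskPriorityProvider {
    fn priority_of(&self, extras: &Extras) -> u64;
}

// Example provider that decodes a little-endian u64 priority from the bytes.
struct BytesPriority;

impl TaskPriorityProvider for BytesPriority {
    fn priority_of(&self, extras: &Extras) -> u64 {
        let mut buf = [0u8; 8];
        let n = extras.extra_data.len().min(8);
        buf[..n].copy_from_slice(&extras.extra_data[..n]);
        u64::from_le_bytes(buf)
    }
}
```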

}

impl Extras {
@@ -43,6 +45,7 @@ impl Extras {
current_level: 0,
fixed_level: None,
exec_times: 0,
group_id: 0,
}
}

@@ -62,9 +65,26 @@ impl Extras {
task_id,
running_time: Some(Arc::new(ElapsedTime::default())),
total_running_time: None,
current_level: 0,
current_level: fixed_level.unwrap_or(0),
fixed_level,
exec_times: 0,
group_id: 0,
}
}

/// Creates an `Extras` for task cells pushed into a priority task queue
/// with custom settings.
pub fn new_priority(group_id: u64, task_id: u64, fixed_level: Option<u8>) -> Extras {
Extras {
start_time: Instant::now(),
schedule_time: None,
task_id,
running_time: None,
total_running_time: None,
current_level: fixed_level.unwrap_or(0),
fixed_level,
exec_times: 0,
group_id,
}
}

@@ -89,4 +109,9 @@ impl Extras {
pub fn current_level(&self) -> u8 {
self.current_level
}

/// Gets the group id of this task.
pub fn group_id(&self) -> u64 {
self.group_id
}
}
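A small usage sketch of the constructor and getters added in this file (values are illustrative; the behavior follows the code above):

```rust
use yatp::queue::Extras;

fn main() {
    // Group id 7, task id 42, no fixed level: the task starts at level 0
    // and the queue may adjust its level later.
    let extras = Extras::new_priority(7, 42, None);
    assert_eq!(extras.group_id(), 7);
    assert_eq!(extras.current_level(), 0);

    // A fixed level pins the task to that level from the start.
    let pinned = Extras::new_priority(7, 43, Some(2));
    assert_eq!(pinned.current_level(), 2);
}
```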
83 changes: 54 additions & 29 deletions src/queue/multilevel.rs
@@ -23,7 +23,8 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{f64, fmt, iter};

const LEVEL_NUM: usize = 3;
/// Number of levels
pub const LEVEL_NUM: usize = 3;

/// The chance ratio of level 1 and level 2 tasks.
const CHANCE_RATIO: u32 = 4;
@@ -33,7 +34,7 @@ const DEFAULT_CLEANUP_OLD_MAP_INTERVAL: Duration = Duration::from_secs(10);
/// When local total elapsed time exceeds this value in microseconds, the local
/// metrics is flushed to the global atomic metrics and try to trigger chance
/// adjustment.
const FLUSH_LOCAL_THRESHOLD_US: u64 = 100_000;
pub(super) const FLUSH_LOCAL_THRESHOLD_US: u64 = 100_000;

/// When the incremental total elapsed time exceeds this value, it will try to
/// adjust level chances and reset the total elapsed time.
@@ -322,8 +323,7 @@
struct LevelManager {
level0_elapsed_us: IntCounter,
total_elapsed_us: IntCounter,
task_elapsed_map: TaskElapsedMap,
level_time_threshold: [Duration; LEVEL_NUM - 1],
task_level_mgr: TaskLevelManager,
level0_chance: Gauge,
level0_proportion_target: f64,
adjusting: AtomicBool,
@@ -347,25 +347,8 @@ impl LevelManager {
where
T: TaskCell,
{
let extras = task_cell.mut_extras();
let task_id = extras.task_id;
let current_level = match extras.fixed_level {
Some(level) => level,
None => {
let running_time = extras
.total_running_time
.get_or_insert_with(|| self.task_elapsed_map.get_elapsed(task_id));
let running_time = running_time.as_duration();
self.level_time_threshold
.iter()
.enumerate()
.find(|(_, &threshold)| running_time < threshold)
.map(|(level, _)| level)
.unwrap_or(LEVEL_NUM - 1) as u8
}
};
extras.current_level = current_level;
extras.schedule_time = Some(now());
self.task_level_mgr.adjust_task_level(task_cell);
task_cell.mut_extras().schedule_time = Some(now());
}

fn maybe_adjust_chance(&self) {
@@ -422,7 +405,7 @@ impl LevelManager {
std::cmp::min(total_tasks / level_0_tasks, LEVEL_MAX_QUEUE_MAX_STEAL_SIZE)
};
self.max_level_queue_steal_size
.store(new_steal_count as usize, SeqCst);
.store(new_steal_count, SeqCst);
for (i, c) in self.last_exec_tasks_per_level.iter().enumerate() {
c.set(cur_total_tasks_per_level[i]);
}
Expand All @@ -432,14 +415,52 @@ impl LevelManager {
}
}

pub(super) struct TaskLevelManager {
task_elapsed_map: TaskElapsedMap,
level_time_threshold: [Duration; LEVEL_NUM - 1],
}

impl TaskLevelManager {
pub fn new(level_time_threshold: [Duration; LEVEL_NUM - 1]) -> Self {
Self {
task_elapsed_map: TaskElapsedMap::default(),
level_time_threshold,
}
}

pub fn adjust_task_level<T>(&self, task_cell: &mut T)
where
T: TaskCell,
{
let extras = task_cell.mut_extras();
let task_id = extras.task_id;
let current_level = match extras.fixed_level {
Some(level) => level,
None => {
let running_time = extras
.total_running_time
.get_or_insert_with(|| self.task_elapsed_map.get_elapsed(task_id));
let running_time = running_time.as_duration();
self.level_time_threshold
.iter()
.enumerate()
.find(|(_, &threshold)| running_time < threshold)
.map(|(level, _)| level)
.unwrap_or(LEVEL_NUM - 1) as u8
}
};
extras.current_level = current_level;
}
}

pub(crate) struct ElapsedTime(AtomicU64);

impl ElapsedTime {
pub(crate) fn as_duration(&self) -> Duration {
Duration::from_micros(self.0.load(Relaxed) as u64)
Duration::from_micros(self.0.load(Relaxed))
}

fn inc_by(&self, t: Duration) {
pub(super) fn inc_by(&self, t: Duration) {
self.0.fetch_add(t.as_micros() as u64, Relaxed);
}

@@ -650,8 +671,7 @@ impl Builder {
let manager = Arc::new(LevelManager {
level0_elapsed_us,
total_elapsed_us,
task_elapsed_map: Default::default(),
level_time_threshold: config.level_time_threshold,
task_level_mgr: TaskLevelManager::new(config.level_time_threshold),
level0_chance,
level0_proportion_target: config.level0_proportion_target,
adjusting: AtomicBool::new(false),
@@ -998,7 +1018,12 @@ mod tests {
assert!(runner.handle(&mut locals[0], task_cell));
}
assert!(
manager.task_elapsed_map.get_elapsed(1).as_duration() >= Duration::from_millis(100)
manager
.task_level_mgr
.task_elapsed_map
.get_elapsed(1)
.as_duration()
>= Duration::from_millis(100)
);
}

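For intuition, a standalone sketch of the level-selection rule that the extracted `TaskLevelManager::adjust_task_level` applies, following the code above (threshold values are illustrative; the real ones come from the multilevel Config):

```rust
use std::time::Duration;

const LEVEL_NUM: usize = 3;

// Maps a task's accumulated running time to a queue level: the first level
// whose threshold the task is still under, else the last level.
fn level_for(running_time: Duration, thresholds: &[Duration; LEVEL_NUM - 1]) -> u8 {
    thresholds
        .iter()
        .enumerate()
        .find(|(_, &threshold)| running_time < threshold)
        .map(|(level, _)| level)
        .unwrap_or(LEVEL_NUM - 1) as u8
}

fn main() {
    let thresholds = [Duration::from_millis(5), Duration::from_millis(100)];
    assert_eq!(level_for(Duration::from_millis(1), &thresholds), 0); // still "small"
    assert_eq!(level_for(Duration::from_millis(50), &thresholds), 1);
    assert_eq!(level_for(Duration::from_secs(1), &thresholds), 2); // long-running
}
```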