diff --git a/Cargo.lock b/Cargo.lock index 57430859bfd..1d2280474da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -259,7 +259,6 @@ dependencies = [ "libc 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", "panic_hook 0.0.1", "protobuf 2.0.4 (registry+https://github.com/rust-lang/crates.io-index)", - "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "static_assertions 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "tikv_alloc 0.1.0", diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1f7dfe2aefa..6d8d62fc508 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -13,7 +13,7 @@ use std::cmp; use std::error; use std::fmt::{self, Debug, Display, Formatter}; use std::io::Error as IoError; -use std::sync::{atomic, Arc, Mutex}; +use std::sync::{atomic, Arc}; use std::u64; use engine::rocks::DB; @@ -25,12 +25,10 @@ use kvproto::kvrpcpb::{CommandPri, Context, KeyRange, LockInfo}; use crate::server::readpool::{self, Builder as ReadPoolBuilder, ReadPool}; use crate::server::ServerRaftStoreRouter; use tikv_util::collections::HashMap; -use tikv_util::worker::{self, Builder, ScheduleError, Worker}; use self::gc_worker::GCWorker; use self::metrics::*; use self::mvcc::Lock; -use self::txn::CMD_BATCH_SIZE; pub use self::config::{BlockCacheConfig, Config, DEFAULT_DATA_DIR, DEFAULT_ROCKSDB_SUB_DIR}; pub use self::gc_worker::{AutoGCConfig, GCSafePointProvider}; @@ -42,6 +40,7 @@ pub use self::kv::{ }; pub use self::mvcc::Scanner as StoreScanner; pub use self::readpool_impl::*; +use self::txn::scheduler::Scheduler as TxnScheduler; pub use self::txn::{FixtureStore, FixtureStoreScanner}; pub use self::txn::{Msg, Scanner, Scheduler, SnapshotStore, Store}; pub use self::types::{Key, KvPair, MvccInfo, Value}; @@ -497,10 +496,7 @@ pub struct Storage { // TODO: Too many Arcs, would be slow when clone. engine: E, - /// The worker to execute storage commands. - worker: Arc>>, - /// `worker_scheduler` is used to schedule tasks to run in `worker`. - worker_scheduler: worker::Scheduler, + sched: TxnScheduler, /// The thread pool used to run most read operations. read_pool: ReadPool, @@ -527,8 +523,7 @@ impl Clone for Storage { Self { engine: self.engine.clone(), - worker: self.worker.clone(), - worker_scheduler: self.worker_scheduler.clone(), + sched: self.sched.clone(), read_pool: self.read_pool.clone(), gc_worker: self.gc_worker.clone(), refs: self.refs.clone(), @@ -550,15 +545,6 @@ impl Drop for Storage { return; } - // This is the last reference of the storage. Now all its references are dropped. Stop and - // destroy the storage now. - let mut worker = self.worker.lock().unwrap(); - - let h = worker.stop().unwrap(); - if let Err(e) = h.join() { - error!("Failed to join sched_handle"; "err" => ?e); - } - let r = self.gc_worker.stop(); if let Err(e) = r { error!("Failed to stop gc_worker:"; "err" => ?e); @@ -577,16 +563,8 @@ impl Storage { local_storage: Option>, raft_store_router: Option, ) -> Result { - let worker = Arc::new(Mutex::new( - Builder::new("storage-scheduler") - .batch_size(CMD_BATCH_SIZE) - .pending_capacity(config.scheduler_notify_capacity) - .create(), - )); - let worker_scheduler = worker.lock().unwrap().scheduler(); - let runner = Scheduler::new( + let sched = TxnScheduler::new( engine.clone(), - worker_scheduler.clone(), config.scheduler_concurrency, config.scheduler_worker_pool_size, config.scheduler_pending_write_threshold.0 as usize, @@ -598,15 +576,13 @@ impl Storage { config.gc_ratio_threshold, ); - worker.lock().unwrap().start(runner)?; gc_worker.start()?; info!("Storage started."); Ok(Storage { engine, - worker, - worker_scheduler, + sched, read_pool, gc_worker, refs: Arc::new(atomic::AtomicUsize::new(1)), @@ -632,11 +608,8 @@ impl Storage { #[inline] fn schedule(&self, cmd: Command, cb: StorageCb) -> Result<()> { fail_point!("storage_drop_message", |_| Ok(())); - match self.worker_scheduler.schedule(Msg::RawCmd { cmd, cb }) { - Ok(()) => Ok(()), - Err(ScheduleError::Full(_)) => Err(Error::SchedTooBusy), - Err(ScheduleError::Stopped(_)) => Err(Error::Closed), - } + self.sched.run_cmd(cmd, cb); + Ok(()) } /// Get a snapshot of `engine`. diff --git a/src/storage/txn/latch.rs b/src/storage/txn/latch.rs index c2d48adced4..b7ccf2e2370 100644 --- a/src/storage/txn/latch.rs +++ b/src/storage/txn/latch.rs @@ -1,10 +1,9 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. - -#![allow(deprecated)] - +use std::collections::hash_map::DefaultHasher; use std::collections::VecDeque; -use std::hash::{Hash, Hasher, SipHasher as DefaultHasher}; +use std::hash::{Hash, Hasher}; +use std::sync::Mutex; use std::usize; /// Latch which is used to serialize accesses to resources hashed to the same slot. @@ -63,7 +62,7 @@ impl Lock { /// Each latch is indexed by a slot ID, hence the term latch and slot are used interchangeably, but /// conceptually a latch is a queue, and a slot is an index to the queue. pub struct Latches { - slots: Vec, + slots: Vec>, size: usize, } @@ -72,11 +71,10 @@ impl Latches { /// /// The size will be rounded up to the power of 2. pub fn new(size: usize) -> Latches { - let power_of_two_size = usize::next_power_of_two(size); - Latches { - slots: vec![Latch::new(); power_of_two_size], - size: power_of_two_size, - } + let size = usize::next_power_of_two(size); + let mut slots = Vec::with_capacity(size); + (0..size).for_each(|_| slots.push(Mutex::new(Latch::new()))); + Latches { slots, size } } /// Creates a lock which specifies all the required latches for a command. @@ -96,11 +94,10 @@ impl Latches { /// This method will enqueue the command ID into the waiting queues of the latches. A latch is /// considered acquired if the command ID is at the front of the queue. Returns true if all the /// Latches are acquired, false otherwise. - pub fn acquire(&mut self, lock: &mut Lock, who: u64) -> bool { + pub fn acquire(&self, lock: &mut Lock, who: u64) -> bool { let mut acquired_count: usize = 0; for i in &lock.required_slots[lock.owned_count..] { - let latch = &mut self.slots[*i]; - + let mut latch = self.slots[*i].lock().unwrap(); let front = latch.waiting.front().cloned(); match front { Some(cid) => { @@ -117,7 +114,6 @@ impl Latches { } } } - lock.owned_count += acquired_count; lock.acquired() } @@ -125,13 +121,12 @@ impl Latches { /// Releases all latches owned by the `lock` of command with ID `who`, returns the wakeup list. /// /// Preconditions: the caller must ensure the command is at the front of the latches. - pub fn release(&mut self, lock: &Lock, who: u64) -> Vec { + pub fn release(&self, lock: &Lock, who: u64) -> Vec { let mut wakeup_list: Vec = vec![]; for i in &lock.required_slots[..lock.owned_count] { - let latch = &mut self.slots[*i]; + let mut latch = self.slots[*i].lock().unwrap(); let front = latch.waiting.pop_front().unwrap(); assert_eq!(front, who); - if let Some(wakeup) = latch.waiting.front() { wakeup_list.push(*wakeup); } @@ -156,7 +151,7 @@ mod tests { #[test] fn test_wakeup() { - let mut latches = Latches::new(256); + let latches = Latches::new(256); let slots_a: Vec = vec![1, 3, 5]; let mut lock_a = Lock::new(slots_a); @@ -184,7 +179,7 @@ mod tests { #[test] fn test_wakeup_by_multi_cmds() { - let mut latches = Latches::new(256); + let latches = Latches::new(256); let slots_a: Vec = vec![1, 2, 3]; let slots_b: Vec = vec![4, 5, 6]; diff --git a/src/storage/txn/mod.rs b/src/storage/txn/mod.rs index 5aaf197ce40..e7254014e6a 100644 --- a/src/storage/txn/mod.rs +++ b/src/storage/txn/mod.rs @@ -3,14 +3,14 @@ mod latch; mod process; pub mod sched_pool; -mod scheduler; +pub mod scheduler; mod store; use std::error; use std::io::Error as IoError; pub use self::process::RESOLVE_LOCK_BATCH_SIZE; -pub use self::scheduler::{Msg, Scheduler, CMD_BATCH_SIZE}; +pub use self::scheduler::{Msg, Scheduler}; pub use self::store::{FixtureStore, FixtureStoreScanner}; pub use self::store::{Scanner, SnapshotStore, Store}; use tikv_util::escape; diff --git a/src/storage/txn/process.rs b/src/storage/txn/process.rs index 84024f18bc9..ed9696627af 100644 --- a/src/storage/txn/process.rs +++ b/src/storage/txn/process.rs @@ -1,9 +1,7 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. -use std::mem; -use std::thread; use std::time::Duration; -use std::u64; +use std::{mem, thread, u64}; use futures::future; use kvproto::kvrpcpb::{CommandPri, Context, LockInfo}; @@ -12,19 +10,12 @@ use crate::storage::kv::{CbContext, Modify, Result as EngineResult}; use crate::storage::mvcc::{ Error as MvccError, Lock as MvccLock, MvccReader, MvccTxn, Write, MAX_TXN_WRITE_SIZE, }; +use crate::storage::txn::{sched_pool::*, scheduler::Msg, Error, Result}; use crate::storage::{ - Command, Engine, Error as StorageError, Result as StorageResult, ScanMode, Snapshot, - Statistics, StorageCb, + metrics::*, Command, Engine, Error as StorageError, Key, MvccInfo, Result as StorageResult, + ScanMode, Snapshot, Statistics, StorageCb, Value, }; -use crate::storage::{Key, MvccInfo, Value}; -use tikv_util::time::Instant; -use tikv_util::time::SlowTimer; -use tikv_util::worker::{self, ScheduleError}; - -use super::super::metrics::*; -use super::sched_pool::*; -use super::scheduler::Msg; -use super::{Error, Result}; +use tikv_util::time::{Instant, SlowTimer}; // To resolve a key, the write size is about 100~150 bytes, depending on key and value length. // The write batch will be around 32KB if we scan 256 keys each time. @@ -107,22 +98,26 @@ impl Task { } } -pub struct Executor { +pub trait MsgScheduler: Clone + Send + 'static { + fn on_msg(&self, task: Msg); +} + +pub struct Executor { // We put time consuming tasks to the thread pool. sched_pool: Option>, // And the tasks completes we post a completion to the `Scheduler`. - scheduler: Option>, + scheduler: Option, } -impl Executor { - pub fn new(scheduler: worker::Scheduler, pool: SchedPool) -> Self { +impl Executor { + pub fn new(scheduler: S, pool: SchedPool) -> Self { Executor { scheduler: Some(scheduler), sched_pool: Some(pool), } } - fn take_scheduler(&mut self) -> worker::Scheduler { + fn take_scheduler(&mut self) -> S { self.scheduler.take().unwrap() } @@ -130,6 +125,10 @@ impl Executor { self.sched_pool.take().unwrap() } + pub fn clone_pool(&self) -> SchedPool { + self.sched_pool.clone().unwrap() + } + /// Start the execution of the task. pub fn execute(mut self, cb_ctx: CbContext, snapshot: EngineResult, task: Task) { debug!( @@ -249,7 +248,7 @@ impl Executor { let sched = scheduler.clone(); // The callback to receive async results of write prepare from the storage engine. let engine_cb = Box::new(move |(_, result)| { - if notify_scheduler( + notify_scheduler( sched, Msg::WriteFinished { cid, @@ -257,11 +256,10 @@ impl Executor { result, tag, }, - ) { - KV_COMMAND_KEYWRITE_HISTOGRAM_VEC - .with_label_values(&[tag]) - .observe(rows as f64); - } + ); + KV_COMMAND_KEYWRITE_HISTOGRAM_VEC + .with_label_values(&[tag]) + .observe(rows as f64); }); if let Err(e) = engine.async_write(&ctx, to_be_write, engine_cb) { @@ -574,17 +572,8 @@ fn process_write_impl( Ok((ctx, pr, modifies, rows)) } -fn notify_scheduler(scheduler: worker::Scheduler, msg: Msg) -> bool { - match scheduler.schedule(msg) { - Ok(_) => true, - e @ Err(ScheduleError::Stopped(_)) => { - info!("scheduler stopped"; "err" => ?e); - false - } - Err(e) => { - panic!("schedule msg failed, err:{:?}", e); - } - } +pub fn notify_scheduler(scheduler: S, msg: Msg) { + scheduler.on_msg(msg); } // Make clippy happy. diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index 7a486254f90..39a3a96a0da 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -21,24 +21,26 @@ //! to the scheduler. use std::fmt::{self, Debug, Display, Formatter}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use std::u64; +use futures::future; use kvproto::kvrpcpb::CommandPri; use prometheus::HistogramTimer; +use tikv_util::collections::HashMap; use crate::storage::kv::Result as EngineResult; -use crate::storage::Key; +use crate::storage::txn::latch::{Latches, Lock}; +use crate::storage::txn::process::{ + execute_callback, notify_scheduler, Executor, MsgScheduler, ProcessResult, Task, +}; +use crate::storage::txn::sched_pool::SchedPool; +use crate::storage::txn::Error; +use crate::storage::{metrics::*, Key}; use crate::storage::{Command, Engine, Error as StorageError, StorageCb}; -use tikv_util::collections::HashMap; -use tikv_util::worker::{self, Runnable}; -use super::super::metrics::*; -use super::latch::{Latches, Lock}; -use super::process::{execute_callback, Executor, ProcessResult, Task}; -use super::sched_pool::*; -use super::Error; - -pub const CMD_BATCH_SIZE: usize = 256; +const TASKS_SLOTS_NUM: usize = 1 << 10; // 1024 slots. /// Message types for the scheduler event loop. pub enum Msg { @@ -85,6 +87,8 @@ impl Display for Msg { // It stores context of a task. struct TaskContext { + task: Option, + lock: Lock, cb: StorageCb, write_bytes: usize, @@ -96,25 +100,28 @@ struct TaskContext { } impl TaskContext { - fn new(lock: Lock, cb: StorageCb, cmd: &Command) -> TaskContext { + fn new(task: Task, latches: &Latches, cb: StorageCb) -> TaskContext { + let tag = task.cmd().tag(); + let lock = gen_command_lock(latches, task.cmd()); let write_bytes = if lock.is_write_lock() { - cmd.write_bytes() + task.cmd().write_bytes() } else { 0 }; TaskContext { + task: Some(task), lock, cb, write_bytes, - tag: cmd.tag(), + tag, latch_timer: Some( SCHED_LATCH_HISTOGRAM_VEC - .with_label_values(&[cmd.tag()]) + .with_label_values(&[tag]) .start_coarse_timer(), ), _cmd_timer: SCHED_HISTOGRAM_VEC - .with_label_values(&[cmd.tag()]) + .with_label_values(&[tag]) .start_coarse_timer(), } } @@ -124,27 +131,16 @@ impl TaskContext { } } -/// Scheduler which schedules the execution of `storage::Command`s. -pub struct Scheduler { - engine: E, - - /// cid -> Task - pending_tasks: HashMap, - - // cid -> TaskContext - task_contexts: HashMap, - - // actual scheduler to schedule the execution of commands - scheduler: worker::Scheduler, +struct SchedulerInner { + // slot_id -> { cid -> `TaskContext` } in the slot. + task_contexts: Vec>>, // cmd id generator - id_alloc: u64, + id_alloc: AtomicU64, // write concurrency control latches: Latches, - // TODO: Dynamically calculate this value according to processing - /// speed of recent write requests. sched_pending_write_threshold: usize, // worker pool @@ -154,26 +150,110 @@ pub struct Scheduler { high_priority_pool: SchedPool, // used to control write flow - running_write_bytes: usize, + running_write_bytes: AtomicUsize, +} + +#[inline] +fn id_index(cid: u64) -> usize { + cid as usize % TASKS_SLOTS_NUM } +impl SchedulerInner { + /// Generates the next command ID. + #[inline] + fn gen_id(&self) -> u64 { + let id = self.id_alloc.fetch_add(1, Ordering::Relaxed); + id + 1 + } + + fn dequeue_task(&self, cid: u64) -> Task { + let mut tasks = self.task_contexts[id_index(cid)].lock().unwrap(); + let task = tasks.get_mut(&cid).unwrap().task.take().unwrap(); + assert_eq!(task.cid, cid); + task + } + + fn enqueue_task(&self, task: Task, callback: StorageCb) { + let cid = task.cid; + let tctx = TaskContext::new(task, &self.latches, callback); + + let running_write_bytes = self + .running_write_bytes + .fetch_add(tctx.write_bytes, Ordering::AcqRel) as i64; + SCHED_WRITING_BYTES_GAUGE.set(running_write_bytes + tctx.write_bytes as i64); + SCHED_CONTEX_GAUGE.inc(); + + let mut tasks = self.task_contexts[id_index(cid)].lock().unwrap(); + if tasks.insert(cid, tctx).is_some() { + panic!("TaskContext cid={} shouldn't exist", cid); + } + } + + fn dequeue_task_context(&self, cid: u64) -> TaskContext { + let tctx = self.task_contexts[id_index(cid)] + .lock() + .unwrap() + .remove(&cid) + .unwrap(); + + let running_write_bytes = self + .running_write_bytes + .fetch_sub(tctx.write_bytes, Ordering::AcqRel) as i64; + SCHED_WRITING_BYTES_GAUGE.set(running_write_bytes - tctx.write_bytes as i64); + SCHED_CONTEX_GAUGE.dec(); + + tctx + } + + fn too_busy(&self) -> bool { + fail_point!("txn_scheduler_busy", |_| true); + self.running_write_bytes.load(Ordering::Acquire) >= self.sched_pending_write_threshold + } + + /// Tries to acquire all the required latches for a command. + /// + /// Returns `true` if successful; returns `false` otherwise. + fn acquire_lock(&self, cid: u64) -> bool { + let mut task_contexts = self.task_contexts[id_index(cid)].lock().unwrap(); + let tctx = task_contexts.get_mut(&cid).unwrap(); + if self.latches.acquire(&mut tctx.lock, cid) { + tctx.on_schedule(); + return true; + } + false + } +} + +/// Scheduler which schedules the execution of `storage::Command`s. +#[derive(Clone)] +pub struct Scheduler { + engine: E, + inner: Arc>, +} + +unsafe impl Send for Scheduler {} + impl Scheduler { /// Creates a scheduler. pub fn new( engine: E, - scheduler: worker::Scheduler, concurrency: usize, worker_pool_size: usize, sched_pending_write_threshold: usize, ) -> Self { - Scheduler { - engine: engine.clone(), - // TODO: GC these two maps. - pending_tasks: Default::default(), - task_contexts: Default::default(), - scheduler, - id_alloc: 0, + // Add 2 logs records how long is need to initialize TASKS_SLOTS_NUM * 2048000 `Mutex`es. + // In a 3.5G Hz machine it needs 1.3s, which is a notable duration during start-up. + info!("Scheduler::new is called to initialize the transaction scheduler"); + let mut task_contexts = Vec::with_capacity(TASKS_SLOTS_NUM); + for _ in 0..TASKS_SLOTS_NUM { + task_contexts.push(Mutex::new(Default::default())); + } + + let inner = Arc::new(SchedulerInner { + task_contexts, + id_alloc: AtomicU64::new(0), latches: Latches::new(concurrency), + running_write_bytes: AtomicUsize::new(0), sched_pending_write_threshold, worker_pool: SchedPool::new(engine.clone(), worker_pool_size, "sched-worker-pool"), high_priority_pool: SchedPool::new( @@ -181,82 +261,44 @@ impl Scheduler { std::cmp::max(1, worker_pool_size / 2), "sched-high-pri-pool", ), - running_write_bytes: 0, - } - } - - /// Generates the next command ID. - fn gen_id(&mut self) -> u64 { - self.id_alloc += 1; - self.id_alloc - } - - fn dequeue_task(&mut self, cid: u64) -> Task { - let task = self.pending_tasks.remove(&cid).unwrap(); - assert_eq!(task.cid, cid); - task - } - - fn enqueue_task(&mut self, task: Task, callback: StorageCb) { - let cid = task.cid; - - let tctx = { - let cmd = task.cmd(); - let lock = self.gen_lock(cmd); - TaskContext::new(lock, callback, cmd) - }; - - self.running_write_bytes += tctx.write_bytes; - SCHED_WRITING_BYTES_GAUGE.set(self.running_write_bytes as i64); + }); - if self.pending_tasks.insert(cid, task).is_some() { - panic!("command cid={} shouldn't exist", cid); - } - SCHED_CONTEX_GAUGE.set(self.pending_tasks.len() as i64); - if self.task_contexts.insert(cid, tctx).is_some() { - panic!("TaskContext cid={} shouldn't exist", cid); - } + info!("Scheduler::new is finished, the transaction scheduler is initialized"); + Scheduler { engine, inner } } - fn dequeue_task_context(&mut self, cid: u64) -> TaskContext { - let tctx = self.task_contexts.remove(&cid).unwrap(); - - self.running_write_bytes -= tctx.write_bytes; - SCHED_WRITING_BYTES_GAUGE.set(self.running_write_bytes as i64); - SCHED_CONTEX_GAUGE.set(self.pending_tasks.len() as i64); - - tctx + pub fn run_cmd(&self, cmd: Command, callback: StorageCb) { + self.on_receive_new_cmd(cmd, callback); } +} - fn fetch_executor(&self, priority: CommandPri, is_sys_cmd: bool) -> Executor { +impl Scheduler { + fn fetch_executor(&self, priority: CommandPri, is_sys_cmd: bool) -> Executor { let pool = if priority == CommandPri::High || is_sys_cmd { - self.high_priority_pool.clone() + self.inner.high_priority_pool.clone() } else { - self.worker_pool.clone() + self.inner.worker_pool.clone() }; - let scheduler = self.scheduler.clone(); - Executor::new(scheduler, pool) + Executor::new(self.clone(), pool) } - /// Event handler for new command. - /// - /// This method will try to acquire all the necessary latches. If all the necessary latches are - /// acquired, the method initiates a get snapshot operation for furthur processing; otherwise, - /// the method adds the command to the waiting queue(s). The command will be handled later in - /// `try_to_wake_up` when its turn comes. - /// - /// Note that once a command is ready to execute, the snapshot is always up-to-date during the - /// execution because 1) all the conflicting commands (if any) must be in the waiting queues; - /// 2) there may be non-conflicitng commands running concurrently, but it doesn't matter. - fn schedule_command(&mut self, cmd: Command, callback: StorageCb) { - let cid = self.gen_id(); - debug!("received new command"; "cid" => cid, "cmd" => %cmd); + /// Releases all the latches held by a command. + fn release_lock(&self, lock: &Lock, cid: u64) { + let wakeup_list = self.inner.latches.release(lock, cid); + for wcid in wakeup_list { + self.try_to_wake_up(wcid); + } + } + + fn schedule_command(&self, cmd: Command, callback: StorageCb) { + let cid = self.inner.gen_id(); + debug!("received new command"; "cid" => cid, "cmd" => ?cmd); let tag = cmd.tag(); let priority_tag = cmd.priority_tag(); let task = Task::new(cid, cmd); // TODO: enqueue_task should return an reference of the tctx. - self.enqueue_task(task, callback); + self.inner.enqueue_task(task, callback); self.try_to_wake_up(cid); SCHED_STAGE_COUNTER_VEC @@ -269,26 +311,15 @@ impl Scheduler { /// Tries to acquire all the necessary latches. If all the necessary latches are acquired, /// the method initiates a get snapshot operation for furthur processing. - fn try_to_wake_up(&mut self, cid: u64) { - let wake = if let Some(tctx) = self.acquire_lock(cid) { - tctx.on_schedule(); - true - } else { - false - }; - if wake { + fn try_to_wake_up(&self, cid: u64) { + if self.inner.acquire_lock(cid) { self.get_snapshot(cid); } } - fn too_busy(&self) -> bool { - fail_point!("txn_scheduler_busy", |_| true); - self.running_write_bytes >= self.sched_pending_write_threshold - } - - fn on_receive_new_cmd(&mut self, cmd: Command, callback: StorageCb) { + fn on_receive_new_cmd(&self, cmd: Command, callback: StorageCb) { // write flow control - if cmd.need_flow_control() && self.too_busy() { + if cmd.need_flow_control() && self.inner.too_busy() { SCHED_TOO_BUSY_COUNTER_VEC .with_label_values(&[cmd.tag()]) .inc(); @@ -305,33 +336,47 @@ impl Scheduler { /// Initiates an async operation to get a snapshot from the storage engine, then posts a /// `SnapshotFinished` message back to the event loop when it finishes. - fn get_snapshot(&mut self, cid: u64) { - let task = self.dequeue_task(cid); + fn get_snapshot(&self, cid: u64) { + let task = self.inner.dequeue_task(cid); let tag = task.tag; let ctx = task.context().clone(); - let executor = self.fetch_executor(task.priority(), task.cmd().is_sys_cmd()); + let engine = self.engine.clone(); + let executor = self.fetch_executor(task.priority(), task.cmd().is_sys_cmd()); + let sched_pool = executor.clone_pool(); + let inner = self.clone(); let cb = Box::new(move |(cb_ctx, snapshot)| { executor.execute(cb_ctx, snapshot, task); }); - if let Err(e) = self.engine.async_snapshot(&ctx, cb) { - SCHED_STAGE_COUNTER_VEC - .with_label_values(&[tag, "async_snapshot_err"]) - .inc(); - error!("engine async_snapshot failed"; "err" => ?e); - self.finish_with_err(cid, e.into()); - } else { - SCHED_STAGE_COUNTER_VEC - .with_label_values(&[tag, "snapshot"]) - .inc(); - } + sched_pool.pool.spawn(move || { + if let Err(e) = engine.async_snapshot(&ctx, cb) { + SCHED_STAGE_COUNTER_VEC + .with_label_values(&[tag, "async_snapshot_err"]) + .inc(); + + error!("engine async_snapshot failed, err: {:?}", e); + notify_scheduler( + inner, + Msg::FinishedWithErr { + cid, + err: e.into(), + tag, + }, + ); + } else { + SCHED_STAGE_COUNTER_VEC + .with_label_values(&[tag, "snapshot"]) + .inc(); + } + future::ok::<_, ()>(()) + }); } /// Calls the callback with an error. - fn finish_with_err(&mut self, cid: u64, err: Error) { - debug!("command finished with error"; "cid" => cid); - let tctx = self.dequeue_task_context(cid); + fn finish_with_err(&self, cid: u64, err: Error) { + debug!("write command finished with error"; "cid" => cid); + let tctx = self.inner.dequeue_task_context(cid); SCHED_STAGE_COUNTER_VEC .with_label_values(&[tctx.tag, "error"]) @@ -349,13 +394,13 @@ impl Scheduler { /// /// If a next command is present, continues to execute; otherwise, delivers the result to the /// callback. - fn on_read_finished(&mut self, cid: u64, pr: ProcessResult, tag: &str) { + fn on_read_finished(&self, cid: u64, pr: ProcessResult, tag: &str) { SCHED_STAGE_COUNTER_VEC .with_label_values(&[tag, "read_finish"]) .inc(); debug!("read command finished"; "cid" => cid); - let tctx = self.dequeue_task_context(cid); + let tctx = self.inner.dequeue_task_context(cid); if let ProcessResult::NextCommand { cmd } = pr { SCHED_STAGE_COUNTER_VEC .with_label_values(&[tag, "next_cmd"]) @@ -369,23 +414,17 @@ impl Scheduler { } /// Event handler for the success of write. - fn on_write_finished( - &mut self, - cid: u64, - pr: ProcessResult, - result: EngineResult<()>, - tag: &str, - ) { + fn on_write_finished(&self, cid: u64, pr: ProcessResult, result: EngineResult<()>, tag: &str) { SCHED_STAGE_COUNTER_VEC .with_label_values(&[tag, "write_finish"]) .inc(); - debug!("write finished for command"; "cid" => cid); - let tctx = self.dequeue_task_context(cid); + debug!("write command finished"; "cid" => cid); + let tctx = self.inner.dequeue_task_context(cid); let pr = match result { Ok(()) => pr, Err(e) => ProcessResult::Failed { - err: crate::storage::Error::from(e), + err: StorageError::from(e), }, }; if let ProcessResult::NextCommand { cmd } = pr { @@ -399,50 +438,20 @@ impl Scheduler { self.release_lock(&tctx.lock, cid); } - - /// Generates the lock for a command. - /// - /// Basically, read-only commands require no latches, write commands require latches hashed - /// by the referenced keys. - fn gen_lock(&self, cmd: &Command) -> Lock { - gen_command_lock(&self.latches, cmd) - } - - /// Tries to acquire all the required latches for a command. - /// - /// Returns `Some(TaskContext)` if successful; returns `None` otherwise. - fn acquire_lock(&mut self, cid: u64) -> Option<&mut TaskContext> { - let tctx = self.task_contexts.get_mut(&cid).unwrap(); - if self.latches.acquire(&mut tctx.lock, cid) { - Some(tctx) - } else { - None - } - } - - /// Releases all the latches held by a command. - fn release_lock(&mut self, lock: &Lock, cid: u64) { - let wakeup_list = self.latches.release(lock, cid); - for wcid in wakeup_list { - self.try_to_wake_up(wcid); - } - } } -impl Runnable for Scheduler { - fn run_batch(&mut self, msgs: &mut Vec) { - for msg in msgs.drain(..) { - match msg { - Msg::RawCmd { cmd, cb } => self.on_receive_new_cmd(cmd, cb), - Msg::ReadFinished { cid, tag, pr } => self.on_read_finished(cid, pr, tag), - Msg::WriteFinished { - cid, - tag, - pr, - result, - } => self.on_write_finished(cid, pr, result, tag), - Msg::FinishedWithErr { cid, err, .. } => self.finish_with_err(cid, err), - } +impl MsgScheduler for Scheduler { + fn on_msg(&self, task: Msg) { + match task { + Msg::ReadFinished { cid, tag, pr } => self.on_read_finished(cid, pr, tag), + Msg::WriteFinished { + cid, + tag, + pr, + result, + } => self.on_write_finished(cid, pr, result, tag), + Msg::FinishedWithErr { cid, err, .. } => self.finish_with_err(cid, err), + _ => unreachable!(), } } } @@ -536,8 +545,7 @@ mod tests { }, ]; - let mut latches = Latches::new(1024); - + let latches = Latches::new(1024); let write_locks: Vec = write_cmds .into_iter() .enumerate()