Skip to content

Commit

Permalink
txn: remove scheduler (tikv#4098)
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu authored and jswh committed May 27, 2019
1 parent 0dfefac commit d0298ca
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 281 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 8 additions & 35 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::cmp;
use std::error;
use std::fmt::{self, Debug, Display, Formatter};
use std::io::Error as IoError;
use std::sync::{atomic, Arc, Mutex};
use std::sync::{atomic, Arc};
use std::u64;

use engine::rocks::DB;
Expand All @@ -25,12 +25,10 @@ use kvproto::kvrpcpb::{CommandPri, Context, KeyRange, LockInfo};
use crate::server::readpool::{self, Builder as ReadPoolBuilder, ReadPool};
use crate::server::ServerRaftStoreRouter;
use tikv_util::collections::HashMap;
use tikv_util::worker::{self, Builder, ScheduleError, Worker};

use self::gc_worker::GCWorker;
use self::metrics::*;
use self::mvcc::Lock;
use self::txn::CMD_BATCH_SIZE;

pub use self::config::{BlockCacheConfig, Config, DEFAULT_DATA_DIR, DEFAULT_ROCKSDB_SUB_DIR};
pub use self::gc_worker::{AutoGCConfig, GCSafePointProvider};
Expand All @@ -42,6 +40,7 @@ pub use self::kv::{
};
pub use self::mvcc::Scanner as StoreScanner;
pub use self::readpool_impl::*;
use self::txn::scheduler::Scheduler as TxnScheduler;
pub use self::txn::{FixtureStore, FixtureStoreScanner};
pub use self::txn::{Msg, Scanner, Scheduler, SnapshotStore, Store};
pub use self::types::{Key, KvPair, MvccInfo, Value};
Expand Down Expand Up @@ -497,10 +496,7 @@ pub struct Storage<E: Engine> {
// TODO: Too many Arcs, would be slow when clone.
engine: E,

/// The worker to execute storage commands.
worker: Arc<Mutex<Worker<Msg>>>,
/// `worker_scheduler` is used to schedule tasks to run in `worker`.
worker_scheduler: worker::Scheduler<Msg>,
sched: TxnScheduler<E>,

/// The thread pool used to run most read operations.
read_pool: ReadPool,
Expand All @@ -527,8 +523,7 @@ impl<E: Engine> Clone for Storage<E> {

Self {
engine: self.engine.clone(),
worker: self.worker.clone(),
worker_scheduler: self.worker_scheduler.clone(),
sched: self.sched.clone(),
read_pool: self.read_pool.clone(),
gc_worker: self.gc_worker.clone(),
refs: self.refs.clone(),
Expand All @@ -550,15 +545,6 @@ impl<E: Engine> Drop for Storage<E> {
return;
}

// This is the last reference of the storage. Now all its references are dropped. Stop and
// destroy the storage now.
let mut worker = self.worker.lock().unwrap();

let h = worker.stop().unwrap();
if let Err(e) = h.join() {
error!("Failed to join sched_handle"; "err" => ?e);
}

let r = self.gc_worker.stop();
if let Err(e) = r {
error!("Failed to stop gc_worker:"; "err" => ?e);
Expand All @@ -577,16 +563,8 @@ impl<E: Engine> Storage<E> {
local_storage: Option<Arc<DB>>,
raft_store_router: Option<ServerRaftStoreRouter>,
) -> Result<Self> {
let worker = Arc::new(Mutex::new(
Builder::new("storage-scheduler")
.batch_size(CMD_BATCH_SIZE)
.pending_capacity(config.scheduler_notify_capacity)
.create(),
));
let worker_scheduler = worker.lock().unwrap().scheduler();
let runner = Scheduler::new(
let sched = TxnScheduler::new(
engine.clone(),
worker_scheduler.clone(),
config.scheduler_concurrency,
config.scheduler_worker_pool_size,
config.scheduler_pending_write_threshold.0 as usize,
Expand All @@ -598,15 +576,13 @@ impl<E: Engine> Storage<E> {
config.gc_ratio_threshold,
);

worker.lock().unwrap().start(runner)?;
gc_worker.start()?;

info!("Storage started.");

Ok(Storage {
engine,
worker,
worker_scheduler,
sched,
read_pool,
gc_worker,
refs: Arc::new(atomic::AtomicUsize::new(1)),
Expand All @@ -632,11 +608,8 @@ impl<E: Engine> Storage<E> {
#[inline]
fn schedule(&self, cmd: Command, cb: StorageCb) -> Result<()> {
fail_point!("storage_drop_message", |_| Ok(()));
match self.worker_scheduler.schedule(Msg::RawCmd { cmd, cb }) {
Ok(()) => Ok(()),
Err(ScheduleError::Full(_)) => Err(Error::SchedTooBusy),
Err(ScheduleError::Stopped(_)) => Err(Error::Closed),
}
self.sched.run_cmd(cmd, cb);
Ok(())
}

/// Get a snapshot of `engine`.
Expand Down
33 changes: 14 additions & 19 deletions src/storage/txn/latch.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.


#![allow(deprecated)]

use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher, SipHasher as DefaultHasher};
use std::hash::{Hash, Hasher};
use std::sync::Mutex;
use std::usize;

/// Latch which is used to serialize accesses to resources hashed to the same slot.
Expand Down Expand Up @@ -63,7 +62,7 @@ impl Lock {
/// Each latch is indexed by a slot ID, hence the term latch and slot are used interchangeably, but
/// conceptually a latch is a queue, and a slot is an index to the queue.
pub struct Latches {
slots: Vec<Latch>,
slots: Vec<Mutex<Latch>>,
size: usize,
}

Expand All @@ -72,11 +71,10 @@ impl Latches {
///
/// The size will be rounded up to the power of 2.
pub fn new(size: usize) -> Latches {
let power_of_two_size = usize::next_power_of_two(size);
Latches {
slots: vec![Latch::new(); power_of_two_size],
size: power_of_two_size,
}
let size = usize::next_power_of_two(size);
let mut slots = Vec::with_capacity(size);
(0..size).for_each(|_| slots.push(Mutex::new(Latch::new())));
Latches { slots, size }
}

/// Creates a lock which specifies all the required latches for a command.
Expand All @@ -96,11 +94,10 @@ impl Latches {
/// This method will enqueue the command ID into the waiting queues of the latches. A latch is
/// considered acquired if the command ID is at the front of the queue. Returns true if all the
/// Latches are acquired, false otherwise.
pub fn acquire(&mut self, lock: &mut Lock, who: u64) -> bool {
pub fn acquire(&self, lock: &mut Lock, who: u64) -> bool {
let mut acquired_count: usize = 0;
for i in &lock.required_slots[lock.owned_count..] {
let latch = &mut self.slots[*i];

let mut latch = self.slots[*i].lock().unwrap();
let front = latch.waiting.front().cloned();
match front {
Some(cid) => {
Expand All @@ -117,21 +114,19 @@ impl Latches {
}
}
}

lock.owned_count += acquired_count;
lock.acquired()
}

/// Releases all latches owned by the `lock` of command with ID `who`, returns the wakeup list.
///
/// Preconditions: the caller must ensure the command is at the front of the latches.
pub fn release(&mut self, lock: &Lock, who: u64) -> Vec<u64> {
pub fn release(&self, lock: &Lock, who: u64) -> Vec<u64> {
let mut wakeup_list: Vec<u64> = vec![];
for i in &lock.required_slots[..lock.owned_count] {
let latch = &mut self.slots[*i];
let mut latch = self.slots[*i].lock().unwrap();
let front = latch.waiting.pop_front().unwrap();
assert_eq!(front, who);

if let Some(wakeup) = latch.waiting.front() {
wakeup_list.push(*wakeup);
}
Expand All @@ -156,7 +151,7 @@ mod tests {

#[test]
fn test_wakeup() {
let mut latches = Latches::new(256);
let latches = Latches::new(256);

let slots_a: Vec<usize> = vec![1, 3, 5];
let mut lock_a = Lock::new(slots_a);
Expand Down Expand Up @@ -184,7 +179,7 @@ mod tests {

#[test]
fn test_wakeup_by_multi_cmds() {
let mut latches = Latches::new(256);
let latches = Latches::new(256);

let slots_a: Vec<usize> = vec![1, 2, 3];
let slots_b: Vec<usize> = vec![4, 5, 6];
Expand Down
4 changes: 2 additions & 2 deletions src/storage/txn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
mod latch;
mod process;
pub mod sched_pool;
mod scheduler;
pub mod scheduler;
mod store;

use std::error;
use std::io::Error as IoError;

pub use self::process::RESOLVE_LOCK_BATCH_SIZE;
pub use self::scheduler::{Msg, Scheduler, CMD_BATCH_SIZE};
pub use self::scheduler::{Msg, Scheduler};
pub use self::store::{FixtureStore, FixtureStoreScanner};
pub use self::store::{Scanner, SnapshotStore, Store};
use tikv_util::escape;
Expand Down
61 changes: 25 additions & 36 deletions src/storage/txn/process.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::mem;
use std::thread;
use std::time::Duration;
use std::u64;
use std::{mem, thread, u64};

use futures::future;
use kvproto::kvrpcpb::{CommandPri, Context, LockInfo};
Expand All @@ -12,19 +10,12 @@ use crate::storage::kv::{CbContext, Modify, Result as EngineResult};
use crate::storage::mvcc::{
Error as MvccError, Lock as MvccLock, MvccReader, MvccTxn, Write, MAX_TXN_WRITE_SIZE,
};
use crate::storage::txn::{sched_pool::*, scheduler::Msg, Error, Result};
use crate::storage::{
Command, Engine, Error as StorageError, Result as StorageResult, ScanMode, Snapshot,
Statistics, StorageCb,
metrics::*, Command, Engine, Error as StorageError, Key, MvccInfo, Result as StorageResult,
ScanMode, Snapshot, Statistics, StorageCb, Value,
};
use crate::storage::{Key, MvccInfo, Value};
use tikv_util::time::Instant;
use tikv_util::time::SlowTimer;
use tikv_util::worker::{self, ScheduleError};

use super::super::metrics::*;
use super::sched_pool::*;
use super::scheduler::Msg;
use super::{Error, Result};
use tikv_util::time::{Instant, SlowTimer};

// To resolve a key, the write size is about 100~150 bytes, depending on key and value length.
// The write batch will be around 32KB if we scan 256 keys each time.
Expand Down Expand Up @@ -107,29 +98,37 @@ impl Task {
}
}

pub struct Executor<E: Engine> {
pub trait MsgScheduler: Clone + Send + 'static {
fn on_msg(&self, task: Msg);
}

pub struct Executor<E: Engine, S: MsgScheduler> {
// We put time consuming tasks to the thread pool.
sched_pool: Option<SchedPool<E>>,
// And the tasks completes we post a completion to the `Scheduler`.
scheduler: Option<worker::Scheduler<Msg>>,
scheduler: Option<S>,
}

impl<E: Engine> Executor<E> {
pub fn new(scheduler: worker::Scheduler<Msg>, pool: SchedPool<E>) -> Self {
impl<E: Engine, S: MsgScheduler> Executor<E, S> {
pub fn new(scheduler: S, pool: SchedPool<E>) -> Self {
Executor {
scheduler: Some(scheduler),
sched_pool: Some(pool),
}
}

fn take_scheduler(&mut self) -> worker::Scheduler<Msg> {
fn take_scheduler(&mut self) -> S {
self.scheduler.take().unwrap()
}

fn take_pool(&mut self) -> SchedPool<E> {
self.sched_pool.take().unwrap()
}

pub fn clone_pool(&self) -> SchedPool<E> {
self.sched_pool.clone().unwrap()
}

/// Start the execution of the task.
pub fn execute(mut self, cb_ctx: CbContext, snapshot: EngineResult<E::Snap>, task: Task) {
debug!(
Expand Down Expand Up @@ -249,19 +248,18 @@ impl<E: Engine> Executor<E> {
let sched = scheduler.clone();
// The callback to receive async results of write prepare from the storage engine.
let engine_cb = Box::new(move |(_, result)| {
if notify_scheduler(
notify_scheduler(
sched,
Msg::WriteFinished {
cid,
pr,
result,
tag,
},
) {
KV_COMMAND_KEYWRITE_HISTOGRAM_VEC
.with_label_values(&[tag])
.observe(rows as f64);
}
);
KV_COMMAND_KEYWRITE_HISTOGRAM_VEC
.with_label_values(&[tag])
.observe(rows as f64);
});

if let Err(e) = engine.async_write(&ctx, to_be_write, engine_cb) {
Expand Down Expand Up @@ -574,17 +572,8 @@ fn process_write_impl<S: Snapshot>(
Ok((ctx, pr, modifies, rows))
}

fn notify_scheduler(scheduler: worker::Scheduler<Msg>, msg: Msg) -> bool {
match scheduler.schedule(msg) {
Ok(_) => true,
e @ Err(ScheduleError::Stopped(_)) => {
info!("scheduler stopped"; "err" => ?e);
false
}
Err(e) => {
panic!("schedule msg failed, err:{:?}", e);
}
}
pub fn notify_scheduler<S: MsgScheduler>(scheduler: S, msg: Msg) {
scheduler.on_msg(msg);
}

// Make clippy happy.
Expand Down
Loading

0 comments on commit d0298ca

Please sign in to comment.