
txn: remove scheduler #4098

Merged
merged 31 commits on May 20, 2019

Changes from all commits (31 commits)
bbc28ef
remove scheduler worker
siddontang Dec 22, 2018
67a555f
Merge branch 'master' into no-scheduler
hicqu Jan 21, 2019
8fff567
A little fix.
hicqu Jan 21, 2019
cf3d24c
Merge branch 'master' into no-scheduler
hicqu Mar 8, 2019
71016f5
a little fix.
hicqu Mar 8, 2019
a6f9188
Merge branch 'master' into no-scheduler
hicqu May 7, 2019
fb1a117
Merge branch 'master' into no-scheduler
hicqu May 7, 2019
461457e
cargo fmt
hicqu May 7, 2019
93b5d39
Merge branch 'master' into no-scheduler
hicqu May 8, 2019
d37a59d
address comments.
hicqu May 8, 2019
4179329
make clippy happy
hicqu May 8, 2019
bfaa606
Merge branch 'master' into no-scheduler
hicqu May 8, 2019
54f9584
enlarge slots
hicqu May 8, 2019
549c2fb
Merge branch 'master' into no-scheduler
hicqu May 17, 2019
325f909
move snapshot to scheduler worker threads
hicqu May 9, 2019
d17a5c2
Merge branch 'master' into no-scheduler
hicqu May 17, 2019
11935a2
make clippy happy
hicqu May 17, 2019
79e02c8
use futures pool.
hicqu May 17, 2019
bc57f77
Merge branch 'master' into no-scheduler
hicqu May 17, 2019
444be4f
fix clippy
hicqu May 17, 2019
974d13b
add gc worker stop code back
hicqu May 17, 2019
e4ddc4b
address comments.
hicqu May 17, 2019
31ea9dc
address comments.
hicqu May 17, 2019
8de53be
address comments.
hicqu May 17, 2019
67c2fbb
Merge branch 'master' into no-scheduler
hicqu May 20, 2019
784c0dc
address comments.
hicqu May 20, 2019
222d362
Merge branch 'no-scheduler' of github.com:hicqu/tikv into no-scheduler
hicqu May 20, 2019
d39bacd
add logs for scheduler initialization time
hicqu May 20, 2019
2de0002
Merge branch 'master' into no-scheduler
hicqu May 20, 2019
357002d
remove useless `SchedulerError`
hicqu May 20, 2019
679e670
Merge branch 'no-scheduler' of github.com:hicqu/tikv into no-scheduler
hicqu May 20, 2019
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

43 changes: 8 additions & 35 deletions src/storage/mod.rs
@@ -13,7 +13,7 @@ use std::cmp;
use std::error;
use std::fmt::{self, Debug, Display, Formatter};
use std::io::Error as IoError;
use std::sync::{atomic, Arc, Mutex};
use std::sync::{atomic, Arc};
use std::u64;

use engine::rocks::DB;
@@ -25,12 +25,10 @@ use kvproto::kvrpcpb::{CommandPri, Context, KeyRange, LockInfo};
use crate::server::readpool::{self, Builder as ReadPoolBuilder, ReadPool};
use crate::server::ServerRaftStoreRouter;
use tikv_util::collections::HashMap;
use tikv_util::worker::{self, Builder, ScheduleError, Worker};

use self::gc_worker::GCWorker;
use self::metrics::*;
use self::mvcc::Lock;
use self::txn::CMD_BATCH_SIZE;

pub use self::config::{BlockCacheConfig, Config, DEFAULT_DATA_DIR, DEFAULT_ROCKSDB_SUB_DIR};
pub use self::gc_worker::{AutoGCConfig, GCSafePointProvider};
@@ -42,6 +40,7 @@ pub use self::kv::{
};
pub use self::mvcc::Scanner as StoreScanner;
pub use self::readpool_impl::*;
use self::txn::scheduler::Scheduler as TxnScheduler;
pub use self::txn::{FixtureStore, FixtureStoreScanner};
pub use self::txn::{Msg, Scanner, Scheduler, SnapshotStore, Store};
pub use self::types::{Key, KvPair, MvccInfo, Value};
@@ -497,10 +496,7 @@ pub struct Storage<E: Engine> {
// TODO: Too many Arcs, would be slow when clone.
engine: E,

/// The worker to execute storage commands.
worker: Arc<Mutex<Worker<Msg>>>,
/// `worker_scheduler` is used to schedule tasks to run in `worker`.
worker_scheduler: worker::Scheduler<Msg>,
sched: TxnScheduler<E>,

/// The thread pool used to run most read operations.
read_pool: ReadPool,
@@ -527,8 +523,7 @@ impl<E: Engine> Clone for Storage<E> {

Self {
engine: self.engine.clone(),
worker: self.worker.clone(),
worker_scheduler: self.worker_scheduler.clone(),
sched: self.sched.clone(),
read_pool: self.read_pool.clone(),
gc_worker: self.gc_worker.clone(),
refs: self.refs.clone(),
@@ -550,15 +545,6 @@ impl<E: Engine> Drop for Storage<E> {
return;
}

// This is the last reference of the storage. Now all its references are dropped. Stop and
// destroy the storage now.
let mut worker = self.worker.lock().unwrap();

let h = worker.stop().unwrap();
if let Err(e) = h.join() {
error!("Failed to join sched_handle"; "err" => ?e);
}

let r = self.gc_worker.stop();
if let Err(e) = r {
error!("Failed to stop gc_worker:"; "err" => ?e);
@@ -577,16 +563,8 @@ impl<E: Engine> Storage<E> {
local_storage: Option<Arc<DB>>,
raft_store_router: Option<ServerRaftStoreRouter>,
) -> Result<Self> {
let worker = Arc::new(Mutex::new(
Builder::new("storage-scheduler")
.batch_size(CMD_BATCH_SIZE)
.pending_capacity(config.scheduler_notify_capacity)
.create(),
));
let worker_scheduler = worker.lock().unwrap().scheduler();
let runner = Scheduler::new(
let sched = TxnScheduler::new(
engine.clone(),
worker_scheduler.clone(),
config.scheduler_concurrency,
config.scheduler_worker_pool_size,
config.scheduler_pending_write_threshold.0 as usize,
@@ -598,15 +576,13 @@
config.gc_ratio_threshold,
);

worker.lock().unwrap().start(runner)?;
gc_worker.start()?;

info!("Storage started.");

Ok(Storage {
engine,
worker,
worker_scheduler,
sched,
read_pool,
gc_worker,
refs: Arc::new(atomic::AtomicUsize::new(1)),
@@ -632,11 +608,8 @@ impl<E: Engine> Storage<E> {
#[inline]
fn schedule(&self, cmd: Command, cb: StorageCb) -> Result<()> {
fail_point!("storage_drop_message", |_| Ok(()));
match self.worker_scheduler.schedule(Msg::RawCmd { cmd, cb }) {
Ok(()) => Ok(()),
Err(ScheduleError::Full(_)) => Err(Error::SchedTooBusy),
Err(ScheduleError::Stopped(_)) => Err(Error::Closed),
}
self.sched.run_cmd(cmd, cb);
Ok(())
}

/// Get a snapshot of `engine`.
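
The core of the `src/storage/mod.rs` change is that `Storage` no longer forwards commands to a dedicated worker thread through a bounded queue; it now owns a `TxnScheduler` and hands each command to it directly, so `schedule` can no longer fail with `SchedTooBusy` or `Closed` at this layer. Below is a minimal sketch of the before/after dispatch shape, using simplified stand-in types rather than the real TiKV ones:

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

struct Command(&'static str);
struct StorageCb; // stand-in for the real callback type

// Old shape: push the command onto a bounded queue owned by a worker thread.
struct OldStorage {
    worker_tx: SyncSender<(Command, StorageCb)>,
}

impl OldStorage {
    fn schedule(&self, cmd: Command, cb: StorageCb) -> Result<(), &'static str> {
        match self.worker_tx.try_send((cmd, cb)) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(_)) => Err("sched too busy"),
            Err(TrySendError::Disconnected(_)) => Err("closed"),
        }
    }
}

// New shape: the storage owns the scheduler and hands the command over directly.
#[derive(Clone)]
struct TxnScheduler;

impl TxnScheduler {
    fn run_cmd(&self, cmd: Command, _cb: StorageCb) {
        println!("running command: {}", cmd.0);
    }
}

struct NewStorage {
    sched: TxnScheduler,
}

impl NewStorage {
    fn schedule(&self, cmd: Command, cb: StorageCb) -> Result<(), &'static str> {
        self.sched.run_cmd(cmd, cb);
        Ok(())
    }
}

fn main() {
    let (tx, _rx) = sync_channel(1);
    let old = OldStorage { worker_tx: tx };
    old.schedule(Command("prewrite"), StorageCb).unwrap();

    let new = NewStorage { sched: TxnScheduler };
    new.schedule(Command("commit"), StorageCb).unwrap();
}
```

Back-pressure presumably becomes the scheduler's own concern rather than the dispatch path's; the diff above still passes `config.scheduler_pending_write_threshold` into `TxnScheduler::new`.
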
33 changes: 14 additions & 19 deletions src/storage/txn/latch.rs
@@ -1,10 +1,9 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.


#![allow(deprecated)]

use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher, SipHasher as DefaultHasher};
use std::hash::{Hash, Hasher};
use std::sync::Mutex;
use std::usize;

/// Latch which is used to serialize accesses to resources hashed to the same slot.
@@ -63,7 +62,7 @@ impl Lock {
/// Each latch is indexed by a slot ID, hence the terms latch and slot are used interchangeably, but
/// conceptually a latch is a queue, and a slot is an index to the queue.
pub struct Latches {
slots: Vec<Latch>,
slots: Vec<Mutex<Latch>>,
Contributor: I just think that if we use a SpinLock here, could we gain better performance?

Contributor: We can do a benchmark later @hicqu

Contributor Author: Sure.

size: usize,
}

@@ -72,11 +71,10 @@ impl Latches {
///
/// The size will be rounded up to the power of 2.
pub fn new(size: usize) -> Latches {
let power_of_two_size = usize::next_power_of_two(size);
Latches {
slots: vec![Latch::new(); power_of_two_size],
size: power_of_two_size,
}
let size = usize::next_power_of_two(size);
let mut slots = Vec::with_capacity(size);
(0..size).for_each(|_| slots.push(Mutex::new(Latch::new())));
Latches { slots, size }
}

/// Creates a lock which specifies all the required latches for a command.
@@ -96,11 +94,10 @@ impl Latches {
/// This method will enqueue the command ID into the waiting queues of the latches. A latch is
/// considered acquired if the command ID is at the front of the queue. Returns true if all the
/// Latches are acquired, false otherwise.
pub fn acquire(&mut self, lock: &mut Lock, who: u64) -> bool {
pub fn acquire(&self, lock: &mut Lock, who: u64) -> bool {
let mut acquired_count: usize = 0;
for i in &lock.required_slots[lock.owned_count..] {
let latch = &mut self.slots[*i];

let mut latch = self.slots[*i].lock().unwrap();
let front = latch.waiting.front().cloned();
match front {
Some(cid) => {
@@ -117,21 +114,19 @@
}
}
}

lock.owned_count += acquired_count;
lock.acquired()
}

/// Releases all latches owned by the `lock` of command with ID `who`, returns the wakeup list.
///
/// Preconditions: the caller must ensure the command is at the front of the latches.
pub fn release(&mut self, lock: &Lock, who: u64) -> Vec<u64> {
pub fn release(&self, lock: &Lock, who: u64) -> Vec<u64> {
let mut wakeup_list: Vec<u64> = vec![];
for i in &lock.required_slots[..lock.owned_count] {
let latch = &mut self.slots[*i];
let mut latch = self.slots[*i].lock().unwrap();
let front = latch.waiting.pop_front().unwrap();
assert_eq!(front, who);

if let Some(wakeup) = latch.waiting.front() {
wakeup_list.push(*wakeup);
}
@@ -156,7 +151,7 @@ mod tests {

#[test]
fn test_wakeup() {
let mut latches = Latches::new(256);
let latches = Latches::new(256);

let slots_a: Vec<usize> = vec![1, 3, 5];
let mut lock_a = Lock::new(slots_a);
@@ -184,7 +179,7 @@

#[test]
fn test_wakeup_by_multi_cmds() {
let mut latches = Latches::new(256);
let latches = Latches::new(256);

let slots_a: Vec<usize> = vec![1, 2, 3];
let slots_b: Vec<usize> = vec![4, 5, 6];
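
The `latch.rs` change wraps every slot in its own `Mutex`, so `acquire` and `release` now take `&self` instead of `&mut self` and scheduler threads can work on disjoint slots concurrently. The review thread above asks whether a spin lock would beat `Mutex` for these short critical sections; the sketch below keeps `std::sync::Mutex` and uses simplified types (a bare `VecDeque` per slot, single-slot locks) rather than the real `Latch`/`Lock` structures:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

struct Latches {
    // Each slot is an independently locked FIFO of waiting command IDs.
    slots: Vec<Mutex<VecDeque<u64>>>,
}

impl Latches {
    fn new(size: usize) -> Latches {
        let size = size.next_power_of_two();
        Latches {
            slots: (0..size).map(|_| Mutex::new(VecDeque::new())).collect(),
        }
    }

    /// Enqueues `who` on `slot`; the latch is held once `who` reaches the front.
    fn acquire(&self, slot: usize, who: u64) -> bool {
        let mut queue = self.slots[slot].lock().unwrap();
        if !queue.contains(&who) {
            queue.push_back(who);
        }
        queue.front() == Some(&who)
    }

    /// Releases the latch held by `who` and returns the next waiter, if any.
    fn release(&self, slot: usize, who: u64) -> Option<u64> {
        let mut queue = self.slots[slot].lock().unwrap();
        assert_eq!(queue.pop_front(), Some(who));
        queue.front().cloned()
    }
}

fn main() {
    let latches = Latches::new(256);
    assert!(latches.acquire(3, 1));             // command 1 gets the latch
    assert!(!latches.acquire(3, 2));            // command 2 must wait behind it
    assert_eq!(latches.release(3, 1), Some(2)); // releasing 1 wakes up command 2
}
```

Because contention is now per slot rather than on the whole `Latches` struct, enlarging the slot count (one of the commits above does exactly that) directly reduces the chance that unrelated commands collide on a lock.
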
4 changes: 2 additions & 2 deletions src/storage/txn/mod.rs
@@ -3,14 +3,14 @@
mod latch;
mod process;
pub mod sched_pool;
mod scheduler;
pub mod scheduler;
mod store;

use std::error;
use std::io::Error as IoError;

pub use self::process::RESOLVE_LOCK_BATCH_SIZE;
pub use self::scheduler::{Msg, Scheduler, CMD_BATCH_SIZE};
pub use self::scheduler::{Msg, Scheduler};
pub use self::store::{FixtureStore, FixtureStoreScanner};
pub use self::store::{Scanner, SnapshotStore, Store};
use tikv_util::escape;
61 changes: 25 additions & 36 deletions src/storage/txn/process.rs
@@ -1,9 +1,7 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::mem;
use std::thread;
use std::time::Duration;
use std::u64;
use std::{mem, thread, u64};

use futures::future;
use kvproto::kvrpcpb::{CommandPri, Context, LockInfo};
@@ -12,19 +10,12 @@ use crate::storage::kv::{CbContext, Modify, Result as EngineResult};
use crate::storage::mvcc::{
Error as MvccError, Lock as MvccLock, MvccReader, MvccTxn, Write, MAX_TXN_WRITE_SIZE,
};
use crate::storage::txn::{sched_pool::*, scheduler::Msg, Error, Result};
use crate::storage::{
Command, Engine, Error as StorageError, Result as StorageResult, ScanMode, Snapshot,
Statistics, StorageCb,
metrics::*, Command, Engine, Error as StorageError, Key, MvccInfo, Result as StorageResult,
ScanMode, Snapshot, Statistics, StorageCb, Value,
};
use crate::storage::{Key, MvccInfo, Value};
use tikv_util::time::Instant;
use tikv_util::time::SlowTimer;
use tikv_util::worker::{self, ScheduleError};

use super::super::metrics::*;
use super::sched_pool::*;
use super::scheduler::Msg;
use super::{Error, Result};
use tikv_util::time::{Instant, SlowTimer};

// To resolve a key, the write size is about 100~150 bytes, depending on key and value length.
// The write batch will be around 32KB if we scan 256 keys each time.
@@ -107,29 +98,37 @@
}
}

pub struct Executor<E: Engine> {
pub trait MsgScheduler: Clone + Send + 'static {
fn on_msg(&self, task: Msg);
}

pub struct Executor<E: Engine, S: MsgScheduler> {
// We put time-consuming tasks into the thread pool.
sched_pool: Option<SchedPool<E>>,
// When a task completes, we post a completion message to the `Scheduler`.
scheduler: Option<worker::Scheduler<Msg>>,
scheduler: Option<S>,
}

impl<E: Engine> Executor<E> {
pub fn new(scheduler: worker::Scheduler<Msg>, pool: SchedPool<E>) -> Self {
impl<E: Engine, S: MsgScheduler> Executor<E, S> {
pub fn new(scheduler: S, pool: SchedPool<E>) -> Self {
Executor {
scheduler: Some(scheduler),
sched_pool: Some(pool),
}
}

fn take_scheduler(&mut self) -> worker::Scheduler<Msg> {
fn take_scheduler(&mut self) -> S {
self.scheduler.take().unwrap()
}

fn take_pool(&mut self) -> SchedPool<E> {
self.sched_pool.take().unwrap()
}

pub fn clone_pool(&self) -> SchedPool<E> {
self.sched_pool.clone().unwrap()
}

/// Start the execution of the task.
pub fn execute(mut self, cb_ctx: CbContext, snapshot: EngineResult<E::Snap>, task: Task) {
debug!(
@@ -249,19 +248,18 @@ impl<E: Engine> Executor<E> {
let sched = scheduler.clone();
// The callback to receive async results of write prepare from the storage engine.
let engine_cb = Box::new(move |(_, result)| {
if notify_scheduler(
notify_scheduler(
sched,
Msg::WriteFinished {
cid,
pr,
result,
tag,
},
) {
KV_COMMAND_KEYWRITE_HISTOGRAM_VEC
.with_label_values(&[tag])
.observe(rows as f64);
}
);
KV_COMMAND_KEYWRITE_HISTOGRAM_VEC
.with_label_values(&[tag])
.observe(rows as f64);
});

if let Err(e) = engine.async_write(&ctx, to_be_write, engine_cb) {
@@ -574,17 +572,8 @@ fn process_write_impl<S: Snapshot>(
Ok((ctx, pr, modifies, rows))
}

fn notify_scheduler(scheduler: worker::Scheduler<Msg>, msg: Msg) -> bool {
match scheduler.schedule(msg) {
Ok(_) => true,
e @ Err(ScheduleError::Stopped(_)) => {
info!("scheduler stopped"; "err" => ?e);
false
}
Err(e) => {
panic!("schedule msg failed, err:{:?}", e);
}
}
pub fn notify_scheduler<S: MsgScheduler>(scheduler: S, msg: Msg) {
scheduler.on_msg(msg);
}

// Make clippy happy.
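
In `process.rs` the executor becomes generic over a `MsgScheduler`, and `notify_scheduler` is now an infallible `on_msg` call instead of a `worker::Scheduler::schedule` that could return `Full` or `Stopped` — which is why the `KV_COMMAND_KEYWRITE_HISTOGRAM_VEC` observation no longer sits behind an `if`. Below is a minimal sketch of the trait with one hypothetical implementation (a plain mpsc channel, not the real `TxnScheduler`):

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};

#[derive(Debug)]
enum Msg {
    WriteFinished { cid: u64 },
}

pub trait MsgScheduler: Clone + Send + 'static {
    fn on_msg(&self, task: Msg);
}

// One possible implementation: forward completion messages over an unbounded channel.
#[derive(Clone)]
struct ChannelScheduler {
    // Sender is not Sync, so share it behind a Mutex to satisfy Clone + Send.
    tx: Arc<Mutex<Sender<Msg>>>,
}

impl MsgScheduler for ChannelScheduler {
    fn on_msg(&self, task: Msg) {
        // An unbounded send cannot report "queue full", so callers never need
        // to handle a busy error, mirroring the simplified notify_scheduler.
        self.tx.lock().unwrap().send(task).unwrap();
    }
}

fn notify_scheduler<S: MsgScheduler>(scheduler: S, msg: Msg) {
    scheduler.on_msg(msg);
}

fn main() {
    let (tx, rx) = channel();
    let sched = ChannelScheduler { tx: Arc::new(Mutex::new(tx)) };
    notify_scheduler(sched, Msg::WriteFinished { cid: 42 });
    println!("{:?}", rx.recv().unwrap());
}
```

In the PR itself, the real implementor is presumably the `TxnScheduler` that `Storage` now owns, which consumes the completion message in place rather than forwarding it through a worker queue.
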