Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: segcache: use mutex and jemalloc #459

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ members = [

[workspace.dependencies]
ahash = "0.8.0"
arrayvec = "0.7.2"
backtrace = "0.3.66"
bitvec = "1.0.1"
blake3 = "1.3.1"
Expand All @@ -48,6 +49,7 @@ clap = "2.33.3"
crossbeam-channel = "0.5.6"
crossbeam-queue = "0.3.5"
foreign-types-shared = "0.3.1"
jemallocator = "0.5.0"
libc = "0.2.134"
log = "0.4.17"
memmap2 = "0.2.2"
Expand Down
6 changes: 3 additions & 3 deletions src/core/admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,16 +316,16 @@ impl Admin {
match request {
AdminRequest::FlushAll => {
let _ = self.signal_queue_tx.try_send_all(Signal::FlushAll);
session.send(AdminResponse::Ok)?;
session.send(&AdminResponse::Ok)?;
}
AdminRequest::Quit => {
return Err(Error::new(ErrorKind::Other, "should hangup"));
}
AdminRequest::Stats => {
session.send(AdminResponse::Stats)?;
session.send(&AdminResponse::Stats)?;
}
AdminRequest::Version => {
session.send(AdminResponse::version(self.version.clone()))?;
session.send(&AdminResponse::version(self.version.clone()))?;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/proxy/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ where
{
if let Some(session) = self.sessions.get_mut(token.0) {
if response.should_hangup() {
let _ = session.send(FrontendResponse::from(response));
let _ = session.send(&FrontendResponse::from(response));
self.close(token);
continue;
} else if session.send(FrontendResponse::from(response)).is_err() {
} else if session.send(&FrontendResponse::from(response)).is_err() {
self.close(token);
continue;
} else if session.write_pending() > 0 {
Expand Down
5 changes: 2 additions & 3 deletions src/core/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,17 @@ use admin::AdminBuilder;
use common::signal::Signal;
use common::ssl::tls_acceptor;
use config::*;
use core::marker::PhantomData;
use core::time::Duration;
use crossbeam_channel::{bounded, Sender};
use entrystore::EntryStore;
use logger::{Drain, Klog};
use protocol_common::{Compose, Execute, Parse};
use protocol_common::{Compose, Execute, IntoBuffers, Parse};
use queues::Queues;
use rustcommon_metrics::*;
use session::{Buf, ServerSession, Session};
use slab::Slab;
use std::io::{Error, ErrorKind, Result};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use waker::Waker;

mod listener;
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl<Parser, Request, Response, Storage> ProcessBuilder<Parser, Request, Respons
where
Parser: 'static + Parse<Request> + Clone + Send,
Request: 'static + Klog + Klog<Response = Response> + Send,
Response: 'static + Compose + Send,
Response: 'static + Compose + IntoBuffers + Send,
Storage: 'static + Execute<Request, Response> + EntryStore + Send,
{
pub fn new<T: AdminConfig + ServerConfig + TlsConfig + WorkerConfig>(
Expand Down
68 changes: 19 additions & 49 deletions src/core/server/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ use std::thread::JoinHandle;

mod multi;
mod single;
mod storage;

use multi::*;
use single::*;
use storage::*;

heatmap!(
WORKER_EVENT_DEPTH,
Expand Down Expand Up @@ -44,16 +42,15 @@ pub enum Workers<Parser, Request, Response, Storage> {
worker: SingleWorker<Parser, Request, Response, Storage>,
},
Multi {
workers: Vec<MultiWorker<Parser, Request, Response>>,
storage: StorageWorker<Request, Response, Storage, Token>,
workers: Vec<MultiWorker<Parser, Request, Response, Storage>>,
},
}

impl<Parser, Request, Response, Storage> Workers<Parser, Request, Response, Storage>
where
Parser: 'static + Parse<Request> + Clone + Send,
Request: 'static + Klog + Klog<Response = Response> + Send,
Response: 'static + Compose + Send,
Response: 'static + Compose + IntoBuffers + Send,
Storage: 'static + EntryStore + Execute<Request, Response> + Send,
{
pub fn spawn(self) -> Vec<JoinHandle<()>> {
Expand All @@ -64,14 +61,8 @@ where
.spawn(move || worker.run())
.unwrap()]
}
Self::Multi {
mut workers,
mut storage,
} => {
let mut join_handles = vec![std::thread::Builder::new()
.name(format!("{}_storage", THREAD_PREFIX))
.spawn(move || storage.run())
.unwrap()];
Self::Multi { mut workers } => {
let mut join_handles = vec![];

for (id, mut worker) in workers.drain(..).enumerate() {
join_handles.push(
Expand All @@ -93,8 +84,7 @@ pub enum WorkersBuilder<Parser, Request, Response, Storage> {
worker: SingleWorkerBuilder<Parser, Request, Response, Storage>,
},
Multi {
workers: Vec<MultiWorkerBuilder<Parser, Request, Response>>,
storage: StorageWorkerBuilder<Request, Response, Storage>,
workers: Vec<MultiWorkerBuilder<Parser, Request, Response, Storage>>,
},
}

Expand All @@ -108,15 +98,18 @@ where
let threads = config.worker().threads();

if threads > 1 {
let storage = Arc::new(Mutex::new(storage));

let mut workers = vec![];
for _ in 0..threads {
workers.push(MultiWorkerBuilder::new(config, parser.clone())?)
workers.push(MultiWorkerBuilder::new(
config,
parser.clone(),
storage.clone(),
)?)
}

Ok(Self::Multi {
workers,
storage: StorageWorkerBuilder::new(config, storage)?,
})
Ok(Self::Multi { workers })
} else {
Ok(Self::Single {
worker: SingleWorkerBuilder::new(config, parser, storage)?,
Expand All @@ -129,10 +122,7 @@ where
Self::Single { worker } => {
vec![worker.waker()]
}
Self::Multi {
workers,
storage: _,
} => workers.iter().map(|w| w.waker()).collect(),
Self::Multi { workers } => workers.iter().map(|w| w.waker()).collect(),
}
}

Expand All @@ -141,8 +131,8 @@ where
Self::Single { worker } => {
vec![worker.waker()]
}
Self::Multi { workers, storage } => {
let mut wakers = vec![storage.waker()];
Self::Multi { workers } => {
let mut wakers = vec![];
for worker in workers {
wakers.push(worker.waker());
}
Expand All @@ -160,35 +150,15 @@ where
let mut session_queues = session_queues;
match self {
Self::Multi {
storage,
// storage,
mut workers,
} => {
let storage_wakers = vec![storage.waker()];
let worker_wakers: Vec<Arc<Waker>> = workers.iter().map(|v| v.waker()).collect();
let (mut worker_data_queues, mut storage_data_queues) =
Queues::new(worker_wakers, storage_wakers, QUEUE_CAPACITY);

// The storage thread precedes the worker threads in the set of
// wakers, so its signal queue is the first element of
// `signal_queues`. Its request queue is also the first (and
// only) element of `request_queues`. We remove these and build
// the storage so we can loop through the remaining signal
// queues when launching the worker threads.
let s = storage.build(storage_data_queues.remove(0), signal_queues.remove(0));

let mut w = Vec::new();
for worker_builder in workers.drain(..) {
w.push(worker_builder.build(
worker_data_queues.remove(0),
session_queues.remove(0),
signal_queues.remove(0),
));
w.push(worker_builder.build(session_queues.remove(0), signal_queues.remove(0)));
}

Workers::Multi {
storage: s,
workers: w,
}
Workers::Multi { workers: w }
}
Self::Single { worker } => Workers::Single {
worker: worker.build(session_queues.remove(0), signal_queues.remove(0)),
Expand Down
Loading