From 93309db247006b369aa69e6c9072261840ff980a Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Mon, 11 Jul 2022 15:41:32 -0700 Subject: [PATCH 1/9] testing jemalloc allocator for segcache Test using jemalloc --- Cargo.lock | 28 ++++++++++++++++++++++++++++ src/server/segcache/Cargo.toml | 3 +++ src/server/segcache/src/main.rs | 7 +++++++ 3 files changed, 38 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index d97c3a41b..2ff1d1f8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -658,6 +658,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" + [[package]] name = "fuchsia-zircon" version = "0.3.3" @@ -925,6 +931,27 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" +[[package]] +name = "jemalloc-sys" +version = "0.5.1+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c2b313609b95939cb0c5a5c6917fb9b7c9394562aa3ef44eb66ffa51736432" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16c2514137880c52b0b4822b563fadd38257c1f380858addb74a400889696ea6" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "js-sys" version = "0.3.57" @@ -2065,6 +2092,7 @@ dependencies = [ "config", "criterion", "entrystore", + "jemallocator", "logger", "protocol-memcache", "rustcommon-metrics 0.1.1 (git+https://github.com/twitter/rustcommon)", diff --git a/src/server/segcache/Cargo.toml b/src/server/segcache/Cargo.toml index 0ee298ae7..c1753f39e 100644 --- a/src/server/segcache/Cargo.toml +++ b/src/server/segcache/Cargo.toml @@ -47,5 +47,8 @@ protocol-memcache = { path = "../../protocol/memcache" } rustcommon-metrics = { git = "https://github.com/twitter/rustcommon" } server = { path = "../../core/server" } +[target.'cfg(not(target_env = "msvc"))'.dependencies] +jemallocator = "0.5.0" + [dev-dependencies] criterion = "0.3" diff --git a/src/server/segcache/src/main.rs b/src/server/segcache/src/main.rs index bf4322ebe..ed6b41f13 100644 --- a/src/server/segcache/src/main.rs +++ b/src/server/segcache/src/main.rs @@ -12,6 +12,13 @@ //! //! Running this binary is the primary way of using Segcache. +#[cfg(not(target_env = "msvc"))] +use jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + #[macro_use] extern crate logger; From 6ead9b37a70557f9f98ad5bafa56734a9127231f Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 8 Sep 2022 14:14:33 -0700 Subject: [PATCH 2/9] use locking instead of queue Replaces the queue between the workers and storage thread with a locking implementation. --- src/core/server/src/lib.rs | 3 +- src/core/server/src/workers/mod.rs | 41 ++----- src/core/server/src/workers/multi.rs | 139 ++++++++++++--------- src/core/server/src/workers/single.rs | 1 + src/core/server/src/workers/storage.rs | 163 ------------------------- 5 files changed, 89 insertions(+), 258 deletions(-) delete mode 100644 src/core/server/src/workers/storage.rs diff --git a/src/core/server/src/lib.rs b/src/core/server/src/lib.rs index a2135af83..4aef1c6d9 100644 --- a/src/core/server/src/lib.rs +++ b/src/core/server/src/lib.rs @@ -97,7 +97,6 @@ use admin::AdminBuilder; use common::signal::Signal; use common::ssl::tls_acceptor; use config::*; -use core::marker::PhantomData; use core::time::Duration; use crossbeam_channel::{bounded, Sender}; use entrystore::EntryStore; @@ -108,7 +107,7 @@ use rustcommon_metrics::*; use session::{Buf, ServerSession, Session}; use slab::Slab; use std::io::{Error, ErrorKind, Result}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use waker::Waker; mod listener; diff --git a/src/core/server/src/workers/mod.rs b/src/core/server/src/workers/mod.rs index 80d17c9a6..2fcf29103 100644 --- a/src/core/server/src/workers/mod.rs +++ b/src/core/server/src/workers/mod.rs @@ -7,11 +7,9 @@ use std::thread::JoinHandle; mod multi; mod single; -mod storage; use multi::*; use single::*; -use storage::*; heatmap!( WORKER_EVENT_DEPTH, @@ -44,8 +42,7 @@ pub enum Workers { worker: SingleWorker, }, Multi { - workers: Vec>, - storage: StorageWorker, + workers: Vec>, }, } @@ -66,12 +63,8 @@ where } Self::Multi { mut workers, - mut storage, } => { - let mut join_handles = vec![std::thread::Builder::new() - .name(format!("{}_storage", THREAD_PREFIX)) - .spawn(move || storage.run()) - .unwrap()]; + let mut join_handles = vec![]; for (id, mut worker) in workers.drain(..).enumerate() { join_handles.push( @@ -93,8 +86,7 @@ pub enum WorkersBuilder { worker: SingleWorkerBuilder, }, Multi { - workers: Vec>, - storage: StorageWorkerBuilder, + workers: Vec>, }, } @@ -108,14 +100,15 @@ where let threads = config.worker().threads(); if threads > 1 { + let storage = Arc::new(Mutex::new(storage)); + let mut workers = vec![]; for _ in 0..threads { - workers.push(MultiWorkerBuilder::new(config, parser.clone())?) + workers.push(MultiWorkerBuilder::new(config, parser.clone(), storage.clone())?) } Ok(Self::Multi { workers, - storage: StorageWorkerBuilder::new(config, storage)?, }) } else { Ok(Self::Single { @@ -131,7 +124,6 @@ where } Self::Multi { workers, - storage: _, } => workers.iter().map(|w| w.waker()).collect(), } } @@ -141,8 +133,8 @@ where Self::Single { worker } => { vec![worker.waker()] } - Self::Multi { workers, storage } => { - let mut wakers = vec![storage.waker()]; + Self::Multi { workers } => { + let mut wakers = vec![]; for worker in workers { wakers.push(worker.waker()); } @@ -160,33 +152,18 @@ where let mut session_queues = session_queues; match self { Self::Multi { - storage, + // storage, mut workers, } => { - let storage_wakers = vec![storage.waker()]; - let worker_wakers: Vec> = workers.iter().map(|v| v.waker()).collect(); - let (mut worker_data_queues, mut storage_data_queues) = - Queues::new(worker_wakers, storage_wakers, QUEUE_CAPACITY); - - // The storage thread precedes the worker threads in the set of - // wakers, so its signal queue is the first element of - // `signal_queues`. Its request queue is also the first (and - // only) element of `request_queues`. We remove these and build - // the storage so we can loop through the remaining signal - // queues when launching the worker threads. - let s = storage.build(storage_data_queues.remove(0), signal_queues.remove(0)); - let mut w = Vec::new(); for worker_builder in workers.drain(..) { w.push(worker_builder.build( - worker_data_queues.remove(0), session_queues.remove(0), signal_queues.remove(0), )); } Workers::Multi { - storage: s, workers: w, } } diff --git a/src/core/server/src/workers/multi.rs b/src/core/server/src/workers/multi.rs index 113cf08b9..f34d8d244 100644 --- a/src/core/server/src/workers/multi.rs +++ b/src/core/server/src/workers/multi.rs @@ -2,19 +2,22 @@ // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 +use std::collections::VecDeque; use super::*; -pub struct MultiWorkerBuilder { +pub struct MultiWorkerBuilder { nevent: usize, parser: Parser, + pending: VecDeque, poll: Poll, sessions: Slab>, + storage: Arc>, timeout: Duration, waker: Arc, } -impl MultiWorkerBuilder { - pub fn new(config: &T, parser: Parser) -> Result { +impl MultiWorkerBuilder { + pub fn new(config: &T, parser: Parser, storage: Arc>) -> Result { let config = config.worker(); let poll = Poll::new()?; @@ -29,8 +32,10 @@ impl MultiWorkerBuilder { Ok(Self { nevent, parser, + pending: VecDeque::new(), poll, sessions: Slab::new(), + storage, timeout, waker, }) @@ -42,41 +47,43 @@ impl MultiWorkerBuilder { pub fn build( self, - data_queue: Queues<(Request, Token), (Request, Response, Token)>, session_queue: Queues, signal_queue: Queues<(), Signal>, - ) -> MultiWorker { + ) -> MultiWorker { MultiWorker { - data_queue, nevent: self.nevent, parser: self.parser, + pending: self.pending, poll: self.poll, session_queue, sessions: self.sessions, signal_queue, + storage: self.storage, timeout: self.timeout, waker: self.waker, } } } -pub struct MultiWorker { - data_queue: Queues<(Request, Token), (Request, Response, Token)>, +pub struct MultiWorker { nevent: usize, parser: Parser, + pending: VecDeque, poll: Poll, session_queue: Queues, sessions: Slab>, signal_queue: Queues<(), Signal>, + storage: Arc>, timeout: Duration, waker: Arc, } -impl MultiWorker +impl MultiWorker where Parser: Parse + Clone, Request: Klog + Klog, Response: Compose, + Storage: EntryStore + Execute, { /// Return the `Session` to the `Listener` to handle flush/close fn close(&mut self, token: Token) { @@ -100,10 +107,58 @@ where // process up to one request match session.receive() { - Ok(request) => self - .data_queue - .try_send_to(0, (request, token)) - .map_err(|_| Error::new(ErrorKind::Other, "data queue is full")), + Ok(request) => { + let response = { + let mut storage = self.storage.lock().unwrap(); + (*storage).execute(&request) + }; + PROCESS_REQ.increment(); + if response.should_hangup() { + let _ = session.send(response); + return Err(Error::new(ErrorKind::Other, "should hangup")); + } + request.klog(&response); + match session.send(response) { + Ok(_) => { + // attempt to flush immediately if there's now data in + // the write buffer + if session.write_pending() > 0 { + match session.flush() { + Ok(_) => Ok(()), + Err(e) => map_err(e), + }?; + } + + // reregister to get writable event + if session.write_pending() > 0 { + let interest = session.interest(); + if self + .poll + .registry() + .reregister(session, token, interest) + .is_err() + { + return Err(Error::new(ErrorKind::Other, "failed to reregister")); + } + } + + // if there's still data to read, put the token on the + // pending queue + if session.remaining() > 0 { + self.pending.push_back(token); + } + + Ok(()) + } + Err(e) => { + if e.kind() == ErrorKind::WouldBlock { + Ok(()) + } else { + Err(e) + } + } + } + } Err(e) => map_err(e), } } @@ -126,7 +181,6 @@ where // these are buffers which are re-used in each loop iteration to receive // events and queue messages let mut events = Events::with_capacity(self.nevent); - let mut messages = Vec::with_capacity(QUEUE_CAPACITY); loop { WORKER_EVENT_LOOP.increment(); @@ -152,6 +206,16 @@ where match token { WAKER_TOKEN => { self.waker.reset(); + + // handle outstanding reads + for _ in 0..self.pending.len() { + if let Some(token) = self.pending.pop_front() { + if self.read(token).is_err() { + self.close(token); + } + } + } + // handle up to one new session if let Some(mut session) = self.session_queue.try_recv().map(|v| v.into_inner()) @@ -171,50 +235,6 @@ where let _ = self.waker.wake(); } - // handle all pending messages on the data queue - self.data_queue.try_recv_all(&mut messages); - for (request, response, token) in messages.drain(..).map(|v| v.into_inner()) - { - request.klog(&response); - if let Some(session) = self.sessions.get_mut(token.0) { - if response.should_hangup() { - let _ = session.send(response); - self.close(token); - continue; - } else if session.send(response).is_err() { - self.close(token); - continue; - } else if session.write_pending() > 0 { - // try to immediately flush, if we still - // have pending bytes, reregister. This - // saves us one syscall when flushing would - // not block. - if let Err(e) = session.flush() { - if map_err(e).is_err() { - self.close(token); - continue; - } - } - - if session.write_pending() > 0 { - let interest = session.interest(); - if session - .reregister(self.poll.registry(), token, interest) - .is_err() - { - self.close(token); - continue; - } - } - } - - if session.remaining() > 0 && self.read(token).is_err() { - self.close(token); - continue; - } - } - } - // check if we received any signals from the admin thread while let Some(signal) = self.signal_queue.try_recv().map(|v| v.into_inner()) @@ -257,9 +277,6 @@ where } } } - - // wakes the storage thread if necessary - let _ = self.data_queue.wake(); } } } diff --git a/src/core/server/src/workers/single.rs b/src/core/server/src/workers/single.rs index 974955a10..f316908b5 100644 --- a/src/core/server/src/workers/single.rs +++ b/src/core/server/src/workers/single.rs @@ -214,6 +214,7 @@ where match token { WAKER_TOKEN => { self.waker.reset(); + // handle outstanding reads for _ in 0..self.pending.len() { if let Some(token) = self.pending.pop_front() { diff --git a/src/core/server/src/workers/storage.rs b/src/core/server/src/workers/storage.rs deleted file mode 100644 index 10cd25ce3..000000000 --- a/src/core/server/src/workers/storage.rs +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -use crate::*; - -counter!( - STORAGE_EVENT_LOOP, - "the number of times the event loop has run" -); -heatmap!( - STORAGE_QUEUE_DEPTH, - 1_000_000, - "the distribution of the depth of the storage queue on each loop" -); - -pub struct StorageWorkerBuilder { - nevent: usize, - poll: Poll, - storage: Storage, - timeout: Duration, - waker: Arc, - _request: PhantomData, - _response: PhantomData, -} - -impl StorageWorkerBuilder { - pub fn new(config: &T, storage: Storage) -> Result { - let config = config.worker(); - - let poll = Poll::new()?; - - let waker = Arc::new(Waker::from( - ::net::Waker::new(poll.registry(), WAKER_TOKEN).unwrap(), - )); - - let nevent = config.nevent(); - let timeout = Duration::from_millis(config.timeout() as u64); - - Ok(Self { - nevent, - poll, - storage, - timeout, - waker, - _request: PhantomData, - _response: PhantomData, - }) - } - - pub fn waker(&self) -> Arc { - self.waker.clone() - } - - pub fn build( - self, - data_queue: Queues<(Request, Response, Token), (Request, Token)>, - signal_queue: Queues<(), Signal>, - ) -> StorageWorker { - StorageWorker { - data_queue, - nevent: self.nevent, - poll: self.poll, - signal_queue, - storage: self.storage, - timeout: self.timeout, - waker: self.waker, - _request: PhantomData, - _response: PhantomData, - } - } -} - -pub struct StorageWorker { - data_queue: Queues<(Request, Response, Token), (Request, Token)>, - nevent: usize, - poll: Poll, - signal_queue: Queues<(), Signal>, - storage: Storage, - timeout: Duration, - #[allow(dead_code)] - waker: Arc, - _request: PhantomData, - _response: PhantomData, -} - -impl StorageWorker -where - Storage: Execute + EntryStore, - Request: Klog + Klog, - Response: Compose, -{ - /// Run the `StorageWorker` in a loop, handling new session events. - pub fn run(&mut self) { - let mut events = Events::with_capacity(self.nevent); - let mut messages = Vec::with_capacity(1024); - - loop { - STORAGE_EVENT_LOOP.increment(); - - self.storage.expire(); - - // get events with timeout - if self.poll.poll(&mut events, Some(self.timeout)).is_err() { - error!("Error polling"); - } - - let timestamp = Instant::now(); - - if !events.is_empty() { - self.waker.reset(); - - trace!("handling events"); - - self.data_queue.try_recv_all(&mut messages); - - STORAGE_QUEUE_DEPTH.increment(timestamp, messages.len() as _, 1); - - for message in messages.drain(..) { - let sender = message.sender(); - let (request, token) = message.into_inner(); - trace!("handling request from worker: {}", sender); - let response = self.storage.execute(&request); - PROCESS_REQ.increment(); - let mut message = (request, response, token); - for retry in 0..QUEUE_RETRIES { - if let Err(m) = self.data_queue.try_send_to(sender, message) { - if (retry + 1) == QUEUE_RETRIES { - error!("error sending message to worker"); - } - // wake workers immediately - let _ = self.data_queue.wake(); - message = m; - } else { - break; - } - } - } - - let _ = self.data_queue.wake(); - - // check if we received any signals from the admin thread - while let Some(s) = self.signal_queue.try_recv().map(|v| v.into_inner()) { - match s { - Signal::FlushAll => { - warn!("received flush_all"); - self.storage.clear(); - } - Signal::Shutdown => { - // if we received a shutdown, we can return and stop - // processing events - - // TODO(bmartin): graceful shutdown would occur here - // when we add persistence - - return; - } - } - } - } - } - } -} From 35fcc0c329566fe3b70cca64d2ab89015c1dc9e3 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 8 Sep 2022 14:35:45 -0700 Subject: [PATCH 3/9] handle pending, expire, and flush_all --- src/core/server/src/workers/multi.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/core/server/src/workers/multi.rs b/src/core/server/src/workers/multi.rs index f34d8d244..d07f8184f 100644 --- a/src/core/server/src/workers/multi.rs +++ b/src/core/server/src/workers/multi.rs @@ -185,6 +185,16 @@ where loop { WORKER_EVENT_LOOP.increment(); + { + let mut storage = self.storage.lock().unwrap(); + (*storage).expire(); + } + + // we need another wakeup if there are still pending reads + if !self.pending.is_empty() { + let _ = self.waker.wake(); + } + // get events with timeout if self.poll.poll(&mut events, Some(self.timeout)).is_err() { error!("Error polling"); @@ -240,7 +250,10 @@ where self.signal_queue.try_recv().map(|v| v.into_inner()) { match signal { - Signal::FlushAll => {} + Signal::FlushAll => { + let mut storage = self.storage.lock().unwrap(); + (*storage).clear(); + } Signal::Shutdown => { // if we received a shutdown, we can return // and stop processing events From f18ba91d1c7cc01d6f73af4afa4fc48858c1f969 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Mon, 19 Sep 2022 11:03:24 -0700 Subject: [PATCH 4/9] rustfmt --- src/core/server/src/workers/mod.rs | 27 ++++++++++----------------- src/core/server/src/workers/multi.rs | 8 ++++++-- src/core/server/src/workers/single.rs | 2 +- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/core/server/src/workers/mod.rs b/src/core/server/src/workers/mod.rs index 2fcf29103..8b8b5ab9c 100644 --- a/src/core/server/src/workers/mod.rs +++ b/src/core/server/src/workers/mod.rs @@ -61,9 +61,7 @@ where .spawn(move || worker.run()) .unwrap()] } - Self::Multi { - mut workers, - } => { + Self::Multi { mut workers } => { let mut join_handles = vec![]; for (id, mut worker) in workers.drain(..).enumerate() { @@ -104,12 +102,14 @@ where let mut workers = vec![]; for _ in 0..threads { - workers.push(MultiWorkerBuilder::new(config, parser.clone(), storage.clone())?) + workers.push(MultiWorkerBuilder::new( + config, + parser.clone(), + storage.clone(), + )?) } - Ok(Self::Multi { - workers, - }) + Ok(Self::Multi { workers }) } else { Ok(Self::Single { worker: SingleWorkerBuilder::new(config, parser, storage)?, @@ -122,9 +122,7 @@ where Self::Single { worker } => { vec![worker.waker()] } - Self::Multi { - workers, - } => workers.iter().map(|w| w.waker()).collect(), + Self::Multi { workers } => workers.iter().map(|w| w.waker()).collect(), } } @@ -157,15 +155,10 @@ where } => { let mut w = Vec::new(); for worker_builder in workers.drain(..) { - w.push(worker_builder.build( - session_queues.remove(0), - signal_queues.remove(0), - )); + w.push(worker_builder.build(session_queues.remove(0), signal_queues.remove(0))); } - Workers::Multi { - workers: w, - } + Workers::Multi { workers: w } } Self::Single { worker } => Workers::Single { worker: worker.build(session_queues.remove(0), signal_queues.remove(0)), diff --git a/src/core/server/src/workers/multi.rs b/src/core/server/src/workers/multi.rs index d07f8184f..617a340ab 100644 --- a/src/core/server/src/workers/multi.rs +++ b/src/core/server/src/workers/multi.rs @@ -2,8 +2,8 @@ // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 -use std::collections::VecDeque; use super::*; +use std::collections::VecDeque; pub struct MultiWorkerBuilder { nevent: usize, @@ -17,7 +17,11 @@ pub struct MultiWorkerBuilder { } impl MultiWorkerBuilder { - pub fn new(config: &T, parser: Parser, storage: Arc>) -> Result { + pub fn new( + config: &T, + parser: Parser, + storage: Arc>, + ) -> Result { let config = config.worker(); let poll = Poll::new()?; diff --git a/src/core/server/src/workers/single.rs b/src/core/server/src/workers/single.rs index f316908b5..f8dbcf573 100644 --- a/src/core/server/src/workers/single.rs +++ b/src/core/server/src/workers/single.rs @@ -214,7 +214,7 @@ where match token { WAKER_TOKEN => { self.waker.reset(); - + // handle outstanding reads for _ in 0..self.pending.len() { if let Some(token) = self.pending.pop_front() { From 1094436c63ae486c4f5c6fdcd1e9f7aded69d028 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Mon, 3 Oct 2022 11:26:57 -0700 Subject: [PATCH 5/9] move jemalloc version to workspace level --- Cargo.toml | 1 + src/server/segcache/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8499cf791..89f1866fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ clap = "2.33.3" crossbeam-channel = "0.5.6" crossbeam-queue = "0.3.5" foreign-types-shared = "0.3.1" +jemallocator = "0.5.0" libc = "0.2.134" log = "0.4.17" memmap2 = "0.2.2" diff --git a/src/server/segcache/Cargo.toml b/src/server/segcache/Cargo.toml index acdc1592c..bc46ee9ee 100644 --- a/src/server/segcache/Cargo.toml +++ b/src/server/segcache/Cargo.toml @@ -49,7 +49,7 @@ rustcommon-metrics = { workspace = true } server = { path = "../../core/server" } [target.'cfg(not(target_env = "msvc"))'.dependencies] -jemallocator = "0.5.0" +jemallocator = { workspace = true } [dev-dependencies] criterion = "0.3" From bd09845516fd4fc2ff622884b871f876f05d1a07 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 6 Oct 2022 15:33:21 -0700 Subject: [PATCH 6/9] add response buffer recycling Changes to the protocol / entrystore layers to allow recycling the buffers which are used to hold key/value responses. --- src/core/admin/src/lib.rs | 6 +- src/core/proxy/src/frontend.rs | 4 +- src/core/server/src/lib.rs | 2 +- src/core/server/src/process.rs | 2 +- src/core/server/src/workers/mod.rs | 2 +- src/core/server/src/workers/multi.rs | 20 +++- src/core/server/src/workers/single.rs | 18 +++- src/entrystore/src/noop/ping.rs | 2 +- src/entrystore/src/seg/memcache.rs | 27 +++-- src/protocol/common/src/lib.rs | 6 +- src/protocol/memcache/src/request/get.rs | 4 +- src/protocol/memcache/src/request/gets.rs | 4 +- src/protocol/memcache/src/response/mod.rs | 17 ++- src/protocol/memcache/src/response/values.rs | 101 ++++++++++++------ src/protocol/memcache/src/storage/mod.rs | 2 +- .../ping/src/ping/wire/response/mod.rs | 8 ++ src/session/src/server.rs | 2 +- 17 files changed, 155 insertions(+), 72 deletions(-) diff --git a/src/core/admin/src/lib.rs b/src/core/admin/src/lib.rs index c78c2e3e2..9b767d38a 100644 --- a/src/core/admin/src/lib.rs +++ b/src/core/admin/src/lib.rs @@ -316,16 +316,16 @@ impl Admin { match request { AdminRequest::FlushAll => { let _ = self.signal_queue_tx.try_send_all(Signal::FlushAll); - session.send(AdminResponse::Ok)?; + session.send(&AdminResponse::Ok)?; } AdminRequest::Quit => { return Err(Error::new(ErrorKind::Other, "should hangup")); } AdminRequest::Stats => { - session.send(AdminResponse::Stats)?; + session.send(&AdminResponse::Stats)?; } AdminRequest::Version => { - session.send(AdminResponse::version(self.version.clone()))?; + session.send(&AdminResponse::version(self.version.clone()))?; } } diff --git a/src/core/proxy/src/frontend.rs b/src/core/proxy/src/frontend.rs index fa9be36e5..4e0ab3aaf 100644 --- a/src/core/proxy/src/frontend.rs +++ b/src/core/proxy/src/frontend.rs @@ -237,10 +237,10 @@ where { if let Some(session) = self.sessions.get_mut(token.0) { if response.should_hangup() { - let _ = session.send(FrontendResponse::from(response)); + let _ = session.send(&FrontendResponse::from(response)); self.close(token); continue; - } else if session.send(FrontendResponse::from(response)).is_err() { + } else if session.send(&FrontendResponse::from(response)).is_err() { self.close(token); continue; } else if session.write_pending() > 0 { diff --git a/src/core/server/src/lib.rs b/src/core/server/src/lib.rs index 4aef1c6d9..4b8f89946 100644 --- a/src/core/server/src/lib.rs +++ b/src/core/server/src/lib.rs @@ -101,7 +101,7 @@ use core::time::Duration; use crossbeam_channel::{bounded, Sender}; use entrystore::EntryStore; use logger::{Drain, Klog}; -use protocol_common::{Compose, Execute, Parse}; +use protocol_common::{Compose, Execute, Parse, IntoBuffers}; use queues::Queues; use rustcommon_metrics::*; use session::{Buf, ServerSession, Session}; diff --git a/src/core/server/src/process.rs b/src/core/server/src/process.rs index 4d98b7a2e..803967b0c 100644 --- a/src/core/server/src/process.rs +++ b/src/core/server/src/process.rs @@ -16,7 +16,7 @@ impl ProcessBuilder + Clone + Send, Request: 'static + Klog + Klog + Send, - Response: 'static + Compose + Send, + Response: 'static + Compose + IntoBuffers + Send, Storage: 'static + Execute + EntryStore + Send, { pub fn new( diff --git a/src/core/server/src/workers/mod.rs b/src/core/server/src/workers/mod.rs index 8b8b5ab9c..48cced09c 100644 --- a/src/core/server/src/workers/mod.rs +++ b/src/core/server/src/workers/mod.rs @@ -50,7 +50,7 @@ impl Workers + Clone + Send, Request: 'static + Klog + Klog + Send, - Response: 'static + Compose + Send, + Response: 'static + Compose + IntoBuffers + Send, Storage: 'static + EntryStore + Execute + Send, { pub fn spawn(self) -> Vec> { diff --git a/src/core/server/src/workers/multi.rs b/src/core/server/src/workers/multi.rs index 617a340ab..9fb097a84 100644 --- a/src/core/server/src/workers/multi.rs +++ b/src/core/server/src/workers/multi.rs @@ -14,6 +14,7 @@ pub struct MultiWorkerBuilder { storage: Arc>, timeout: Duration, waker: Arc, + buffers: Vec>, } impl MultiWorkerBuilder { @@ -33,6 +34,8 @@ impl MultiWorkerBuilder MultiWorkerBuilder MultiWorkerBuilder { storage: Arc>, timeout: Duration, waker: Arc, + buffers: Vec>, } impl MultiWorker where Parser: Parse + Clone, Request: Klog + Klog, - Response: Compose, + Response: Compose + IntoBuffers, Storage: EntryStore + Execute, { /// Return the `Session` to the `Listener` to handle flush/close @@ -114,15 +120,15 @@ where Ok(request) => { let response = { let mut storage = self.storage.lock().unwrap(); - (*storage).execute(&request) + (*storage).execute(&request, &mut self.buffers) }; PROCESS_REQ.increment(); if response.should_hangup() { - let _ = session.send(response); + let _ = session.send(&response); return Err(Error::new(ErrorKind::Other, "should hangup")); } request.klog(&response); - match session.send(response) { + let result = match session.send(&response) { Ok(_) => { // attempt to flush immediately if there's now data in // the write buffer @@ -161,7 +167,13 @@ where Err(e) } } + }; + if let Some(mut buffers) = response.into_buffers() { + for buffer in buffers.drain(..) { + self.buffers.push(buffer); + } } + result } Err(e) => map_err(e), } diff --git a/src/core/server/src/workers/single.rs b/src/core/server/src/workers/single.rs index f8dbcf573..60d93957d 100644 --- a/src/core/server/src/workers/single.rs +++ b/src/core/server/src/workers/single.rs @@ -14,6 +14,7 @@ pub struct SingleWorkerBuilder { storage: Storage, timeout: Duration, waker: Arc, + buffers: Vec>, } impl SingleWorkerBuilder { @@ -38,6 +39,7 @@ impl SingleWorkerBuilder SingleWorkerBuilder { storage: Storage, timeout: Duration, waker: Arc, + buffers: Vec>, } impl SingleWorker where Parser: Parse + Clone, Request: Klog + Klog, - Response: Compose, + Response: Compose + IntoBuffers, Storage: EntryStore + Execute, { /// Return the `Session` to the `Listener` to handle flush/close @@ -108,14 +112,14 @@ where // process up to one pending request match session.receive() { Ok(request) => { - let response = self.storage.execute(&request); + let response = self.storage.execute(&request, &mut self.buffers); PROCESS_REQ.increment(); if response.should_hangup() { - let _ = session.send(response); + let _ = session.send(&response); return Err(Error::new(ErrorKind::Other, "should hangup")); } request.klog(&response); - match session.send(response) { + let result = match session.send(&response) { Ok(_) => { // attempt to flush immediately if there's now data in // the write buffer @@ -154,7 +158,13 @@ where Err(e) } } + }; + if let Some(mut buffers) = response.into_buffers() { + for buffer in buffers.drain(..) { + self.buffers.push(buffer); + } } + result } Err(e) => { if e.kind() == ErrorKind::WouldBlock { diff --git a/src/entrystore/src/noop/ping.rs b/src/entrystore/src/noop/ping.rs index 3870d4547..1c840f0d2 100644 --- a/src/entrystore/src/noop/ping.rs +++ b/src/entrystore/src/noop/ping.rs @@ -11,7 +11,7 @@ use protocol_ping::*; impl PingStorage for Noop {} impl Execute for Noop { - fn execute(&mut self, request: &Request) -> Response { + fn execute(&mut self, request: &Request, _buffers: &mut Vec>) -> Response { match request { Request::Ping => Response::Pong, } diff --git a/src/entrystore/src/seg/memcache.rs b/src/entrystore/src/seg/memcache.rs index 9518580cf..515d3aa07 100644 --- a/src/entrystore/src/seg/memcache.rs +++ b/src/entrystore/src/seg/memcache.rs @@ -13,9 +13,9 @@ use protocol_memcache::*; use std::time::Duration; impl Execute for Seg { - fn execute(&mut self, request: &Request) -> Response { + fn execute(&mut self, request: &Request, buffers: &mut Vec>) -> Response { match request { - Request::Get(get) => self.get(get), + Request::Get(get) => self.get(get, buffers), Request::Gets(gets) => self.gets(gets), Request::Set(set) => self.set(set), Request::Add(add) => self.add(add), @@ -33,30 +33,35 @@ impl Execute for Seg { } impl Storage for Seg { - fn get(&mut self, get: &Get) -> Response { + fn get(&mut self, get: &Get, buffers: &mut Vec>) -> Response { let mut values = Vec::with_capacity(get.keys().len()); + for key in get.keys().iter() { + let buffer = buffers.pop().unwrap_or_default(); + if let Some(item) = self.data.get(key) { let o = item.optional().unwrap_or(&[0, 0, 0, 0]); let flags = u32::from_be_bytes([o[0], o[1], o[2], o[3]]); match item.value() { - seg::Value::Bytes(b) => { - values.push(Value::new(item.key(), flags, None, b)); + seg::Value::Bytes(value) => { + values.push(Value::new_with_buffer(item.key(), flags, None, value, buffer)); } - seg::Value::U64(v) => { - values.push(Value::new( + seg::Value::U64(value) => { + values.push(Value::new_with_buffer( item.key(), flags, None, - format!("{}", v).as_bytes(), + format!("{}", value).as_bytes(), + buffer )); } } } else { - values.push(Value::none(key)); + values.push(Value::none_with_buffer(key, buffer)); } } - Values::new(values.into_boxed_slice()).into() + + Values::new(values).into() } fn gets(&mut self, get: &Gets) -> Response { @@ -82,7 +87,7 @@ impl Storage for Seg { values.push(Value::none(key)); } } - Values::new(values.into_boxed_slice()).into() + Values::new(values).into() } fn set(&mut self, set: &Set) -> Response { diff --git a/src/protocol/common/src/lib.rs b/src/protocol/common/src/lib.rs index bb0ca1856..2c14c1139 100644 --- a/src/protocol/common/src/lib.rs +++ b/src/protocol/common/src/lib.rs @@ -21,8 +21,12 @@ pub trait Compose { } } +pub trait IntoBuffers { + fn into_buffers(self) -> Option>>; +} + pub trait Execute { - fn execute(&mut self, request: &Request) -> Response; + fn execute(&mut self, request: &Request, buffers: &mut Vec>) -> Response; } #[derive(Debug, PartialEq, Eq)] diff --git a/src/protocol/memcache/src/request/get.rs b/src/protocol/memcache/src/request/get.rs index 05314b007..4962d57c2 100644 --- a/src/protocol/memcache/src/request/get.rs +++ b/src/protocol/memcache/src/request/get.rs @@ -108,7 +108,7 @@ impl Klog for Get { let mut miss_keys = 0; for value in res.values() { - if value.len().is_none() { + if value.vlen().is_none() { miss_keys += 1; klog!( @@ -123,7 +123,7 @@ impl Klog for Get { "\"get {}\" {} {}", String::from_utf8_lossy(value.key()), HIT, - value.len().unwrap(), + value.vlen().unwrap(), ); } } diff --git a/src/protocol/memcache/src/request/gets.rs b/src/protocol/memcache/src/request/gets.rs index ae3893c2c..3747012e8 100644 --- a/src/protocol/memcache/src/request/gets.rs +++ b/src/protocol/memcache/src/request/gets.rs @@ -64,7 +64,7 @@ impl Klog for Gets { let mut miss_keys = 0; for value in res.values() { - if value.len().is_none() { + if value.vlen().is_none() { miss_keys += 1; klog!( @@ -79,7 +79,7 @@ impl Klog for Gets { "\"gets {}\" {} {}", String::from_utf8_lossy(value.key()), HIT, - value.len().unwrap(), + value.vlen().unwrap(), ); } } diff --git a/src/protocol/memcache/src/response/mod.rs b/src/protocol/memcache/src/response/mod.rs index 88af9d2b1..722c5b67c 100644 --- a/src/protocol/memcache/src/response/mod.rs +++ b/src/protocol/memcache/src/response/mod.rs @@ -75,7 +75,7 @@ impl Response { Self::NotFound(NotFound::new(noreply)) } - pub fn values(values: Box<[Value]>) -> Self { + pub fn values(values: Vec) -> Self { Self::Values(Values { values }) } @@ -120,6 +120,19 @@ impl Compose for Response { } } +impl IntoBuffers for Response { + fn into_buffers(self) -> Option>> { + match self { + Self::Values(mut e) => { + Some(e.values.drain(..).map(|v| v.into_buf()).collect()) + } + _ => { + None + } + } + } +} + #[derive(Debug, PartialEq, Eq)] pub enum ResponseType { Error, @@ -205,7 +218,7 @@ pub(crate) fn response(input: &[u8]) -> IResult<&[u8], Response> { Ok(( input, Response::Values(Values { - values: Vec::new().into_boxed_slice(), + values: Vec::new(), }), )) } diff --git a/src/protocol/memcache/src/response/values.rs b/src/protocol/memcache/src/response/values.rs index 08694c3eb..216280e26 100644 --- a/src/protocol/memcache/src/response/values.rs +++ b/src/protocol/memcache/src/response/values.rs @@ -6,11 +6,11 @@ use super::*; #[derive(Debug, PartialEq, Eq)] pub struct Values { - pub(crate) values: Box<[Value]>, + pub(crate) values: Vec, } impl Values { - pub fn new(values: Box<[Value]>) -> Self { + pub fn new(values: Vec) -> Self { Self { values } } @@ -21,38 +21,73 @@ impl Values { #[derive(Debug, PartialEq, Eq, Clone)] pub struct Value { - key: Box<[u8]>, + // data holds both key and value (if present) + data: Vec, + klen: usize, + vlen: Option, + flags: u32, cas: Option, - data: Option>, } impl Value { - pub fn new(key: &[u8], flags: u32, cas: Option, data: &[u8]) -> Self { + pub fn new(key: &[u8], flags: u32, cas: Option, value: &[u8]) -> Self { + Self::new_with_buffer( + key, + flags, + cas, + value, + Vec::with_capacity(key.len() + value.len()) + ) + } + + pub fn none(key: &[u8]) -> Self { + Self::none_with_buffer(key, Vec::with_capacity(key.len())) + } + + pub fn new_with_buffer(key: &[u8], flags: u32, cas: Option, value: &[u8], mut buf: Vec) -> Self { + buf.clear(); + buf.reserve(key.len() + value.len()); + buf.extend_from_slice(key); + buf.extend_from_slice(value); + + let klen = key.len(); + let vlen = Some(value.len()); + Self { - key: key.to_owned().into_boxed_slice(), + data: buf, + klen, + vlen, + flags, cas, - data: Some(data.to_owned().into_boxed_slice()), } } - pub fn none(key: &[u8]) -> Self { + pub fn none_with_buffer(key: &[u8], mut buf: Vec) -> Self { + buf.clear(); + buf.extend_from_slice(key); + Self { - key: key.to_owned().into_boxed_slice(), + data: buf, + klen: key.len(), + vlen: None, flags: 0, cas: None, - data: None, } } pub fn key(&self) -> &[u8] { - &self.key + &self.data[0..self.klen] } - #[allow(clippy::len_without_is_empty)] - pub fn len(&self) -> Option { - self.data.as_ref().map(|v| v.len()) + pub fn vlen(&self) -> Option { + self.vlen + } + + pub fn into_buf(mut self) -> Vec { + self.data.clear(); + self.data } } @@ -73,25 +108,26 @@ impl Compose for Values { impl Compose for Value { fn compose(&self, session: &mut dyn BufMut) -> usize { - if self.data.is_none() { + if self.vlen.is_none() { return 0; } - let data = self.data.as_ref().unwrap(); + let key = &self.data[0..self.klen]; + let value = &self.data[self.klen..]; let prefix = b"VALUE "; let header_fields = if let Some(cas) = self.cas { - format!(" {} {} {}\r\n", self.flags, data.len(), cas).into_bytes() + format!(" {} {} {}\r\n", self.flags, value.len(), cas).into_bytes() } else { - format!(" {} {}\r\n", self.flags, data.len()).into_bytes() + format!(" {} {}\r\n", self.flags, value.len()).into_bytes() }; - let size = prefix.len() + self.key.len() + header_fields.len() + data.len() + CRLF.len(); + let size = prefix.len() + key.len() + header_fields.len() + value.len() + CRLF.len(); session.put_slice(prefix); - session.put_slice(&self.key); + session.put_slice(key); session.put_slice(&header_fields); - session.put_slice(data); + session.put_slice(value); session.put_slice(CRLF); size @@ -137,17 +173,12 @@ pub fn parse(input: &[u8]) -> IResult<&[u8], Values> { let (i, _) = space0(input)?; let (i, _) = crlf(i)?; - // we know how many bytes of data, and that its followed by a CRLF - let (i, data) = take(bytes)(i)?; + // we know how many bytes of value, and that its followed by a CRLF + let (i, value) = take(bytes)(i)?; let (i, _) = crlf(i)?; // add to the collection of values - values.push(Value { - key: key.to_owned().into_boxed_slice(), - flags, - cas, - data: Some(data.to_owned().into_boxed_slice()), - }); + values.push(Value::new(key, flags, cas, value)); // look for a space or the start of a CRLF let (i, s) = take_till(|b| (b == b' ' || b == b'\r'))(i)?; @@ -175,7 +206,7 @@ pub fn parse(input: &[u8]) -> IResult<&[u8], Values> { Ok(( input, Values { - values: values.into_boxed_slice(), + values, }, )) } @@ -192,7 +223,7 @@ mod tests { response(b"VALUE 0 0 1\r\n1\r\nEND\r\n"), Ok(( &b""[..], - Response::values(vec![value_0.clone()].into_boxed_slice()), + Response::values(vec![value_0.clone()]), )) ); @@ -202,7 +233,7 @@ mod tests { response(b"VALUE 1 1 1\r\n\0\r\nEND\r\n"), Ok(( &b""[..], - Response::values(vec![value_1.clone()].into_boxed_slice()), + Response::values(vec![value_1.clone()]), )) ); @@ -211,7 +242,7 @@ mod tests { response(b"VALUE 0 0 1\r\n1\r\nVALUE 1 1 1\r\n\0\r\nEND\r\n"), Ok(( &b""[..], - Response::values(vec![value_0, value_1].into_boxed_slice()), + Response::values(vec![value_0, value_1]), )) ); @@ -219,13 +250,13 @@ mod tests { let value_2 = Value::new(b"2", 100, Some(42), b""); assert_eq!( response(b"VALUE 2 100 0 42\r\n\r\nEND\r\n"), - Ok((&b""[..], Response::values(vec![value_2].into_boxed_slice()),)) + Ok((&b""[..], Response::values(vec![value_2]),)) ); // empty values response assert_eq!( response(b"END\r\n"), - Ok((&b""[..], Response::values(vec![].into_boxed_slice()),)) + Ok((&b""[..], Response::values(vec![]),)) ); } } diff --git a/src/protocol/memcache/src/storage/mod.rs b/src/protocol/memcache/src/storage/mod.rs index 2bf54e745..690d35181 100644 --- a/src/protocol/memcache/src/storage/mod.rs +++ b/src/protocol/memcache/src/storage/mod.rs @@ -11,7 +11,7 @@ pub trait Storage { fn decr(&mut self, request: &Decr) -> Response; fn delete(&mut self, request: &Delete) -> Response; fn flush_all(&mut self, request: &FlushAll) -> Response; - fn get(&mut self, request: &Get) -> Response; + fn get(&mut self, request: &Get, buffers: &mut Vec>) -> Response; fn gets(&mut self, request: &Gets) -> Response; fn incr(&mut self, request: &Incr) -> Response; fn prepend(&mut self, request: &Prepend) -> Response; diff --git a/src/protocol/ping/src/ping/wire/response/mod.rs b/src/protocol/ping/src/ping/wire/response/mod.rs index c509fad87..97ee9a123 100644 --- a/src/protocol/ping/src/ping/wire/response/mod.rs +++ b/src/protocol/ping/src/ping/wire/response/mod.rs @@ -12,9 +12,17 @@ mod parse; #[cfg(test)] mod test; +use protocol_common::IntoBuffers; + pub use parse::Parser as ResponseParser; /// A collection of all possible `Ping` responses pub enum Response { Pong, } + +impl IntoBuffers for Response { + fn into_buffers(self) -> Option>> { + None + } +} \ No newline at end of file diff --git a/src/session/src/server.rs b/src/session/src/server.rs index 7577e1542..1d4c3ec59 100644 --- a/src/session/src/server.rs +++ b/src/session/src/server.rs @@ -85,7 +85,7 @@ where } /// Send a message to the session buffer. - pub fn send(&mut self, tx: Tx) -> Result { + pub fn send(&mut self, tx: &Tx) -> Result { SESSION_SEND.increment(); let timestamp = self.pending.pop_front(); From 317a6c6119273162fc25d94b4f4917000d1ddff0 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Tue, 11 Oct 2022 09:41:25 -0700 Subject: [PATCH 7/9] rustfmt --- src/core/server/src/lib.rs | 2 +- src/entrystore/src/seg/memcache.rs | 10 ++++-- src/protocol/memcache/src/response/mod.rs | 15 ++------- src/protocol/memcache/src/response/values.rs | 32 +++++++------------ .../ping/src/ping/wire/response/mod.rs | 2 +- 5 files changed, 25 insertions(+), 36 deletions(-) diff --git a/src/core/server/src/lib.rs b/src/core/server/src/lib.rs index 4b8f89946..b66baaf26 100644 --- a/src/core/server/src/lib.rs +++ b/src/core/server/src/lib.rs @@ -101,7 +101,7 @@ use core::time::Duration; use crossbeam_channel::{bounded, Sender}; use entrystore::EntryStore; use logger::{Drain, Klog}; -use protocol_common::{Compose, Execute, Parse, IntoBuffers}; +use protocol_common::{Compose, Execute, IntoBuffers, Parse}; use queues::Queues; use rustcommon_metrics::*; use session::{Buf, ServerSession, Session}; diff --git a/src/entrystore/src/seg/memcache.rs b/src/entrystore/src/seg/memcache.rs index 515d3aa07..22fb059ae 100644 --- a/src/entrystore/src/seg/memcache.rs +++ b/src/entrystore/src/seg/memcache.rs @@ -44,7 +44,13 @@ impl Storage for Seg { let flags = u32::from_be_bytes([o[0], o[1], o[2], o[3]]); match item.value() { seg::Value::Bytes(value) => { - values.push(Value::new_with_buffer(item.key(), flags, None, value, buffer)); + values.push(Value::new_with_buffer( + item.key(), + flags, + None, + value, + buffer, + )); } seg::Value::U64(value) => { values.push(Value::new_with_buffer( @@ -52,7 +58,7 @@ impl Storage for Seg { flags, None, format!("{}", value).as_bytes(), - buffer + buffer, )); } } diff --git a/src/protocol/memcache/src/response/mod.rs b/src/protocol/memcache/src/response/mod.rs index 722c5b67c..84adb93fb 100644 --- a/src/protocol/memcache/src/response/mod.rs +++ b/src/protocol/memcache/src/response/mod.rs @@ -123,12 +123,8 @@ impl Compose for Response { impl IntoBuffers for Response { fn into_buffers(self) -> Option>> { match self { - Self::Values(mut e) => { - Some(e.values.drain(..).map(|v| v.into_buf()).collect()) - } - _ => { - None - } + Self::Values(mut e) => Some(e.values.drain(..).map(|v| v.into_buf()).collect()), + _ => None, } } } @@ -215,12 +211,7 @@ pub(crate) fn response(input: &[u8]) -> IResult<&[u8], Response> { // this is for empty set of values, incidated by "END" (input, ResponseType::Empty) => { let (input, _) = crlf(input)?; - Ok(( - input, - Response::Values(Values { - values: Vec::new(), - }), - )) + Ok((input, Response::Values(Values { values: Vec::new() }))) } // this is for numeric responses from incr/decr (input, ResponseType::Numeric(value)) => { diff --git a/src/protocol/memcache/src/response/values.rs b/src/protocol/memcache/src/response/values.rs index 216280e26..9311b6a52 100644 --- a/src/protocol/memcache/src/response/values.rs +++ b/src/protocol/memcache/src/response/values.rs @@ -37,7 +37,7 @@ impl Value { flags, cas, value, - Vec::with_capacity(key.len() + value.len()) + Vec::with_capacity(key.len() + value.len()), ) } @@ -45,7 +45,13 @@ impl Value { Self::none_with_buffer(key, Vec::with_capacity(key.len())) } - pub fn new_with_buffer(key: &[u8], flags: u32, cas: Option, value: &[u8], mut buf: Vec) -> Self { + pub fn new_with_buffer( + key: &[u8], + flags: u32, + cas: Option, + value: &[u8], + mut buf: Vec, + ) -> Self { buf.clear(); buf.reserve(key.len() + value.len()); buf.extend_from_slice(key); @@ -203,12 +209,7 @@ pub fn parse(input: &[u8]) -> IResult<&[u8], Values> { } } - Ok(( - input, - Values { - values, - }, - )) + Ok((input, Values { values })) } #[cfg(test)] @@ -221,29 +222,20 @@ mod tests { let value_0 = Value::new(b"0", 0, None, b"1"); assert_eq!( response(b"VALUE 0 0 1\r\n1\r\nEND\r\n"), - Ok(( - &b""[..], - Response::values(vec![value_0.clone()]), - )) + Ok((&b""[..], Response::values(vec![value_0.clone()]),)) ); // binary data for the value let value_1 = Value::new(b"1", 1, None, b"\0"); assert_eq!( response(b"VALUE 1 1 1\r\n\0\r\nEND\r\n"), - Ok(( - &b""[..], - Response::values(vec![value_1.clone()]), - )) + Ok((&b""[..], Response::values(vec![value_1.clone()]),)) ); // two values in the same response assert_eq!( response(b"VALUE 0 0 1\r\n1\r\nVALUE 1 1 1\r\n\0\r\nEND\r\n"), - Ok(( - &b""[..], - Response::values(vec![value_0, value_1]), - )) + Ok((&b""[..], Response::values(vec![value_0, value_1]),)) ); // a value with zero-length data and a cas value diff --git a/src/protocol/ping/src/ping/wire/response/mod.rs b/src/protocol/ping/src/ping/wire/response/mod.rs index 97ee9a123..2b137f313 100644 --- a/src/protocol/ping/src/ping/wire/response/mod.rs +++ b/src/protocol/ping/src/ping/wire/response/mod.rs @@ -25,4 +25,4 @@ impl IntoBuffers for Response { fn into_buffers(self) -> Option>> { None } -} \ No newline at end of file +} From 952eef41b2c5e4f8d02f83e74d1ad2e6afe10045 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Fri, 14 Oct 2022 14:53:38 -0700 Subject: [PATCH 8/9] move this allocation onto the stack with arrayvec --- Cargo.lock | 2 ++ Cargo.toml | 1 + src/entrystore/Cargo.toml | 1 + src/entrystore/src/seg/memcache.rs | 8 ++++---- src/protocol/memcache/Cargo.toml | 1 + src/protocol/memcache/src/lib.rs | 2 ++ src/protocol/memcache/src/response/mod.rs | 10 ++++++++-- src/protocol/memcache/src/response/values.rs | 20 ++++++++++++++++---- 8 files changed, 35 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index afcf47939..a7587be49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -696,6 +696,7 @@ checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" name = "entrystore" version = "0.3.0" dependencies = [ + "arrayvec 0.7.2", "common", "config", "protocol-common", @@ -1762,6 +1763,7 @@ dependencies = [ name = "protocol-memcache" version = "0.3.0" dependencies = [ + "arrayvec 0.7.2", "common", "criterion 0.3.6", "logger", diff --git a/Cargo.toml b/Cargo.toml index 89f1866fb..c7e7885e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ [workspace.dependencies] ahash = "0.8.0" +arrayvec = "0.7.2" backtrace = "0.3.66" bitvec = "1.0.1" blake3 = "1.3.1" diff --git a/src/entrystore/Cargo.toml b/src/entrystore/Cargo.toml index 2ba8c4c1e..7f2d6cea8 100644 --- a/src/entrystore/Cargo.toml +++ b/src/entrystore/Cargo.toml @@ -13,6 +13,7 @@ license = { workspace = true } debug = ["seg/debug"] [dependencies] +arrayvec = { workspace = true } common = { path = "../common" } config = { path = "../config" } protocol-common = { path = "../protocol/common" } diff --git a/src/entrystore/src/seg/memcache.rs b/src/entrystore/src/seg/memcache.rs index 22fb059ae..ac420f930 100644 --- a/src/entrystore/src/seg/memcache.rs +++ b/src/entrystore/src/seg/memcache.rs @@ -34,7 +34,7 @@ impl Execute for Seg { impl Storage for Seg { fn get(&mut self, get: &Get, buffers: &mut Vec>) -> Response { - let mut values = Vec::with_capacity(get.keys().len()); + let mut values = Values::new(); for key in get.keys().iter() { let buffer = buffers.pop().unwrap_or_default(); @@ -67,11 +67,11 @@ impl Storage for Seg { } } - Values::new(values).into() + values.into() } fn gets(&mut self, get: &Gets) -> Response { - let mut values = Vec::with_capacity(get.keys().len()); + let mut values = Values::new(); for key in get.keys().iter() { if let Some(item) = self.data.get(key) { let o = item.optional().unwrap_or(&[0, 0, 0, 0]); @@ -93,7 +93,7 @@ impl Storage for Seg { values.push(Value::none(key)); } } - Values::new(values).into() + values.into() } fn set(&mut self, set: &Set) -> Response { diff --git a/src/protocol/memcache/Cargo.toml b/src/protocol/memcache/Cargo.toml index f13196462..9598307e7 100644 --- a/src/protocol/memcache/Cargo.toml +++ b/src/protocol/memcache/Cargo.toml @@ -13,6 +13,7 @@ path = "benches/request_parsing.rs" harness = false [dependencies] +arrayvec = { workspace = true } common = { path = "../../common" } logger = { path = "../../logger" } nom = { workspace = true } diff --git a/src/protocol/memcache/src/lib.rs b/src/protocol/memcache/src/lib.rs index 0f42eec93..94cc263c1 100644 --- a/src/protocol/memcache/src/lib.rs +++ b/src/protocol/memcache/src/lib.rs @@ -30,6 +30,8 @@ pub enum MemcacheError { ServerError(ServerError), } +const MAX_BATCH_SIZE: usize = 128; + type Instant = common::time::Instant>; counter!(GET); diff --git a/src/protocol/memcache/src/response/mod.rs b/src/protocol/memcache/src/response/mod.rs index 84adb93fb..783ea6d21 100644 --- a/src/protocol/memcache/src/response/mod.rs +++ b/src/protocol/memcache/src/response/mod.rs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 +use arrayvec::ArrayVec; use crate::*; use protocol_common::{BufMut, Parse, ParseOk}; @@ -76,7 +77,12 @@ impl Response { } pub fn values(values: Vec) -> Self { - Self::Values(Values { values }) + let mut values = values; + let mut v = ArrayVec::new(); + for value in values.drain(..) { + v.push(value) + } + Self::Values(Values { values: v }) } pub fn hangup() -> Self { @@ -211,7 +217,7 @@ pub(crate) fn response(input: &[u8]) -> IResult<&[u8], Response> { // this is for empty set of values, incidated by "END" (input, ResponseType::Empty) => { let (input, _) = crlf(input)?; - Ok((input, Response::Values(Values { values: Vec::new() }))) + Ok((input, Response::Values(Values::new()))) } // this is for numeric responses from incr/decr (input, ResponseType::Numeric(value)) => { diff --git a/src/protocol/memcache/src/response/values.rs b/src/protocol/memcache/src/response/values.rs index 9311b6a52..804101740 100644 --- a/src/protocol/memcache/src/response/values.rs +++ b/src/protocol/memcache/src/response/values.rs @@ -4,19 +4,31 @@ use super::*; +use arrayvec::ArrayVec; + #[derive(Debug, PartialEq, Eq)] pub struct Values { - pub(crate) values: Vec, + pub(crate) values: ArrayVec, +} + +impl Default for Values { + fn default() -> Self { + Self { values: ArrayVec::new() } + } } impl Values { - pub fn new(values: Vec) -> Self { - Self { values } + pub fn new() -> Self { + Self::default() } pub fn values(&self) -> &[Value] { &self.values } + + pub fn push(&mut self, value: Value) { + self.values.push(value) + } } #[derive(Debug, PartialEq, Eq, Clone)] @@ -141,7 +153,7 @@ impl Compose for Value { } pub fn parse(input: &[u8]) -> IResult<&[u8], Values> { - let mut values = Vec::new(); + let mut values = ArrayVec::new(); let mut input = input; loop { let (i, _) = space1(input)?; From da95e2f3d9112e0028974a52a825e1381042bd18 Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Tue, 18 Oct 2022 08:50:05 -0700 Subject: [PATCH 9/9] rustfmt --- src/protocol/memcache/src/response/mod.rs | 2 +- src/protocol/memcache/src/response/values.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/protocol/memcache/src/response/mod.rs b/src/protocol/memcache/src/response/mod.rs index 783ea6d21..bdf9eaa3d 100644 --- a/src/protocol/memcache/src/response/mod.rs +++ b/src/protocol/memcache/src/response/mod.rs @@ -2,8 +2,8 @@ // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 -use arrayvec::ArrayVec; use crate::*; +use arrayvec::ArrayVec; use protocol_common::{BufMut, Parse, ParseOk}; mod client_error; diff --git a/src/protocol/memcache/src/response/values.rs b/src/protocol/memcache/src/response/values.rs index 804101740..5dead10fb 100644 --- a/src/protocol/memcache/src/response/values.rs +++ b/src/protocol/memcache/src/response/values.rs @@ -13,7 +13,9 @@ pub struct Values { impl Default for Values { fn default() -> Self { - Self { values: ArrayVec::new() } + Self { + values: ArrayVec::new(), + } } }