Skip to content

Commit

Permalink
LRU statement cache for MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
Julius de Bruijn committed Jun 24, 2020
1 parent 72c4e04 commit 54d6828
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 9 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,4 @@ url = { version = "2.1.1", default-features = false }
uuid = { version = "0.8.1", default-features = false, optional = true, features = [ "std" ] }
whoami = "0.8.1"
stringprep = "0.1.2"
lru-cache = "0.1.2"
13 changes: 13 additions & 0 deletions sqlx-core/src/caching_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use futures_core::future::BoxFuture;

use crate::error::Error;

/// A connection that is capable of caching prepared statements.
pub trait CachingConnection: Send {
/// The number of statements currently cached in the connection.
fn cached_statements_count(&self) -> usize;

/// Removes all statements from the cache, closing them on the server if
/// needed.
fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>>;
}
3 changes: 3 additions & 0 deletions sqlx-core/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod statement_cache;

pub(crate) use statement_cache::StatementCache;
50 changes: 50 additions & 0 deletions sqlx-core/src/common/statement_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use lru_cache::LruCache;

/// A cache for prepared statements. When full, the least recently used
/// statement gets removed.
#[derive(Debug)]
pub struct StatementCache {
inner: LruCache<String, u32>,
}

impl StatementCache {
/// Create a new cache with the given capacity.
pub fn new(capacity: usize) -> Self {
Self {
inner: LruCache::new(capacity),
}
}

/// Returns a mutable reference to the value corresponding to the given key
/// in the cache, if any.
pub fn get_mut(&mut self, k: &str) -> Option<&mut u32> {
self.inner.get_mut(k)
}

/// Inserts a new statement to the cache, returning the least recently used
/// statement id if the cache is full, or if inserting with an existing key,
/// the replaced existing statement.
pub fn insert(&mut self, k: &str, v: u32) -> Option<u32> {
let mut lru_item = None;

if self.inner.capacity() == self.len() && !self.inner.contains_key(k) {
lru_item = self.remove_lru();
} else if self.inner.contains_key(k) {
lru_item = self.inner.remove(k);
}

self.inner.insert(k.into(), v);

lru_item
}

/// The number of statements in the cache.
pub fn len(&self) -> usize {
self.inner.len()
}

/// Removes the least recently used item from the cache.
pub fn remove_lru(&mut self) -> Option<u32> {
self.inner.remove_lru().map(|(_, v)| v)
}
}
2 changes: 2 additions & 0 deletions sqlx-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ pub mod arguments;
pub mod pool;

pub mod connection;
pub mod caching_connection;

#[macro_use]
pub mod transaction;

#[macro_use]
pub mod encode;

mod common;
pub mod database;
pub mod decode;
pub mod describe;
Expand Down
4 changes: 2 additions & 2 deletions sqlx-core/src/mysql/connection/establish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bytes::Bytes;
use hashbrown::HashMap;

use crate::common::StatementCache;
use crate::error::Error;
use crate::mysql::connection::{tls, MySqlStream, COLLATE_UTF8MB4_UNICODE_CI, MAX_PACKET_SIZE};
use crate::mysql::protocol::connect::{
Expand Down Expand Up @@ -98,7 +98,7 @@ impl MySqlConnection {

Ok(Self {
stream,
cache_statement: HashMap::new(),
cache_statement: StatementCache::new(options.statement_cache_size),
scratch_row_columns: Default::default(),
scratch_row_column_names: Default::default(),
})
Expand Down
12 changes: 7 additions & 5 deletions sqlx-core/src/mysql/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::mysql::connection::stream::Busy;
use crate::mysql::io::MySqlBufExt;
use crate::mysql::protocol::response::Status;
use crate::mysql::protocol::statement::{
BinaryRow, Execute as StatementExecute, Prepare, PrepareOk,
BinaryRow, Execute as StatementExecute, Prepare, PrepareOk, StmtClose,
};
use crate::mysql::protocol::text::{ColumnDefinition, ColumnFlags, Query, TextRow};
use crate::mysql::protocol::Packet;
Expand All @@ -26,8 +26,8 @@ use crate::mysql::{

impl MySqlConnection {
async fn prepare(&mut self, query: &str) -> Result<u32, Error> {
if let Some(&statement) = self.cache_statement.get(query) {
return Ok(statement);
if let Some(statement) = self.cache_statement.get_mut(query) {
return Ok(*statement);
}

// https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
Expand Down Expand Up @@ -60,8 +60,10 @@ impl MySqlConnection {
self.stream.maybe_recv_eof().await?;
}

self.cache_statement
.insert(query.to_owned(), ok.statement_id);
// in case of the cache being full, close the least recently used statement
if let Some(statement) = self.cache_statement.insert(query, ok.statement_id) {
self.stream.send_packet(StmtClose { statement }).await?;
}

Ok(ok.statement_id)
}
Expand Down
21 changes: 20 additions & 1 deletion sqlx-core/src/mysql/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use hashbrown::HashMap;

use crate::caching_connection::CachingConnection;
use crate::common::StatementCache;
use crate::connection::{Connect, Connection};
use crate::error::Error;
use crate::executor::Executor;
use crate::ext::ustr::UStr;
use crate::mysql::protocol::statement::StmtClose;
use crate::mysql::protocol::text::{Ping, Quit};
use crate::mysql::row::MySqlColumn;
use crate::mysql::{MySql, MySqlConnectOptions};
Expand All @@ -34,7 +37,7 @@ pub struct MySqlConnection {
pub(crate) stream: MySqlStream,

// cache by query string to the statement id
cache_statement: HashMap<String, u32>,
cache_statement: StatementCache,

// working memory for the active row's column information
// this allows us to re-use these allocations unless the user is persisting the
Expand All @@ -43,6 +46,22 @@ pub struct MySqlConnection {
scratch_row_column_names: Arc<HashMap<UStr, usize>>,
}

impl CachingConnection for MySqlConnection {
fn cached_statements_count(&self) -> usize {
self.cache_statement.len()
}

fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
while let Some(statement) = self.cache_statement.remove_lru() {
self.stream.send_packet(StmtClose { statement }).await?;
}

Ok(())
})
}
}

impl Debug for MySqlConnection {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("MySqlConnection").finish()
Expand Down
17 changes: 17 additions & 0 deletions sqlx-core/src/mysql/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub struct MySqlConnectOptions {
pub(crate) database: Option<String>,
pub(crate) ssl_mode: MySqlSslMode,
pub(crate) ssl_ca: Option<PathBuf>,
pub(crate) statement_cache_size: usize,
}

impl Default for MySqlConnectOptions {
Expand All @@ -120,6 +121,7 @@ impl MySqlConnectOptions {
database: None,
ssl_mode: MySqlSslMode::Preferred,
ssl_ca: None,
statement_cache_size: 100,
}
}

Expand Down Expand Up @@ -190,6 +192,17 @@ impl MySqlConnectOptions {
self.ssl_ca = Some(file_name.as_ref().to_owned());
self
}

/// Sets the size of the connection's statement cache in a number of stored
/// distinct statements. Caching is handled using LRU, meaning when the
/// amount of queries hits the defined limit, the oldest statement will get
/// dropped.
///
/// The default cache size is 100 statements.
pub fn statement_cache_size(mut self, size: usize) -> Self {
self.statement_cache_size = size;
self
}
}

impl FromStr for MySqlConnectOptions {
Expand Down Expand Up @@ -231,6 +244,10 @@ impl FromStr for MySqlConnectOptions {
options = options.ssl_ca(&*value);
}

"statement-cache-size" => {
options = options.statement_cache_size(value.parse()?);
}

_ => {}
}
}
Expand Down
2 changes: 2 additions & 0 deletions sqlx-core/src/mysql/protocol/statement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ mod execute;
mod prepare;
mod prepare_ok;
mod row;
mod stmt_close;

pub(crate) use execute::Execute;
pub(crate) use prepare::Prepare;
pub(crate) use prepare_ok::PrepareOk;
pub(crate) use row::BinaryRow;
pub(crate) use stmt_close::StmtClose;
16 changes: 16 additions & 0 deletions sqlx-core/src/mysql/protocol/statement/stmt_close.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::io::Encode;
use crate::mysql::protocol::Capabilities;

// https://dev.mysql.com/doc/internals/en/com-stmt-close.html

#[derive(Debug)]
pub struct StmtClose {
pub statement: u32,
}

impl Encode<'_, Capabilities> for StmtClose {
fn encode_with(&self, buf: &mut Vec<u8>, _: Capabilities) {
buf.push(0x19); // COM_STMT_CLOSE
buf.extend(&self.statement.to_le_bytes());
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

pub use sqlx_core::arguments::{Arguments, IntoArguments};
pub use sqlx_core::caching_connection::CachingConnection;
pub use sqlx_core::connection::{Connect, Connection};
pub use sqlx_core::database::{self, Database};
pub use sqlx_core::executor::{Execute, Executor};
Expand Down
24 changes: 23 additions & 1 deletion tests/mysql/mysql.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::TryStreamExt;
use sqlx::mysql::{MySql, MySqlPool, MySqlRow};
use sqlx::{Connection, Executor, Row};
use sqlx::{CachingConnection, Connection, Executor, Row};
use sqlx_test::new;

#[sqlx_macros::test]
Expand Down Expand Up @@ -177,3 +177,25 @@ SELECT id, text FROM messages;

Ok(())
}

#[sqlx_macros::test]
async fn it_caches_statements() -> anyhow::Result<()> {
let mut conn = new::<MySql>().await?;

for i in 0..2 {
let row = sqlx::query("SELECT ? AS val")
.bind(i)
.fetch_one(&mut conn)
.await?;

let val: u32 = row.get("val");

assert_eq!(i, val);
}

assert_eq!(1, conn.cached_statements_count());
conn.clear_cached_statements().await?;
assert_eq!(0, conn.cached_statements_count());

Ok(())
}

0 comments on commit 54d6828

Please sign in to comment.