diff --git a/.gitignore b/.gitignore
index fb4be9828..27ff2bd97 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 target
 data
 staging
+limitcache
 examples
 cert.pem
 key.pem
diff --git a/Cargo.lock b/Cargo.lock
index 8b4fe8cfe..15405c107 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1792,6 +1792,15 @@ version = "0.12.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
 
+[[package]]
+name = "hashbrown"
+version = "0.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
+dependencies = [
+ "ahash 0.8.3",
+]
+
 [[package]]
 name = "hashbrown"
 version = "0.14.0"
@@ -1802,6 +1811,16 @@ dependencies = [
  "allocator-api2",
 ]
 
+[[package]]
+name = "hashlru"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec243be29f0218c651d6dc31eafb562c4363b2e96cd42a92b6948964d28f4c5a"
+dependencies = [
+ "hashbrown 0.13.2",
+ "serde",
+]
+
 [[package]]
 name = "heck"
 version = "0.4.1"
@@ -1889,6 +1908,12 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
 
+[[package]]
+name = "human-size"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9994b79e8c1a39b3166c63ae7823bb2b00831e2a96a31399c50fe69df408eaeb"
+
 [[package]]
 name = "humantime"
 version = "2.1.0"
@@ -2671,10 +2696,12 @@ dependencies = [
  "fs_extra",
  "futures",
  "futures-util",
+ "hashlru",
  "hex",
  "hostname",
  "http",
  "http-auth-basic",
+ "human-size",
  "humantime",
  "humantime-serde",
  "itertools 0.10.5",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 890d09cff..a948a39d3 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -96,10 +96,12 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] }
 xz2 = { version = "*", features = ["static"] }
 nom = "7.1.3"
 humantime = "2.1.0"
+human-size = "0.4"
 openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
 url = "2.4.0"
 http-auth-basic = "0.3.3"
 serde_repr = "0.1.17"
+hashlru = { version = "0.11.0", features = ["serde"] }
 
 [build-dependencies]
 cargo_toml = "0.15"
diff --git a/server/src/banner.rs b/server/src/banner.rs
index 8cee4a756..4c2d9f256 100644
--- a/server/src/banner.rs
+++ b/server/src/banner.rs
@@ -18,6 +18,7 @@
  */
 
 use crossterm::style::Stylize;
+use human_size::SpecificSize;
 
 use crate::about;
 use crate::utils::uid::Uid;
@@ -100,5 +101,21 @@ async fn storage_info(config: &Config) {
         config.staging_dir().to_string_lossy(),
         storage.get_endpoint(),
         latency
-    )
+    );
+
+    if let Some(path) = &config.parseable.local_cache_path {
+        let size: SpecificSize<human_size::Gigibyte> =
+            SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
+                .unwrap()
+                .into();
+
+        eprintln!(
+            "\
+    {:8}Cache: \"{}\"
+        Cache Size: \"{}\"",
+            "",
+            path.display(),
+            size
+        );
+    }
 }
diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs
index dfefd2ace..06fbf1221 100644
--- a/server/src/handlers/http.rs
+++ b/server/src/handlers/http.rs
@@ -200,6 +200,21 @@ pub fn configure_routes(
                     .to(logstream::get_retention)
                     .authorize_for_stream(Action::GetRetention),
             ),
+        )
+        .service(
+            web::resource("/cache")
+                // PUT "/logstream/{logstream}/cache" ==> Set caching for given logstream
+                .route(
+                    web::put()
+                        .to(logstream::put_enable_cache)
+                        .authorize_for_stream(Action::PutCacheEnabled),
+                )
+                // GET "/logstream/{logstream}/cache" ==> Get caching status for given logstream
"/logstream/{logstream}/cache" ==> Get retention for given logstream + .route( + web::get() + .to(logstream::get_cache_enabled) + .authorize_for_stream(Action::GetCacheEnabled), + ), ); // User API diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index e1dd1a1cd..97ac52056 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -227,6 +227,33 @@ pub async fn put_retention( )) } +pub async fn get_cache_enabled(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?; + Ok((web::Json(cache_enabled), StatusCode::OK)) +} + +pub async fn put_enable_cache( + req: HttpRequest, + body: web::Json, +) -> Result { + let enable_cache = body.into_inner(); + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let storage = CONFIG.storage().get_object_store(); + + let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?; + stream_metadata.cache_enabled = enable_cache; + storage + .put_stream_manifest(&stream_name, &stream_metadata) + .await?; + + STREAM_INFO.set_stream_cache(&stream_name, enable_cache)?; + Ok(( + format!("Cache setting updated for log stream {stream_name}"), + StatusCode::OK, + )) +} + pub async fn get_stats(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); diff --git a/server/src/localcache.rs b/server/src/localcache.rs new file mode 100644 index 000000000..d15963f21 --- /dev/null +++ b/server/src/localcache.rs @@ -0,0 +1,242 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{io, path::PathBuf}; + +use fs_extra::file::CopyOptions; +use futures_util::TryFutureExt; +use hashlru::Cache; +use itertools::{Either, Itertools}; +use object_store::{local::LocalFileSystem, ObjectStore}; +use once_cell::sync::OnceCell; +use tokio::{fs, sync::Mutex}; + +use crate::option::CONFIG; + +pub const STREAM_CACHE_FILENAME: &str = ".cache.json"; +pub const CACHE_META_FILENAME: &str = ".cache_meta.json"; + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct LocalCache { + version: String, + current_size: u64, + /// Mapping between storage path and cache path. 
diff --git a/server/src/localcache.rs b/server/src/localcache.rs
new file mode 100644
index 000000000..d15963f21
--- /dev/null
+++ b/server/src/localcache.rs
@@ -0,0 +1,242 @@
+/*
+ * Parseable Server (C) 2022 - 2023 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::{io, path::PathBuf};
+
+use fs_extra::file::CopyOptions;
+use futures_util::TryFutureExt;
+use hashlru::Cache;
+use itertools::{Either, Itertools};
+use object_store::{local::LocalFileSystem, ObjectStore};
+use once_cell::sync::OnceCell;
+use tokio::{fs, sync::Mutex};
+
+use crate::option::CONFIG;
+
+pub const STREAM_CACHE_FILENAME: &str = ".cache.json";
+pub const CACHE_META_FILENAME: &str = ".cache_meta.json";
+
+#[derive(Debug, serde::Deserialize, serde::Serialize)]
+pub struct LocalCache {
+    version: String,
+    current_size: u64,
+    /// Mapping between storage path and cache path.
+    files: Cache<String, PathBuf>,
+}
+
+impl LocalCache {
+    fn new() -> Self {
+        Self {
+            version: "v1".to_string(),
+            current_size: 0,
+            files: Cache::new(100),
+        }
+    }
+}
+
+#[derive(Debug, serde::Deserialize, serde::Serialize)]
+pub struct CacheMeta {
+    version: String,
+    size_capacity: u64,
+}
+
+impl CacheMeta {
+    fn new() -> Self {
+        Self {
+            version: "v1".to_string(),
+            size_capacity: 0,
+        }
+    }
+}
+
+pub struct LocalCacheManager {
+    filesystem: LocalFileSystem,
+    cache_path: PathBuf,
+    cache_capacity: u64,
+    copy_options: CopyOptions,
+    semaphore: Mutex<()>,
+}
+
+impl LocalCacheManager {
+    pub fn global() -> Option<&'static LocalCacheManager> {
+        static INSTANCE: OnceCell<LocalCacheManager> = OnceCell::new();
+
+        let cache_path = CONFIG.parseable.local_cache_path.as_ref()?;
+
+        Some(INSTANCE.get_or_init(|| {
+            let cache_path = cache_path.clone();
+            std::fs::create_dir_all(&cache_path).unwrap();
+            LocalCacheManager {
+                filesystem: LocalFileSystem::new(),
+                cache_path,
+                cache_capacity: CONFIG.parseable.local_cache_size,
+                copy_options: CopyOptions {
+                    overwrite: true,
+                    skip_exist: false,
+                    ..CopyOptions::new()
+                },
+                semaphore: Mutex::new(()),
+            }
+        }))
+    }
+
+    pub async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> {
+        fs::create_dir_all(&self.cache_path).await?;
+        let path = cache_meta_path(&self.cache_path).unwrap();
+        let resp = self
+            .filesystem
+            .get(&path)
+            .and_then(|resp| resp.bytes())
+            .await;
+
+        let updated_cache = match resp {
+            Ok(bytes) => {
+                let mut meta: CacheMeta = serde_json::from_slice(&bytes)?;
+                if meta.size_capacity != config_capacity {
+                    meta.size_capacity = config_capacity;
+                    Some(meta)
+                } else {
+                    None
+                }
+            }
+            Err(object_store::Error::NotFound { .. }) => {
+                let mut meta = CacheMeta::new();
+                meta.size_capacity = config_capacity;
+                Some(meta)
+            }
+            Err(err) => return Err(err.into()),
+        };
+
+        if let Some(updated_cache) = updated_cache {
+            log::info!(
+                "Cache is updated to new size of {} Bytes",
+                &updated_cache.size_capacity
+            );
+            self.filesystem
+                .put(&path, serde_json::to_vec(&updated_cache)?.into())
+                .await?
+        }
+
+        Ok(())
+    }
+
+    pub async fn get_cache(&self, stream: &str) -> Result<LocalCache, CacheError> {
+        let path = cache_file_path(&self.cache_path, stream).unwrap();
+        let res = self
+            .filesystem
+            .get(&path)
+            .and_then(|resp| resp.bytes())
+            .await;
+        let cache = match res {
+            Ok(bytes) => serde_json::from_slice(&bytes)?,
+            Err(object_store::Error::NotFound { .. }) => LocalCache::new(),
+            Err(err) => return Err(err.into()),
+        };
+        Ok(cache)
+    }
+
+    pub async fn put_cache(&self, stream: &str, cache: &LocalCache) -> Result<(), CacheError> {
+        let path = cache_file_path(&self.cache_path, stream).unwrap();
+        let bytes = serde_json::to_vec(cache)?.into();
+        Ok(self.filesystem.put(&path, bytes).await?)
+    }
+
+    pub async fn move_to_cache(
+        &self,
+        stream: &str,
+        key: String,
+        staging_path: PathBuf,
+    ) -> Result<(), CacheError> {
+        let lock = self.semaphore.lock().await;
+        let mut cache_path = self.cache_path.join(stream);
+        fs::create_dir_all(&cache_path).await?;
+        cache_path.push(staging_path.file_name().unwrap());
+        fs_extra::file::move_file(staging_path, &cache_path, &self.copy_options)?;
+        let file_size = std::fs::metadata(&cache_path)?.len();
+        let mut cache = self.get_cache(stream).await?;
+
+        while cache.current_size + file_size > self.cache_capacity {
+            if let Some((_, file_for_removal)) = cache.files.pop_lru() {
+                let lru_file_size = std::fs::metadata(&file_for_removal)?.len();
+                cache.current_size = cache.current_size.saturating_sub(lru_file_size);
+                log::info!("removing cache entry");
+                tokio::spawn(fs::remove_file(file_for_removal));
+            } else {
+                log::error!("Cache size too small");
+                break;
+            }
+        }
+
+        if cache.files.is_full() {
+            cache.files.resize(cache.files.capacity() * 2);
+        }
+        cache.files.push(key, cache_path);
+        cache.current_size += file_size;
+        self.put_cache(stream, &cache).await?;
+        drop(lock);
+        Ok(())
+    }
+
+    pub async fn partition_on_cached<T>(
+        &self,
+        stream: &str,
+        collection: Vec<T>,
+        key: fn(&T) -> &String,
+    ) -> Result<(Vec<(T, PathBuf)>, Vec<T>), CacheError> {
+        let lock = self.semaphore.lock().await;
+        let mut cache = self.get_cache(stream).await?;
+        let (cached, remainder): (Vec<_>, Vec<_>) = collection.into_iter().partition_map(|item| {
+            let key = key(&item);
+            match cache.files.get(key).cloned() {
+                Some(path) => Either::Left((item, path)),
+                None => Either::Right(item),
+            }
+        });
+        self.put_cache(stream, &cache).await?;
+        drop(lock);
+        Ok((cached, remainder))
+    }
+}
+
+fn cache_file_path(
+    root: impl AsRef<std::path::Path>,
+    stream: &str,
+) -> Result<object_store::path::Path, object_store::path::Error> {
+    let mut path = root.as_ref().join(stream);
+    path.push(STREAM_CACHE_FILENAME);
+    object_store::path::Path::from_absolute_path(path)
+}
+
+fn cache_meta_path(
+    root: impl AsRef<std::path::Path>,
+) -> Result<object_store::path::Path, object_store::path::Error> {
+    let path = root.as_ref().join(CACHE_META_FILENAME);
+    object_store::path::Path::from_absolute_path(path)
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum CacheError {
+    #[error("{0}")]
+    Serde(#[from] serde_json::Error),
+    #[error("{0}")]
+    IOError(#[from] io::Error),
+    #[error("{0}")]
+    MoveError(#[from] fs_extra::error::Error),
+    #[error("{0}")]
+    ObjectStoreError(#[from] object_store::Error),
+}
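A standalone sketch of the eviction policy `move_to_cache` implements, with byte sizes standing in for the cached files; the helper name and the values are illustrative only, but the `Cache::new`/`push`/`pop_lru` calls follow the hashlru API used above:

```rust
use hashlru::Cache;

// Evict least-recently-used entries until `incoming` fits under `capacity`,
// mirroring the while-loop in LocalCacheManager::move_to_cache.
fn evict_until_fits(
    files: &mut Cache<String, u64>, // storage key -> file size in bytes
    current_size: &mut u64,
    incoming: u64,
    capacity: u64,
) -> Vec<String> {
    let mut evicted = Vec::new();
    while *current_size + incoming > capacity {
        match files.pop_lru() {
            Some((key, size)) => {
                *current_size = current_size.saturating_sub(size);
                evicted.push(key); // the real code spawns fs::remove_file here
            }
            None => break, // capacity smaller than a single file; give up
        }
    }
    *current_size += incoming;
    evicted
}

fn main() {
    let mut files = Cache::new(100);
    let mut current = 0u64;
    for (key, size) in [("a", 400u64), ("b", 400), ("c", 300)] {
        for old in evict_until_fits(&mut files, &mut current, size, 1000) {
            println!("evicted {old}");
        }
        files.push(key.to_string(), size);
    }
    // "a" (least recently used) was evicted to make room for "c".
    assert_eq!(current, 700);
}
```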
diff --git a/server/src/main.rs b/server/src/main.rs
index 0d2bee0b3..6856e587e 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -33,6 +33,7 @@ mod catalog;
 mod event;
 mod handlers;
 mod livetail;
+mod localcache;
 mod metadata;
 mod metrics;
 mod migration;
@@ -48,6 +49,8 @@ mod validator;
 
 use option::CONFIG;
 
+use crate::localcache::LocalCacheManager;
+
 #[actix_web::main]
 async fn main() -> anyhow::Result<()> {
     env_logger::init();
@@ -59,6 +62,11 @@ async fn main() -> anyhow::Result<()> {
     banner::print(&CONFIG, &metadata).await;
     rbac::map::init(&metadata);
     metadata.set_global();
+    if let Some(cache_manager) = LocalCacheManager::global() {
+        cache_manager
+            .validate(CONFIG.parseable.local_cache_size)
+            .await?;
+    };
     let prometheus = metrics::build_metrics_handler();
     CONFIG.storage().register_store_metrics(&prometheus);
 
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 81855f5c9..5bdfdb515 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -42,6 +42,7 @@ pub struct StreamInfo(RwLock<HashMap<String, LogStreamMetadata>>);
 pub struct LogStreamMetadata {
     pub schema: HashMap<String, Arc<Field>>,
     pub alerts: Alerts,
+    pub cache_enabled: bool,
 }
 
 // It is very unlikely that panic will occur when dealing with metadata.
@@ -80,6 +81,22 @@ impl StreamInfo {
         Ok(!self.schema(stream_name)?.fields.is_empty())
     }
 
+    pub fn cache_enabled(&self, stream_name: &str) -> Result<bool, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
+        map.get(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.cache_enabled)
+    }
+
+    pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
+        let stream = map
+            .get_mut(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))?;
+        stream.cache_enabled = enable;
+        Ok(())
+    }
+
     pub fn schema(&self, stream_name: &str) -> Result<Arc<Schema>, MetadataError> {
         let map = self.read().expect(LOCK_EXPECT);
         let schema = map
@@ -131,6 +148,7 @@ impl StreamInfo {
         for stream in storage.list_streams().await? {
             let alerts = storage.get_alerts(&stream.name).await?;
             let schema = storage.get_schema(&stream.name).await?;
+            let meta = storage.get_stream_metadata(&stream.name).await?;
             let schema = update_schema_from_staging(&stream.name, schema);
             let schema = HashMap::from_iter(
@@ -140,7 +158,11 @@ impl StreamInfo {
                     .map(|v| (v.name().to_owned(), v.clone())),
             );
 
-            let metadata = LogStreamMetadata { schema, alerts };
+            let metadata = LogStreamMetadata {
+                schema,
+                alerts,
+                cache_enabled: meta.cache_enabled,
+            };
 
             let mut map = self.write().expect(LOCK_EXPECT);
 
diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs
index cc51bbc97..78a05e5a3 100644
--- a/server/src/metrics/mod.rs
+++ b/server/src/metrics/mod.rs
@@ -67,6 +67,14 @@ pub static QUERY_EXECUTE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
         .expect("metric can be created")
 });
 
+pub static QUERY_CACHE_HIT: Lazy<IntCounterVec> = Lazy::new(|| {
+    IntCounterVec::new(
+        Opts::new("QUERY_CACHE_HIT", "Full Cache hit").namespace(METRICS_NAMESPACE),
+        &["stream"],
+    )
+    .expect("metric can be created")
+});
+
 pub static ALERTS_STATES: Lazy<IntCounterVec> = Lazy::new(|| {
     IntCounterVec::new(
         Opts::new("alerts_states", "Alerts States").namespace(METRICS_NAMESPACE),
@@ -91,6 +99,9 @@ fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(QUERY_EXECUTE_TIME.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(QUERY_CACHE_HIT.clone()))
+        .expect("metric can be registered");
     registry
         .register(Box::new(ALERTS_STATES.clone()))
         .expect("metric can be registered");
 
diff --git a/server/src/option.rs b/server/src/option.rs
index 01ac2b832..da33b3178 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -29,6 +29,8 @@ use crate::oidc::{self, OpenidConfig};
 use crate::storage::{FSConfig, ObjectStorageProvider, S3Config, LOCAL_SYNC_INTERVAL};
 use crate::utils::validate_path_is_writeable;
 
+pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GB
+
 pub static CONFIG: Lazy<Arc<Config>> = Lazy::new(|| Arc::new(Config::new()));
 
 #[derive(Debug)]
@@ -62,6 +64,15 @@ impl Config {
                 .exit()
         }
 
+        if server.local_cache_path.is_some() {
+            parseable_cli_command()
+                .error(
+                    ErrorKind::ValueValidation,
+                    "Cannot use cache with local-store subcommand.",
+                )
+                .exit()
+        }
+
         Config {
             parseable: server,
             storage: Arc::new(storage),
@@ -174,6 +185,12 @@ pub struct Server {
     /// for incoming events and local cache
     pub local_staging_path: PathBuf,
 
+    /// The local cache path is used for speeding up query on latest data
+    pub local_cache_path: Option<PathBuf>,
+
+    /// Size for local cache
+    pub local_cache_size: u64,
+
     /// Interval in seconds after which uncommitted data would be
     /// uploaded to the storage platform.
     pub upload_interval: u64,
 
@@ -220,6 +237,7 @@ impl FromArgMatches for Server {
     }
 
     fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
+        self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
         self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
         self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
         self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
@@ -235,6 +253,10 @@ impl FromArgMatches for Server {
             .get_one::<PathBuf>(Self::STAGING)
             .cloned()
             .expect("default value for staging");
+        self.local_cache_size = m
+            .get_one::<u64>(Self::CACHE_SIZE)
+            .cloned()
+            .expect("default value for cache size");
         self.upload_interval = m
             .get_one::<u64>(Self::UPLOAD_INTERVAL)
             .cloned()
@@ -319,6 +341,8 @@ impl Server {
     pub const ADDRESS: &'static str = "address";
     pub const DOMAIN_URI: &'static str = "origin";
    pub const STAGING: &'static str = "local-staging-path";
+    pub const CACHE: &'static str = "cache-path";
+    pub const CACHE_SIZE: &'static str = "cache-size";
     pub const UPLOAD_INTERVAL: &'static str = "upload-interval";
     pub const USERNAME: &'static str = "username";
     pub const PASSWORD: &'static str = "password";
@@ -384,6 +408,25 @@ impl Server {
                     .value_parser(validation::canonicalize_path)
                     .help("The local staging path is used as a temporary landing point for incoming events and local cache")
                     .next_line_help(true),
+            )
+            .arg(
+                Arg::new(Self::CACHE)
+                    .long(Self::CACHE)
+                    .env("P_CACHE_DIR")
+                    .value_name("DIR")
+                    .value_parser(validation::canonicalize_path)
+                    .help("Local path to be used for caching latest files")
+                    .next_line_help(true),
+            )
+            .arg(
+                Arg::new(Self::CACHE_SIZE)
+                    .long(Self::CACHE_SIZE)
+                    .env("P_CACHE_SIZE")
+                    .value_name("size")
+                    .default_value("1GiB")
+                    .value_parser(validation::human_size_to_bytes)
+                    .help("Size for cache in human readable format (e.g. 1GiB, 2GiB, 100GB)")
+                    .next_line_help(true),
             )
             .arg(
                 Arg::new(Self::UPLOAD_INTERVAL)
@@ -569,8 +612,13 @@ pub mod validation {
         fs::{canonicalize, create_dir_all},
         net::ToSocketAddrs,
         path::PathBuf,
+        str::FromStr,
     };
 
+    use human_size::SpecificSize;
+
+    use crate::option::MIN_CACHE_SIZE_BYTES;
+
     pub fn file_path(s: &str) -> Result<PathBuf, String> {
         if s.is_empty() {
             return Err("empty path".to_owned());
@@ -606,4 +654,29 @@ pub mod validation {
     pub fn url(s: &str) -> Result<url::Url, String> {
         url::Url::parse(s).map_err(|_| "Invalid URL provided".to_string())
     }
+
+    pub fn human_size_to_bytes(s: &str) -> Result<u64, String> {
+        use human_size::multiples;
+        fn parse_and_map<T: human_size::Multiple>(
+            s: &str,
+        ) -> Result<u64, human_size::ParsingError> {
+            SpecificSize::<T>::from_str(s).map(|x| x.to_bytes())
+        }
+
+        let size = parse_and_map::<multiples::Mebibyte>(s)
+            .or(parse_and_map::<multiples::Megabyte>(s))
+            .or(parse_and_map::<multiples::Gigibyte>(s))
+            .or(parse_and_map::<multiples::Gigabyte>(s))
+            .or(parse_and_map::<multiples::Tebibyte>(s))
+            .or(parse_and_map::<multiples::Terabyte>(s))
+            .map_err(|_| "Could not parse given size".to_string())?;
+
+        if size < MIN_CACHE_SIZE_BYTES {
+            return Err(
+                "Specified value of cache size is smaller than current minimum of 1GB".to_string(),
+            );
+        }
+
+        Ok(size)
+    }
 }
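To illustrate the validator above: each `parse_and_map` attempt fixes a different `human_size::Multiple`, and the result is normalized to bytes before the minimum check. A rough sketch of the expected conversions, assuming the multiple names and `to_bytes` behavior used in the patch:

```rust
use std::str::FromStr;
use human_size::{multiples, SpecificSize};

fn main() {
    // Binary suffix: 2 GiB = 2 * 1024^3 bytes.
    let gib = SpecificSize::<multiples::Gigibyte>::from_str("2GiB").unwrap();
    assert_eq!(gib.to_bytes(), 2 * 1024u64.pow(3));

    // Decimal suffix: 1 GB = 1000^3 bytes, exactly MIN_CACHE_SIZE_BYTES.
    let gb = SpecificSize::<multiples::Gigabyte>::from_str("1GB").unwrap();
    assert_eq!(gb.to_bytes(), 1000u64.pow(3));

    // Parses fine, but human_size_to_bytes would still reject it as below the minimum.
    let mb = SpecificSize::<multiples::Megabyte>::from_str("100MB").unwrap();
    assert!(mb.to_bytes() < 1000u64.pow(3));
}
```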
diff --git a/server/src/query.rs b/server/src/query.rs
index 663873dae..3e17e3c73 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -19,7 +19,6 @@
 mod filter_optimizer;
 mod listing_table_builder;
 mod stream_schema_provider;
-mod table_provider;
 
 use chrono::{DateTime, Utc};
 use chrono::{NaiveDateTime, TimeZone};
diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs
index 14924cf2c..48128f8e0 100644
--- a/server/src/query/stream_schema_provider.rs
+++ b/server/src/query/stream_schema_provider.rs
@@ -18,6 +18,7 @@
 use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
 
+use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
 use chrono::{NaiveDateTime, Timelike, Utc};
@@ -31,18 +32,18 @@ use datafusion::{
         file_format::{parquet::ParquetFormat, FileFormat},
         listing::PartitionedFile,
         physical_plan::FileScanConfig,
-        TableProvider,
+        MemTable, TableProvider,
     },
     error::DataFusionError,
     execution::{context::SessionState, object_store::ObjectStoreUrl},
     logical_expr::{BinaryExpr, Operator, TableProviderFilterPushDown, TableType},
     optimizer::utils::conjunction,
     physical_expr::{create_physical_expr, PhysicalSortExpr},
-    physical_plan::{self, ExecutionPlan},
+    physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics},
     prelude::{Column, Expr},
     scalar::ScalarValue,
 };
-use futures_util::{stream::FuturesOrdered, StreamExt, TryStreamExt};
+use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::{path::Path, ObjectStore};
 use url::Url;
@@ -53,12 +54,14 @@ use crate::{
         Snapshot,
     },
     event::{self, DEFAULT_TIMESTAMP_KEY},
+    localcache::LocalCacheManager,
     metadata::STREAM_INFO,
+    metrics::QUERY_CACHE_HIT,
     option::CONFIG,
     storage::ObjectStorage,
 };
 
-use super::{listing_table_builder::ListingTableBuilder, table_provider::QueryTableProvider};
+use super::listing_table_builder::ListingTableBuilder;
 
 // schema provider for stream based on global data
 pub struct GlobalSchemaProvider {
@@ -101,107 +104,101 @@ struct StandardTableProvider {
     url: Url,
 }
 
-impl StandardTableProvider {
-    #[allow(clippy::too_many_arguments)]
-    async fn remote_physical_plan(
-        &self,
-        glob_storage: Arc<dyn ObjectStorage>,
-        object_store: Arc<dyn ObjectStore>,
-        snapshot: &catalog::snapshot::Snapshot,
-        time_filters: &[PartialTimeFilter],
-        projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-        state: &SessionState,
-    ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
-        let items = snapshot.manifests(time_filters);
-        let manifest_files = collect_manifest_files(
-            object_store,
-            items
-                .into_iter()
-                .sorted_by_key(|file| file.time_lower_bound)
-                .map(|item| item.manifest_path)
-                .collect(),
-        )
-        .await?;
+#[allow(clippy::too_many_arguments)]
+async fn create_parquet_physical_plan(
+    object_store_url: ObjectStoreUrl,
+    partitions: Vec<Vec<PartitionedFile>>,
+    statistics: Statistics,
+    schema: Arc<Schema>,
+    projection: Option<&Vec<usize>>,
+    filters: &[Expr],
+    limit: Option<usize>,
+    state: &SessionState,
+) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+    let filters = if let Some(expr) = conjunction(filters.to_vec()) {
+        let table_df_schema = schema.as_ref().clone().to_dfschema()?;
+        let filters =
+            create_physical_expr(&expr, &table_df_schema, &schema, state.execution_props())?;
+        Some(filters)
+    } else {
+        None
+    };
 
-        let mut manifest_files: Vec<_> = manifest_files
-            .into_iter()
-            .flat_map(|file| file.files)
-            .rev()
-            .collect();
+    let sort_expr = PhysicalSortExpr {
+        expr: physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?,
+        options: SortOptions {
+            descending: true,
+            nulls_first: true,
+        },
+    };
 
-        for filter in filters {
-            manifest_files.retain(|file| !file.can_be_pruned(filter))
-        }
+    let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
+    // create the execution plan
+    let plan = file_format
+        .create_physical_plan(
+            state,
+            FileScanConfig {
+                object_store_url,
+                file_schema: schema.clone(),
+                file_groups: partitions,
+                statistics,
+                projection: projection.cloned(),
+                limit,
+                output_ordering: vec![vec![sort_expr]],
+                table_partition_cols: Vec::new(),
+                infinite_source: false,
+            },
+            filters.as_ref(),
+        )
+        .await?;
 
-        if let Some(limit) = limit {
-            let limit = limit as u64;
-            let mut curr_limit = 0;
-            let mut pos = None;
-
-            for (index, file) in manifest_files.iter().enumerate() {
-                curr_limit += file.num_rows();
-                if curr_limit >= limit {
-                    pos = Some(index);
-                    break;
-                }
-            }
+    Ok(plan)
+}
 
-            if let Some(pos) = pos {
-                manifest_files.truncate(pos + 1);
+async fn collect_from_snapshot(
+    snapshot: &catalog::snapshot::Snapshot,
+    time_filters: &[PartialTimeFilter],
+    object_store: Arc<dyn ObjectStore>,
+    filters: &[Expr],
+    limit: Option<usize>,
+) -> Result<Vec<catalog::manifest::File>, DataFusionError> {
+    let items = snapshot.manifests(time_filters);
+    let manifest_files = collect_manifest_files(
+        object_store,
+        items
+            .into_iter()
+            .sorted_by_key(|file| file.time_lower_bound)
+            .map(|item| item.manifest_path)
+            .collect(),
+    )
+    .await?;
+    let mut manifest_files: Vec<_> = manifest_files
+        .into_iter()
+        .flat_map(|file| file.files)
+        .rev()
+        .collect();
+    for filter in filters {
+        manifest_files.retain(|file| !file.can_be_pruned(filter))
+    }
+    if let Some(limit) = limit {
+        let limit = limit as u64;
+        let mut curr_limit = 0;
+        let mut pos = None;
+
+        for (index, file) in manifest_files.iter().enumerate() {
+            curr_limit += file.num_rows();
+            if curr_limit >= limit {
+                pos = Some(index);
+                break;
             }
         }
 
-        if manifest_files.is_empty() {
-            return Ok(None);
+        if let Some(pos) = pos {
+            manifest_files.truncate(pos + 1);
         }
-
-        let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1);
-
-        let filters = if let Some(expr) = conjunction(filters.to_vec()) {
-            let table_df_schema = self.schema.as_ref().clone().to_dfschema()?;
-            let filters = create_physical_expr(
-                &expr,
-                &table_df_schema,
-                &self.schema,
-                state.execution_props(),
-            )?;
-            Some(filters)
-        } else {
-            None
-        };
-
-        let sort_expr = PhysicalSortExpr {
-            expr: physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &self.schema)?,
-            options: SortOptions {
-                descending: true,
-                nulls_first: true,
-            },
-        };
-
-        let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
-        // create the execution plan
-        let plan = file_format
-            .create_physical_plan(
-                state,
-                FileScanConfig {
-                    object_store_url: ObjectStoreUrl::parse(&glob_storage.store_url()).unwrap(),
-                    file_schema: self.schema.clone(),
-                    file_groups: partitioned_files,
-                    statistics,
-                    projection: projection.cloned(),
-                    limit,
-                    output_ordering: vec![vec![sort_expr]],
-                    table_partition_cols: Vec::new(),
-                    infinite_source: false,
-                },
-                filters.as_ref(),
-            )
-            .await?;
-
-        Ok(Some(plan))
     }
+
+    Ok(manifest_files)
 }
 
 fn partitioned_files(
@@ -288,23 +285,32 @@ impl TableProvider for StandardTableProvider {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        let mut memory_exec = None;
+        let mut cache_exec = None;
+
         let time_filters = extract_primary_filter(filters);
         if time_filters.is_empty() {
             return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires at least one time bound".to_string()));
         }
 
-        let memtable = if include_now(filters) {
-            event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
-        } else {
-            None
+        if include_now(filters) {
+            if let Some(records) =
+                event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
+            {
+                let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?;
+                memory_exec = Some(
+                    reversed_mem_table
+                        .scan(state, projection, filters, limit)
+                        .await?,
+                );
+            }
         };
 
-        let storage = state
+        let object_store = state
             .runtime_env()
             .object_store_registry
             .get_store(&self.url)
             .unwrap();
-
         let glob_storage = CONFIG.storage().get_object_store();
 
         // Fetch snapshot
@@ -313,34 +319,94 @@ impl TableProvider for StandardTableProvider {
             .await
             .map_err(|err| DataFusionError::Plan(err.to_string()))?;
 
-        let remote_table = if is_overlapping_query(&snapshot.manifest_list, &time_filters) {
-            // Is query timerange is overlapping with older data.
-            if let Some(table) = ListingTableBuilder::new(self.stream.clone())
-                .populate_via_listing(glob_storage.clone(), storage, &time_filters)
-                .await?
-                .build(self.schema.clone(), |x| glob_storage.query_prefixes(x))?
-            {
-                Some(table.scan(state, projection, filters, limit).await?)
-            } else {
-                None
-            }
-        } else {
-            self.remote_physical_plan(
+        // If the query timerange overlaps with older data, fall back to listing.
+        if is_overlapping_query(&snapshot.manifest_list, &time_filters) {
+            return legacy_listing_table(
+                self.stream.clone(),
+                memory_exec,
                 glob_storage,
-                storage,
-                &snapshot,
+                object_store,
                 &time_filters,
+                self.schema.clone(),
+                state,
+                projection,
+                filters,
+                limit,
+            )
+            .await;
+        }
+
+        let mut manifest_files =
+            collect_from_snapshot(&snapshot, &time_filters, object_store, filters, limit).await?;
+
+        if manifest_files.is_empty() {
+            return final_plan(vec![memory_exec], projection, self.schema.clone());
+        }
+
+        // Based on entries in the manifest files, find them in the cache and create a physical plan.
+        if let Some(cache_manager) = LocalCacheManager::global() {
+            let (cached, remainder) = cache_manager
+                .partition_on_cached(&self.stream, manifest_files, |file| &file.file_path)
+                .await
+                .map_err(|err| DataFusionError::External(Box::new(err)))?;
+
+            // Assign remaining entries back to manifest list
+            // This is to be used for remote query
+            manifest_files = remainder;
+
+            let cached = cached
+                .into_iter()
+                .map(|(mut file, cache_path)| {
+                    let cache_path =
+                        object_store::path::Path::from_absolute_path(cache_path).unwrap();
+                    file.file_path = cache_path.to_string();
+                    file
+                })
+                .collect();
+
+            let (partitioned_files, statistics) = partitioned_files(cached, &self.schema, 1);
+            let plan = create_parquet_physical_plan(
+                ObjectStoreUrl::parse("file:///").unwrap(),
+                partitioned_files,
+                statistics,
+                self.schema.clone(),
                 projection,
                 filters,
                 limit,
                 state,
             )
-            .await?
-        };
+            .await?;
 
-        QueryTableProvider::try_new(memtable, remote_table, self.schema.clone())?
-            .scan(state, projection, filters, limit)
-            .await
+            cache_exec = Some(plan)
+        }
+
+        if manifest_files.is_empty() {
+            QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc();
+            return final_plan(
+                vec![memory_exec, cache_exec],
+                projection,
+                self.schema.clone(),
+            );
+        }
+
+        let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1);
+        let remote_exec = create_parquet_physical_plan(
+            ObjectStoreUrl::parse(&glob_storage.store_url()).unwrap(),
+            partitioned_files,
+            statistics,
+            self.schema.clone(),
+            projection,
+            filters,
+            limit,
+            state,
+        )
+        .await?;
+
+        Ok(final_plan(
+            vec![memory_exec, cache_exec, Some(remote_exec)],
+            projection,
+            self.schema.clone(),
+        )?)
     }
 
     fn supports_filter_pushdown(
@@ -358,6 +424,66 @@ impl TableProvider for StandardTableProvider {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
+async fn legacy_listing_table(
+    stream: String,
+    mem_exec: Option<Arc<dyn ExecutionPlan>>,
+    glob_storage: Arc<dyn ObjectStorage>,
+    object_store: Arc<dyn ObjectStore>,
+    time_filters: &[PartialTimeFilter],
+    schema: Arc<Schema>,
+    state: &SessionState,
+    projection: Option<&Vec<usize>>,
+    filters: &[Expr],
+    limit: Option<usize>,
+) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+    let remote_table = ListingTableBuilder::new(stream)
+        .populate_via_listing(glob_storage.clone(), object_store, time_filters)
+        .and_then(|builder| async {
+            let table = builder.build(schema.clone(), |x| glob_storage.query_prefixes(x))?;
+            let res = match table {
+                Some(table) => Some(table.scan(state, projection, filters, limit).await?),
+                _ => None,
+            };
+            Ok(res)
+        })
+        .await?;
+
+    final_plan(vec![mem_exec, remote_table], projection, schema)
+}
+
+fn final_plan(
+    execution_plans: Vec<Option<Arc<dyn ExecutionPlan>>>,
+    projection: Option<&Vec<usize>>,
+    schema: Arc<Schema>,
+) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+    let mut execution_plans = execution_plans.into_iter().flatten().collect_vec();
+    let exec: Arc<dyn ExecutionPlan> = if execution_plans.is_empty() {
+        let schema = match projection {
+            Some(projection) => Arc::new(schema.project(projection)?),
+            None => schema,
+        };
+        Arc::new(EmptyExec::new(false, schema))
+    } else if execution_plans.len() == 1 {
+        execution_plans.pop().unwrap()
+    } else {
+        Arc::new(UnionExec::new(execution_plans))
+    };
+
+    Ok(exec)
+}
+
+fn reversed_mem_table(
+    mut records: Vec<RecordBatch>,
+    schema: Arc<Schema>,
+) -> Result<MemTable, DataFusionError> {
+    records[..].reverse();
+    records
+        .iter_mut()
+        .for_each(|batch| *batch = crate::utils::arrow::reverse_reader::reverse(batch));
+    MemTable::try_new(schema, vec![records])
+}
+
 #[derive(Debug)]
 pub enum PartialTimeFilter {
     Low(Bound<NaiveDateTime>),
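`final_plan` replaces QueryTableProvider's plan assembly (the deleted file follows below). Its selection rule, sketched with strings standing in for `Arc<dyn ExecutionPlan>` values; purely illustrative:

```rust
// Mirror of final_plan's shape: drop absent stages, then use EmptyExec when
// nothing remains, the lone plan when there is one, or a union otherwise.
fn combine(stages: Vec<Option<&str>>) -> String {
    let mut plans: Vec<_> = stages.into_iter().flatten().collect();
    if plans.is_empty() {
        "EmptyExec".to_string()
    } else if plans.len() == 1 {
        plans.pop().unwrap().to_string()
    } else {
        format!("UnionExec[{}]", plans.join(", "))
    }
}

fn main() {
    // scan() passes stages in recency order: memory, cache, remote.
    assert_eq!(combine(vec![None, None, None]), "EmptyExec");
    assert_eq!(combine(vec![Some("memory"), None, None]), "memory");
    assert_eq!(
        combine(vec![Some("memory"), Some("cache"), Some("remote")]),
        "UnionExec[memory, cache, remote]"
    );
}
```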
diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs
deleted file mode 100644
index 7ffc0c70f..000000000
--- a/server/src/query/table_provider.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Parseable Server (C) 2022 - 2023 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- */
-
-use async_trait::async_trait;
-use datafusion::arrow::datatypes::{Schema, SchemaRef};
-use datafusion::arrow::record_batch::RecordBatch;
-
-use datafusion::datasource::{MemTable, TableProvider};
-use datafusion::error::DataFusionError;
-use datafusion::execution::context::SessionState;
-use datafusion::logical_expr::{TableProviderFilterPushDown, TableType};
-use datafusion::physical_plan::empty::EmptyExec;
-use datafusion::physical_plan::union::UnionExec;
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::Expr;
-
-use std::any::Any;
-use std::sync::Arc;
-use std::vec;
-
-use crate::utils::arrow::reverse_reader::reverse;
-
-pub struct QueryTableProvider {
-    staging: Option<MemTable>,
-    // remote table
-    storage: Option<Arc<dyn ExecutionPlan>>,
-    schema: Arc<Schema>,
-}
-
-impl QueryTableProvider {
-    pub fn try_new(
-        staging: Option<Vec<RecordBatch>>,
-        storage: Option<Arc<dyn ExecutionPlan>>,
-        schema: Arc<Schema>,
-    ) -> Result<Self, DataFusionError> {
-        // in place reverse transform
-        let staging = if let Some(mut staged_batches) = staging {
-            staged_batches[..].reverse();
-            staged_batches
-                .iter_mut()
-                .for_each(|batch| *batch = reverse(batch));
-            Some(staged_batches)
-        } else {
-            None
-        };
-
-        let memtable = staging
-            .map(|records| MemTable::try_new(schema.clone(), vec![records]))
-            .transpose()?;
-
-        Ok(Self {
-            staging: memtable,
-            storage,
-            schema,
-        })
-    }
-
-    async fn create_physical_plan(
-        &self,
-        ctx: &SessionState,
-        projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-        let mut exec = vec![];
-
-        if let Some(table) = &self.staging {
-            exec.push(table.scan(ctx, projection, filters, limit).await?)
-        }
-
-        if let Some(storage_listing) = self.storage.clone() {
-            exec.push(storage_listing);
-        }
-
-        let exec: Arc<dyn ExecutionPlan> = if exec.is_empty() {
-            let schema = match projection {
-                Some(projection) => Arc::new(self.schema.project(projection)?),
-                None => self.schema.clone(),
-            };
-            Arc::new(EmptyExec::new(false, schema))
-        } else if exec.len() == 1 {
-            exec.pop().unwrap()
-        } else {
-            Arc::new(UnionExec::new(exec))
-        };
-
-        Ok(exec)
-    }
-}
-
-#[async_trait]
-impl TableProvider for QueryTableProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
-
-    async fn scan(
-        &self,
-        ctx: &SessionState,
-        projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        limit: Option<usize>,
-    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
-        self.create_physical_plan(ctx, projection, filters, limit)
-            .await
-    }
-
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
-        Ok(filters
-            .iter()
-            .map(|_| TableProviderFilterPushDown::Inexact)
-            .collect())
-    }
-}
diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs
index 968e58e34..5251bec65 100644
--- a/server/src/rbac/role.rs
+++ b/server/src/rbac/role.rs
@@ -29,6 +29,8 @@ pub enum Action {
     DeleteStream,
     GetRetention,
     PutRetention,
+    GetCacheEnabled,
+    PutCacheEnabled,
     PutAlert,
     GetAlert,
     PutUser,
@@ -101,6 +103,8 @@ impl RoleBuilder {
             | Action::GetStats
             | Action::GetRetention
             | Action::PutRetention
+            | Action::GetCacheEnabled
+            | Action::PutCacheEnabled
             | Action::PutAlert
             | Action::GetAlert
             | Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
@@ -169,6 +173,8 @@ pub mod model {
             Action::GetStats,
             Action::GetRetention,
             Action::PutRetention,
+            Action::PutCacheEnabled,
+            Action::GetCacheEnabled,
             Action::PutAlert,
             Action::GetAlert,
             Action::GetAbout,
diff --git a/server/src/storage.rs b/server/src/storage.rs
index 21f054887..10db5770f 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -74,6 +74,8 @@ pub struct ObjectStoreFormat {
     pub stats: Stats,
     #[serde(default)]
     pub snapshot: Snapshot,
+    #[serde(default)]
+    pub cache_enabled: bool,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -115,6 +117,7 @@ impl Default for ObjectStoreFormat {
             permissions: vec![Permisssion::new("parseable".to_string())],
             stats: Stats::default(),
             snapshot: Snapshot::default(),
+            cache_enabled: false,
         }
     }
 }
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index 6c163e21f..5ec98bfcc 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -25,7 +25,7 @@ use std::{
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
-use fs_extra::file::{move_file, CopyOptions};
+use fs_extra::file::CopyOptions;
 use futures::{stream::FuturesUnordered, TryStreamExt};
 use relative_path::RelativePath;
 use tokio::fs::{self, DirEntry};
@@ -189,8 +189,7 @@ impl ObjectStorage for LocalFS {
         if let Some(path) = to_path.parent() {
             fs::create_dir_all(path).await?
         }
-        let _ = move_file(path, to_path, &op)?;
-
+        let _ = fs_extra::file::copy(path, to_path, &op)?;
         Ok(())
     }
 
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 57fbdd3f3..e6f713715 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -24,6 +24,7 @@ use super::{
 use crate::{
     alerts::Alerts,
     catalog::{self, manifest::Manifest, snapshot::Snapshot},
+    localcache::LocalCacheManager,
     metadata::STREAM_INFO,
     metrics::{storage::StorageMetrics, STORAGE_SIZE},
     option::CONFIG,
@@ -35,6 +36,7 @@ use arrow_schema::Schema;
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
+use itertools::Itertools;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
 use serde_json::Value;
@@ -196,6 +198,15 @@ pub trait ObjectStorage: Sync + 'static {
         Ok(serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"))
     }
 
+    async fn put_stream_manifest(
+        &self,
+        stream_name: &str,
+        manifest: &ObjectStoreFormat,
+    ) -> Result<(), ObjectStorageError> {
+        let path = stream_json_path(stream_name);
+        self.put_object(&path, to_bytes(manifest)).await
+    }
+
     async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
         let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
         let stream_metadata: Value =
@@ -218,7 +229,6 @@ pub trait ObjectStorage: Sync + 'static {
             .expect("is object")
             .get("retention")
             .cloned();
-
         if let Some(retention) = retention {
             Ok(serde_json::from_value(retention).unwrap())
         } else {
@@ -306,10 +316,15 @@ pub trait ObjectStorage: Sync + 'static {
         }
 
         let streams = STREAM_INFO.list_streams();
-
         let mut stream_stats = HashMap::new();
+        let cache_manager = LocalCacheManager::global();
+        let mut cache_updates: HashMap<&String, Vec<_>> = HashMap::new();
+
         for stream in &streams {
+            let cache_enabled = STREAM_INFO
+                .cache_enabled(stream)
+                .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
             let dir = StorageDir::new(stream);
             let schema = convert_disk_files_to_parquet(stream, &dir)
                 .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
@@ -327,27 +342,30 @@ pub trait ObjectStorage: Sync + 'static {
                     .or_insert_with(|| compressed_size);
             });
 
-            for file in &parquet_files {
+            for file in parquet_files {
                 let filename = file
                     .file_name()
                     .expect("only parquet files are returned by iterator")
                     .to_str()
                     .expect("filename is valid string");
                 let file_suffix = str::replacen(filename, ".", "/", 3);
-                let objectstore_path = format!("{stream}/{file_suffix}");
-                let manifest = catalog::create_from_parquet_file(
-                    self.absolute_url(RelativePath::from_path(&objectstore_path).unwrap())
-                        .to_string(),
-                    file,
-                )
-                .unwrap();
-                self.upload_file(&objectstore_path, file).await?;
+                let stream_relative_path = format!("{stream}/{file_suffix}");
+                self.upload_file(&stream_relative_path, &file).await?;
+                let absolute_path = self
+                    .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap())
+                    .to_string();
                 let store = CONFIG.storage().get_object_store();
+                let manifest =
+                    catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap();
                 catalog::update_snapshot(store, stream, manifest).await?;
-            }
-
-            for file in parquet_files {
-                let _ = fs::remove_file(file);
+                if cache_enabled && cache_manager.is_some() {
+                    cache_updates
+                        .entry(stream)
+                        .or_default()
+                        .push((absolute_path, file))
+                } else {
+                    let _ = fs::remove_file(file);
+                }
             }
         }
 
@@ -363,6 +381,24 @@ pub trait ObjectStorage: Sync + 'static {
             }
         }
 
+        if let Some(manager) = cache_manager {
+            let cache_updates = cache_updates
+                .into_iter()
+                .map(|(key, value)| (key.to_owned(), value))
+                .collect_vec();
+
+            tokio::spawn(async move {
+                for (stream, files) in cache_updates {
+                    for (storage_path, file) in files {
+                        manager
+                            .move_to_cache(&stream, storage_path, file.to_owned())
+                            .await
+                            .unwrap()
+                    }
+                }
+            });
+        }
+
         Ok(())
     }
 }
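Since both `.cache.json` and `.cache_meta.json` are plain serde-serialized structs (hashlru's `serde` feature is enabled in Cargo.toml above), their on-disk shape can be sanity-checked with a round-trip. A sketch using a look-alike type, since LocalCache's fields are private to the module:

```rust
use std::path::PathBuf;
use hashlru::Cache;

// Stand-in mirroring LocalCache's layout in localcache.rs (illustrative only).
#[derive(serde::Serialize, serde::Deserialize)]
struct CacheFile {
    version: String,
    current_size: u64,
    files: Cache<String, PathBuf>,
}

fn main() -> Result<(), serde_json::Error> {
    let mut files = Cache::new(100);
    files.push(
        "s3://bucket/mystream/date=2023-10-01/file.parquet".to_string(),
        PathBuf::from("/cache/mystream/file.parquet"),
    );
    let cache = CacheFile {
        version: "v1".to_string(),
        current_size: 1024,
        files,
    };

    // What put_cache writes and get_cache reads back.
    let json = serde_json::to_vec(&cache)?;
    let restored: CacheFile = serde_json::from_slice(&json)?;
    assert_eq!(restored.current_size, 1024);
    Ok(())
}
```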