From 109d3499b2d4814c90526807854b6aa1a41d17f4 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 6 Oct 2021 17:40:14 +0200 Subject: [PATCH] [feat] expose a static object store registry --- .../src/serde/physical_plan/from_proto.rs | 1 - datafusion/Cargo.toml | 4 +-- datafusion/src/datasource/object_store/mod.rs | 22 +++++++++++---- datafusion/src/execution/context.rs | 28 ------------------- 4 files changed, 18 insertions(+), 37 deletions(-) diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 0d233725fc9f..febd0c0b1e5b 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -674,7 +674,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { aggregate_functions: Default::default(), config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), - object_store_registry: Arc::new(ObjectStoreRegistry::new()), }; let fun_expr = functions::create_physical_fun( diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 327fcd33de95..d049bd502199 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -40,7 +40,7 @@ path = "src/lib.rs" default = ["crypto_expressions", "regex_expressions", "unicode_expressions"] simd = ["arrow/simd"] crypto_expressions = ["md-5", "sha2"] -regex_expressions = ["regex", "lazy_static"] +regex_expressions = ["regex"] unicode_expressions = ["unicode-segmentation"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] @@ -67,7 +67,7 @@ sha2 = { version = "^0.9.1", optional = true } ordered-float = "2.0" unicode-segmentation = { version = "^1.7.1", optional = true } regex = { version = "^1.4.3", optional = true } -lazy_static = { version = "^1.4.0", optional = true } +lazy_static = "^1.4.0" smallvec = { version = "1.6", features = ["union"] } rand = "0.8" avro-rs = { version = "0.13", features = ["snappy"], optional = true } diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index fd25fd43a2e7..84a8b3c52e94 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -26,12 +26,26 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use futures::{AsyncRead, Stream}; +use lazy_static::lazy_static; use local::LocalFileSystem; use crate::error::{DataFusionError, Result}; use chrono::Utc; +lazy_static! { + /// A singleton to help making the object store registry + /// consitent throughout the code. For instance in a distributed + /// system like Ballista, it ensures that the stores configured + /// here will be consistent on the executor and the scheduler. + pub static ref OBJECT_STORES: Arc = { + let reg = Arc::new(ObjectStoreRegistry::new()); + reg.register_store(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); + // YOU CAN ADD YOUR CUSTOM STORES HERE + reg + }; +} + /// Object Reader for one file in a object store #[async_trait] pub trait ObjectReader { @@ -101,14 +115,10 @@ pub struct ObjectStoreRegistry { } impl ObjectStoreRegistry { - /// Create the registry that object stores can registered into. - /// ['LocalFileSystem'] store is registered in by default to support read local files natively. + /// Create an empty registry that object stores can be registered into. pub fn new() -> Self { - let mut map: HashMap> = HashMap::new(); - map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); - Self { - object_stores: RwLock::new(map), + object_stores: RwLock::new(HashMap::new()), } } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 12347beca2a0..cc4e51c4e63a 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -49,7 +49,6 @@ use crate::catalog::{ ResolvedTableReference, TableReference, }; use crate::datasource::csv::CsvFile; -use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry}; use crate::datasource::parquet::ParquetTable; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; @@ -169,7 +168,6 @@ impl ExecutionContext { aggregate_functions: HashMap::new(), config, execution_props: ExecutionProps::new(), - object_store_registry: Arc::new(ObjectStoreRegistry::new()), })), } } @@ -398,29 +396,6 @@ impl ExecutionContext { self.state.lock().unwrap().catalog_list.catalog(name) } - /// Registers a object store with scheme using a custom `ObjectStore` so that - /// an external file system or object storage system could be used against this context. - /// - /// Returns the `ObjectStore` previously registered for this scheme, if any - pub fn register_object_store( - &self, - scheme: impl Into, - object_store: Arc, - ) -> Option> { - let scheme = scheme.into(); - - self.state - .lock() - .unwrap() - .object_store_registry - .register_store(scheme, object_store) - } - - /// Retrieves a `ObjectStore` instance by scheme - pub fn object_store(&self, scheme: &str) -> Option> { - self.state.lock().unwrap().object_store_registry.get(scheme) - } - /// Registers a table using a custom `TableProvider` so that /// it can be referenced from SQL statements executed against this /// context. @@ -929,8 +904,6 @@ pub struct ExecutionContextState { pub config: ExecutionConfig, /// Execution properties pub execution_props: ExecutionProps, - /// Object Store that are registered with the context - pub object_store_registry: Arc, } impl ExecutionProps { @@ -958,7 +931,6 @@ impl ExecutionContextState { aggregate_functions: HashMap::new(), config: ExecutionConfig::new(), execution_props: ExecutionProps::new(), - object_store_registry: Arc::new(ObjectStoreRegistry::new()), } }