Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose a static object store registry #1072

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
};

let fun_expr = functions::create_physical_fun(
Expand Down
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ path = "src/lib.rs"
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
simd = ["arrow/simd"]
crypto_expressions = ["md-5", "sha2"]
regex_expressions = ["regex", "lazy_static"]
regex_expressions = ["regex"]
unicode_expressions = ["unicode-segmentation"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
Expand All @@ -67,7 +67,7 @@ sha2 = { version = "^0.9.1", optional = true }
ordered-float = "2.0"
unicode-segmentation = { version = "^1.7.1", optional = true }
regex = { version = "^1.4.3", optional = true }
lazy_static = { version = "^1.4.0", optional = true }
lazy_static = "^1.4.0"
smallvec = { version = "1.6", features = ["union"] }
rand = "0.8"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
Expand Down
22 changes: 16 additions & 6 deletions datafusion/src/datasource/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,26 @@ use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use futures::{AsyncRead, Stream};
use lazy_static::lazy_static;

use local::LocalFileSystem;

use crate::error::{DataFusionError, Result};
use chrono::Utc;

lazy_static! {
/// A singleton to help making the object store registry
/// consitent throughout the code. For instance in a distributed
/// system like Ballista, it ensures that the stores configured
/// here will be consistent on the executor and the scheduler.
pub static ref OBJECT_STORES: Arc<ObjectStoreRegistry> = {
let reg = Arc::new(ObjectStoreRegistry::new());
reg.register_store(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
// YOU CAN ADD YOUR CUSTOM STORES HERE
reg
};
}

/// Object Reader for one file in a object store
#[async_trait]
pub trait ObjectReader {
Expand Down Expand Up @@ -101,14 +115,10 @@ pub struct ObjectStoreRegistry {
}

impl ObjectStoreRegistry {
/// Create the registry that object stores can registered into.
/// ['LocalFileSystem'] store is registered in by default to support read local files natively.
/// Create an empty registry that object stores can be registered into.
pub fn new() -> Self {
let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));

Self {
object_stores: RwLock::new(map),
object_stores: RwLock::new(HashMap::new()),
}
}

Expand Down
28 changes: 0 additions & 28 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use crate::catalog::{
ResolvedTableReference, TableReference,
};
use crate::datasource::csv::CsvFile;
use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -169,7 +168,6 @@ impl ExecutionContext {
aggregate_functions: HashMap::new(),
config,
execution_props: ExecutionProps::new(),
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
})),
}
}
Expand Down Expand Up @@ -398,29 +396,6 @@ impl ExecutionContext {
self.state.lock().unwrap().catalog_list.catalog(name)
}

/// Registers a object store with scheme using a custom `ObjectStore` so that
/// an external file system or object storage system could be used against this context.
///
/// Returns the `ObjectStore` previously registered for this scheme, if any
pub fn register_object_store(
&self,
scheme: impl Into<String>,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
let scheme = scheme.into();

self.state
.lock()
.unwrap()
.object_store_registry
.register_store(scheme, object_store)
}

/// Retrieves a `ObjectStore` instance by scheme
pub fn object_store(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
self.state.lock().unwrap().object_store_registry.get(scheme)
}

/// Registers a table using a custom `TableProvider` so that
/// it can be referenced from SQL statements executed against this
/// context.
Expand Down Expand Up @@ -929,8 +904,6 @@ pub struct ExecutionContextState {
pub config: ExecutionConfig,
/// Execution properties
pub execution_props: ExecutionProps,
/// Object Store that are registered with the context
pub object_store_registry: Arc<ObjectStoreRegistry>,
}

impl ExecutionProps {
Expand Down Expand Up @@ -958,7 +931,6 @@ impl ExecutionContextState {
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
}
}

Expand Down