Skip to content

Commit

Permalink
feat: Enable GCS, S3, R2 and MinIO as object stores for local runs (#1843)
Browse files Browse the repository at this point in the history

This PR introduces support for using external object stores as
data/catalog storage in locally run GlareDB.

Closes #1806, and also partially addresses #1818.
  • Loading branch information
gruuya authored Oct 3, 2023
1 parent ea3b2df commit c3cc95a
Show file tree
Hide file tree
Showing 18 changed files with 347 additions and 89 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 16 additions & 9 deletions crates/datasources/src/native/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,7 @@ impl NativeTableStorage {

pub async fn delete_table(&self, table: &TableEntry) -> Result<()> {
let prefix = format!("databases/{}/tables/{}", self.db_id, table.meta.id);
let path: ObjectStorePath = match &self.conf {
StorageConfig::Gcs { bucket, .. } => format!("gs://{}/{}", bucket, prefix).into(),
StorageConfig::Memory => format!("memory://{}", prefix).into(),
_ => prefix.into(),
};
let mut x = self.store.list(Some(&path)).await?;
let mut x = self.store.list(Some(&prefix.into())).await?;
while let Some(meta) = x.next().await {
let meta = meta?;
self.store.delete(&meta.location).await?
Expand All @@ -154,9 +149,21 @@ impl NativeTableStorage {
let prefix = format!("databases/{}/tables/{}", self.db_id, table.meta.id);

let url = match &self.conf {
StorageConfig::Gcs { bucket, .. } => {
Url::parse(&format!("gs://{}/{}", bucket, prefix.clone()))?
StorageConfig::S3 {
endpoint, bucket, ..
} => {
let mut s3_url = if let Some(endpoint) = endpoint {
endpoint.clone()
} else {
"s3://".to_string()
};

if let Some(bucket) = bucket {
s3_url = format!("{s3_url}/{bucket}");
}
Url::parse(&format!("{s3_url}/{prefix}"))?
}
StorageConfig::Gcs { bucket, .. } => Url::parse(&format!("gs://{bucket}/{prefix}"))?,
StorageConfig::Local { path } => {
let path =
fs::canonicalize(path)
Expand All @@ -169,7 +176,7 @@ impl NativeTableStorage {
Url::from_file_path(path).map_err(|_| NativeError::Static("Path not absolute"))?
}
StorageConfig::Memory => {
let s = format!("memory://{}", prefix.clone());
let s = format!("memory://{prefix}");
Url::parse(&s)?
}
};
Expand Down
3 changes: 3 additions & 0 deletions crates/glaredb/src/args/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub struct LocalClientOpts {
#[clap(short = 'c', long, value_parser)]
pub cloud_url: Option<Url>,

#[clap(flatten)]
pub storage_config: StorageConfigArgs,

/// Ignores the proxy and directly goes to the server for remote execution.
///
/// (Internal)
Expand Down
21 changes: 21 additions & 0 deletions crates/glaredb/src/args/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::anyhow;
use anyhow::Result;
use clap::{Parser, ValueEnum};
use std::fmt::Write as _;
Expand Down Expand Up @@ -81,3 +82,23 @@ pub struct PgProxyArgs {
#[clap(long)]
pub cloud_auth_code: String,
}

#[derive(Debug, Clone, Parser)]
pub struct StorageConfigArgs {
    /// URL of the object store in which to keep the data.
#[clap(short, long)]
pub location: Option<String>,

/// Storage options for building the object store.
#[clap(short = 'o', long = "option", requires = "location", value_parser=parse_key_value_pair)]
pub storage_options: Vec<(String, String)>,
}

fn parse_key_value_pair(key_value_pair: &str) -> Result<(String, String)> {
key_value_pair
.split_once('=')
.map(|(key, value)| (key.to_string(), value.to_string()))
.ok_or(anyhow!(
"Expected key-value pair delimited by an equals sign, got '{key_value_pair}'"
))
}
8 changes: 3 additions & 5 deletions crates/glaredb/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
pub mod args;
pub mod commands;
mod highlighter;
pub mod local;
pub mod metastore;
pub mod pg_proxy;
mod prompt;
pub mod rpc_proxy;
pub mod server;
pub mod util;

pub mod args;
mod highlighter;
mod prompt;
45 changes: 23 additions & 22 deletions crates/glaredb/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::args::{LocalClientOpts, OutputMode};
use crate::args::{LocalClientOpts, OutputMode, StorageConfigArgs};
use crate::highlighter::{SQLHighlighter, SQLHinter, SQLValidator};
use crate::prompt::SQLPrompt;
use crate::util::MetastoreClientMode;
use anyhow::{anyhow, Result};
use arrow_util::pretty::pretty_format_batches;
use clap::ValueEnum;
Expand All @@ -16,18 +15,16 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use pgrepr::format::Format;
use reedline::{FileBackedHistory, Reedline, Signal};
use std::collections::HashMap;

use datafusion_ext::vars::SessionVars;
use sqlexec::engine::EngineStorageConfig;
use sqlexec::engine::{Engine, SessionStorageConfig, TrackedSession};
use sqlexec::parser;
use sqlexec::remote::client::RemoteClient;
use sqlexec::session::ExecutionResult;
use std::env;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use telemetry::Tracker;
use tracing::error;
use url::Url;

Expand All @@ -48,22 +45,19 @@ pub struct LocalSession {
impl LocalSession {
pub async fn connect(opts: LocalClientOpts) -> Result<Self> {
// Connect to metastore.
let mode = MetastoreClientMode::new_local(opts.data_dir.clone())?;
let metastore_client = mode.into_client().await?;
let tracker = Arc::new(Tracker::Nop);

let storage_conf = match &opts.data_dir {
Some(path) => EngineStorageConfig::Local { path: path.clone() },
None => EngineStorageConfig::Memory,
let mut engine = if let StorageConfigArgs {
location: Some(location),
storage_options,
} = &opts.storage_config
{
// TODO: try to consolidate with --data-dir option
Engine::from_storage_options(location, &HashMap::from_iter(storage_options.clone()))
.await?
} else {
Engine::from_data_dir(&opts.data_dir).await?
};

let engine = Engine::new(
metastore_client,
storage_conf,
tracker,
opts.spill_path.clone(),
)
.await?;
engine = engine.with_spill_path(opts.spill_path.clone());

let sess = if let Some(url) = opts.cloud_url.clone() {
let (exec_client, info_msg) = if opts.ignore_rpc_auth {
Expand Down Expand Up @@ -131,9 +125,16 @@ impl LocalSession {
}

async fn run_interactive(&mut self) -> Result<()> {
let info = match &self.opts.data_dir {
Some(path) => format!("Persisting database at path: {}", path.display()),
None => "Using in-memory catalog".to_string(),
let info = match (&self.opts.storage_config, &self.opts.data_dir) {
(
StorageConfigArgs {
location: Some(location),
..
},
_,
) => format!("Persisting database at location: {location}"),
(_, Some(path)) => format!("Persisting database at path: {}", path.display()),
(_, None) => "Using in-memory catalog".to_string(),
};

println!("{info}");
Expand Down
13 changes: 11 additions & 2 deletions crates/glaredb/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::util::MetastoreClientMode;
use anyhow::{anyhow, Result};
use metastore::util::MetastoreClientMode;
use pgsrv::auth::LocalAuthenticator;
use pgsrv::handler::{ProtocolHandler, ProtocolHandlerConfig};
use protogen::gen::rpcsrv::service::execution_service_server::ExecutionServiceServer;
Expand Down Expand Up @@ -55,7 +55,15 @@ impl ComputeServer {
fs::create_dir_all(&env_tmp)?;

// Connect to metastore.
let mode = MetastoreClientMode::new_from_options(metastore_addr, data_dir.clone())?;
let mode = match (metastore_addr, &data_dir) {
(Some(_), Some(_)) => {
return Err(anyhow!(
"Only one of metastore address or metastore path may be provided."
))
}
(Some(addr), None) => MetastoreClientMode::Remote { addr },
_ => MetastoreClientMode::new_local(data_dir.clone()),
};
let metastore_client = mode.into_client().await?;

let tracker = match segment_key {
Expand All @@ -78,6 +86,7 @@ impl ComputeServer {
let storage_conf = match (data_dir, service_account_key) {
(None, Some(key)) => EngineStorageConfig::Gcs {
service_account_key: key,
bucket: None,
},
(Some(dir), None) => EngineStorageConfig::Local { path: dir },
(None, None) => EngineStorageConfig::Memory,
Expand Down
3 changes: 3 additions & 0 deletions crates/metastore/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ pub enum MetastoreError {

#[error(transparent)]
Validation(#[from] sqlbuiltins::validation::ValidationError),

#[error(transparent)]
TonicTransportError(#[from] tonic::transport::Error),
}

pub type Result<T, E = MetastoreError> = std::result::Result<T, E>;
Expand Down
1 change: 1 addition & 0 deletions crates/metastore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pub mod srv;

mod database;
mod storage;
pub mod util;
3 changes: 1 addition & 2 deletions crates/metastore/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ pub async fn start_inprocess_local(

/// Starts an in-process metastore service, returning a client for the service.
///
/// Useful for some tests, as well as when running GlareDB locally for testing.
/// This should never be used in production.
/// Useful for tests, as well as when running GlareDB locally.
pub async fn start_inprocess(
store: Arc<dyn ObjectStore>,
) -> Result<MetastoreServiceClient<Channel>> {
Expand Down
35 changes: 17 additions & 18 deletions crates/glaredb/src/util.rs → crates/metastore/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Result};
use metastore::local::{start_inprocess_inmemory, start_inprocess_local};
use crate::errors::{MetastoreError, Result};
use crate::local::{start_inprocess_inmemory, start_inprocess_local};
use protogen::gen::metastore::service::metastore_service_client::MetastoreServiceClient;
use std::path::PathBuf;
use std::{fs, time::Duration};
Expand All @@ -18,19 +18,10 @@ pub enum MetastoreClientMode {
}

impl MetastoreClientMode {
pub fn new_local(local_path: Option<PathBuf>) -> Result<Self> {
pub fn new_local(local_path: Option<PathBuf>) -> Self {
match local_path {
Some(path) => Ok(MetastoreClientMode::LocalDisk { path }),
None => Ok(MetastoreClientMode::LocalInMemory),
}
}
pub fn new_from_options(addr: Option<String>, local_path: Option<PathBuf>) -> Result<Self> {
match (addr, &local_path) {
(Some(_), Some(_)) => Err(anyhow!(
"Only one of metastore address or metastore path may be provided."
)),
(Some(addr), None) => Ok(MetastoreClientMode::Remote { addr }),
_ => Self::new_local(local_path),
Some(path) => MetastoreClientMode::LocalDisk { path },
None => MetastoreClientMode::LocalInMemory,
}
}

Expand All @@ -49,14 +40,22 @@ impl MetastoreClientMode {
}
Self::LocalDisk { path } => {
if !path.exists() {
fs::create_dir_all(&path)?;
fs::create_dir_all(&path).map_err(|e| {
MetastoreError::FailedInProcessStartup(format!(
"Failed creating directory at path {}: {e}",
path.to_string_lossy()
))
})?;
}
if path.exists() && !path.is_dir() {
return Err(anyhow!("Path is not a valid directory"));
return Err(MetastoreError::FailedInProcessStartup(format!(
"Error creating metastore client, path {} is not a valid directory",
path.to_string_lossy()
)));
}
Ok(start_inprocess_local(path).await?)
start_inprocess_local(path).await
}
Self::LocalInMemory => Ok(start_inprocess_inmemory().await?),
Self::LocalInMemory => start_inprocess_inmemory().await,
}
}
}
41 changes: 41 additions & 0 deletions crates/object_store_util/src/conf.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use object_store::aws::{AmazonS3Builder, S3CopyIfNotExists};
use object_store::{
gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, memory::InMemory,
Error as ObjectStoreError, ObjectStore,
Expand All @@ -11,6 +12,13 @@ static IN_MEMORY_STORE: Lazy<Arc<InMemory>> = Lazy::new(|| Arc::new(InMemory::ne
/// Configuration options for various types of storage we support.
#[derive(Debug, Clone)]
pub enum StorageConfig {
S3 {
access_key_id: String,
secret_access_key: String,
region: Option<String>,
endpoint: Option<String>,
bucket: Option<String>,
},
Gcs {
service_account_key: String,
bucket: String,
Expand All @@ -25,6 +33,39 @@ impl StorageConfig {
/// Create a new object store using this config.
pub fn new_object_store(&self) -> Result<Arc<dyn ObjectStore>, ObjectStoreError> {
Ok(match self {
StorageConfig::S3 {
access_key_id,
secret_access_key,
region,
endpoint,
bucket,
} => {
let mut builder = AmazonS3Builder::new()
.with_access_key_id(access_key_id)
.with_secret_access_key(secret_access_key)
.with_region(region.clone().unwrap_or_default());

if let Some(endpoint) = endpoint {
if endpoint.starts_with("http://") {
builder = builder.with_allow_http(true);
}
builder = builder.with_endpoint(endpoint);
if endpoint.contains("r2.cloudflarestorage.com") {
// Ensure `ObjectStore::copy_if_not_exists` is enabled on the S3 client for
// Cloudflare R2 with the adequate header
builder = builder.with_copy_if_not_exists(S3CopyIfNotExists::Header(
"cf-copy-destination-if-none-match".to_string(),
"*".to_string(),
))
}
}

if let Some(bucket) = bucket {
builder = builder.with_bucket_name(bucket);
}

Arc::new(builder.build()?)
}
StorageConfig::Gcs {
service_account_key,
bucket,
Expand Down
Loading

0 comments on commit c3cc95a

Please sign in to comment.