
feat: Enable GCS, S3, R2 and MinIO as object stores for local runs #1843

Merged · 17 commits · Oct 3, 2023
Changes from 8 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

25 changes: 16 additions & 9 deletions crates/datasources/src/native/access.rs
@@ -125,12 +125,7 @@ impl NativeTableStorage {

     pub async fn delete_table(&self, table: &TableEntry) -> Result<()> {
         let prefix = format!("databases/{}/tables/{}", self.db_id, table.meta.id);
-        let path: ObjectStorePath = match &self.conf {
-            StorageConfig::Gcs { bucket, .. } => format!("gs://{}/{}", bucket, prefix).into(),
-            StorageConfig::Memory => format!("memory://{}", prefix).into(),
-            _ => prefix.into(),
-        };
-        let mut x = self.store.list(Some(&path)).await?;
+        let mut x = self.store.list(Some(&prefix.into())).await?;
         while let Some(meta) = x.next().await {
             let meta = meta?;
             self.store.delete(&meta.location).await?
@@ -153,9 +148,21 @@ impl NativeTableStorage {
         let prefix = format!("databases/{}/tables/{}", self.db_id, table.meta.id);

         let url = match &self.conf {
-            StorageConfig::Gcs { bucket, .. } => {
-                Url::parse(&format!("gs://{}/{}", bucket, prefix.clone()))?
+            StorageConfig::S3 {
+                endpoint, bucket, ..
+            } => {
+                let mut s3_url = if let Some(endpoint) = endpoint {
+                    endpoint.clone()
+                } else {
+                    "s3://".to_string()
+                };
+
+                if let Some(bucket) = bucket {
+                    s3_url = format!("{s3_url}/{bucket}");
+                }
+                Url::parse(&format!("{s3_url}/{prefix}"))?
             }
+            StorageConfig::Gcs { bucket, .. } => Url::parse(&format!("gs://{bucket}/{prefix}"))?,
             StorageConfig::Local { path } => {
                 let path =
                     fs::canonicalize(path)
@@ -168,7 +175,7 @@
                 Url::from_file_path(path).map_err(|_| NativeError::Static("Path not absolute"))?
             }
             StorageConfig::Memory => {
-                let s = format!("memory://{}", prefix.clone());
+                let s = format!("memory://{prefix}");
                 Url::parse(&s)?
             }
         };
3 changes: 3 additions & 0 deletions crates/glaredb/src/args/local.rs
@@ -35,6 +35,9 @@ pub struct LocalClientOpts {
     #[clap(short = 'c', long, value_parser)]
     pub cloud_url: Option<Url>,

+    #[clap(flatten)]
Collaborator:

I kept wanting there to be something like "flatten with prefix" so we could still get strict parsing that we didn't have to write ourselves, but it seems like this isn't to be. I also sort of wanted ArgGroup to be able to get us more parsing, but again, no such luck.
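For context, a minimal sketch (with hypothetical struct and flag names) of what `#[clap(flatten)]` does: the flattened struct's flags are hoisted into the parent command unchanged, and clap offers no built-in way to prefix or strictly group them.

```rust
use clap::Parser;

#[derive(Debug, Clone, Parser)]
struct StorageArgs {
    /// Surfaces as a top-level `--location` flag on the parent command.
    #[clap(long)]
    location: Option<String>,
}

#[derive(Debug, Parser)]
struct Cli {
    #[clap(long)]
    verbose: bool,

    // Hoists StorageArgs' flags directly into Cli; there is no
    // "flatten with prefix" (e.g. `--storage-location`) variant.
    #[clap(flatten)]
    storage: StorageArgs,
}

fn main() {
    let cli = Cli::parse();
    println!("{:?}", cli.storage.location);
}
```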

+    pub storage_config: StorageConfigArgs,

     /// Ignores the proxy and directly goes to the server for remote execution.
     ///
     /// (Internal)
21 changes: 21 additions & 0 deletions crates/glaredb/src/args/mod.rs
@@ -1,3 +1,4 @@
+use anyhow::anyhow;
 use anyhow::Result;
 use clap::{Parser, ValueEnum};
 use std::fmt::Write as _;
@@ -81,3 +82,23 @@ pub struct PgProxyArgs {
     #[clap(long)]
     pub cloud_auth_code: String,
 }
+
+#[derive(Debug, Clone, Parser)]
+pub struct StorageConfigArgs {
+    /// URL of the object store in which to keep the data.
+    #[clap(short, long)]
+    pub location: Option<String>,
+
+    /// Storage options for building the object store.
+    #[clap(short = 'o', long, requires = "location", value_parser=parse_key_value_pair)]
+    pub storage_options: Vec<(String, String)>,
+}
+
+fn parse_key_value_pair(key_value_pair: &str) -> Result<(String, String)> {
Collaborator:

What if you specify options which aren't supported, or options that are specified more than once? Will order matter (should we sort by key?)

While I think it's good to have some kind of flexible config here (given that not all backing storage is going to have the same arguments), it's my assumption that we'll (eventually) have all this specified by a config file, or that most people will specify the options interactively via SQL/Python.

Contributor (author):

> What if you specify options which aren't supported?

Unsupported options will be ignored, but the initialization/setup will error out with a clear message (either from us, or courtesy of the object store crate itself) if some of the required keys are missing.

> Or options that are specified more than once? Will order matter (should we sort by key?)

The last option value will be used in this case.
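For illustration, a minimal sketch (not part of the diff) of that last-wins behavior: `connect` collects the parsed pairs with `HashMap::from_iter`, and later entries for the same key overwrite earlier ones.

```rust
use std::collections::HashMap;

fn main() {
    // Simulates `-o access_key_id=a -o access_key_id=b` after parsing:
    let storage_options = vec![
        ("access_key_id".to_string(), "a".to_string()),
        ("access_key_id".to_string(), "b".to_string()),
    ];

    // FromIterator for HashMap inserts pairs in order, so the last value wins.
    let opts: HashMap<String, String> = HashMap::from_iter(storage_options);
    assert_eq!(opts["access_key_id"], "b");
}
```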

+    key_value_pair
+        .split_once('=')
+        .map(|(key, value)| (key.to_string(), value.to_string()))
+        .ok_or(anyhow!(
+            "Expected key-value pair delimited by an equals sign, got '{key_value_pair}'"
+        ))
+}
52 changes: 42 additions & 10 deletions crates/glaredb/src/local.rs
@@ -1,4 +1,4 @@
-use crate::args::{LocalClientOpts, OutputMode};
+use crate::args::{LocalClientOpts, OutputMode, StorageConfigArgs};
 use crate::highlighter::{SQLHighlighter, SQLHinter, SQLValidator};
 use crate::prompt::SQLPrompt;
 use crate::util::MetastoreClientMode;
@@ -16,8 +16,11 @@ use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::StreamExt;
 use pgrepr::format::Format;
 use reedline::{FileBackedHistory, Reedline, Signal};
+use std::collections::HashMap;

 use datafusion_ext::vars::SessionVars;
+use metastore::local::start_inprocess;
+use object_store_util::shared::SharedObjectStore;
 use sqlexec::engine::EngineStorageConfig;
 use sqlexec::engine::{Engine, SessionStorageConfig, TrackedSession};
 use sqlexec::parser;
@@ -48,15 +51,37 @@ pub struct LocalSession {
 impl LocalSession {
     pub async fn connect(opts: LocalClientOpts) -> Result<Self> {
         // Connect to metastore.
-        let mode = MetastoreClientMode::new_local(opts.data_dir.clone())?;
-        let metastore_client = mode.into_client().await?;
-        let tracker = Arc::new(Tracker::Nop);
+        let (storage_conf, metastore_client) = if let StorageConfigArgs {
+            location: Some(location),
+            storage_options,
+        } = &opts.storage_config
+        {
+            // TODO: try to consolidate with --data-dir option
+            let conf = EngineStorageConfig::try_from_options(
+                location,
+                HashMap::from_iter(storage_options.clone()),
+            )?;
+            let store = conf
+                .storage_config(&SessionStorageConfig::default())?
+                .new_object_store()?;
+            // Wrap the store with a shared one, so that we get to use the non-atomic
+            // copy-if-not-exists defined there when initializing the lease
+            let store = SharedObjectStore::new(store);
+            let client = start_inprocess(Arc::new(store)).await?;
+            (conf, client)
+        } else {
+            let conf = match &opts.data_dir {
+                Some(path) => EngineStorageConfig::Local { path: path.clone() },
+                None => EngineStorageConfig::Memory,
+            };

-        let storage_conf = match &opts.data_dir {
-            Some(path) => EngineStorageConfig::Local { path: path.clone() },
-            None => EngineStorageConfig::Memory,
+            let mode = MetastoreClientMode::new_local(opts.data_dir.clone())?;
+            let client = mode.into_client().await?;
+            (conf, client)
         };

+        let tracker = Arc::new(Tracker::Nop);
+
         let engine = Engine::new(
             metastore_client,
             storage_conf,
@@ -131,9 +156,16 @@ impl LocalSession {
     }

     async fn run_interactive(&mut self) -> Result<()> {
-        let info = match &self.opts.data_dir {
-            Some(path) => format!("Persisting database at path: {}", path.display()),
-            None => "Using in-memory catalog".to_string(),
+        let info = match (&self.opts.storage_config, &self.opts.data_dir) {
+            (
+                StorageConfigArgs {
+                    location: Some(location),
+                    ..
+                },
+                _,
+            ) => format!("Persisting database at location: {location}"),
+            (_, Some(path)) => format!("Persisting database at path: {}", path.display()),
+            (_, None) => "Using in-memory catalog".to_string(),
         };

         println!("{info}");
1 change: 1 addition & 0 deletions crates/glaredb/src/server.rs
@@ -78,6 +78,7 @@ impl ComputeServer {
         let storage_conf = match (data_dir, service_account_key) {
             (None, Some(key)) => EngineStorageConfig::Gcs {
                 service_account_key: key,
+                bucket: None,
             },
             (Some(dir), None) => EngineStorageConfig::Local { path: dir },
             (None, None) => EngineStorageConfig::Memory,
3 changes: 1 addition & 2 deletions crates/metastore/src/local.rs
@@ -27,8 +27,7 @@ pub async fn start_inprocess_local(

 /// Starts an in-process metastore service, returning a client for the service.
 ///
-/// Useful for some tests, as well as when running GlareDB locally for testing.
-/// This should never be used in production.
Contributor:
Nice! 🫣

+/// Useful for tests, as well as when running GlareDB locally.
 pub async fn start_inprocess(
     store: Arc<dyn ObjectStore>,
 ) -> Result<MetastoreServiceClient<Channel>> {
1 change: 1 addition & 0 deletions crates/object_store_util/Cargo.toml
@@ -6,6 +6,7 @@ edition = { workspace = true }
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
+anyhow = "1.0"
Contributor:
Same comment as for metastore.

 logutil = { path = "../logutil" }
 object_store = { workspace = true }
 tempfile = "3"
41 changes: 41 additions & 0 deletions crates/object_store_util/src/conf.rs
@@ -1,3 +1,4 @@
+use object_store::aws::{AmazonS3Builder, S3CopyIfNotExists};
 use object_store::{
     gcp::GoogleCloudStorageBuilder, local::LocalFileSystem, memory::InMemory,
     Error as ObjectStoreError, ObjectStore,
@@ -11,6 +12,13 @@ static IN_MEMORY_STORE: Lazy<Arc<InMemory>> = Lazy::new(|| Arc::new(InMemory::ne

 /// Configuration options for various types of storage we support.
 #[derive(Debug, Clone)]
 pub enum StorageConfig {
+    S3 {
+        access_key_id: String,
+        secret_access_key: String,
+        region: Option<String>,
+        endpoint: Option<String>,
+        bucket: Option<String>,
+    },
     Gcs {
         service_account_key: String,
         bucket: String,
@@ -25,6 +33,39 @@ impl StorageConfig {
     /// Create a new object store using this config.
     pub fn new_object_store(&self) -> Result<Arc<dyn ObjectStore>, ObjectStoreError> {
         Ok(match self {
+            StorageConfig::S3 {
+                access_key_id,
+                secret_access_key,
+                region,
+                endpoint,
+                bucket,
+            } => {
+                let mut builder = AmazonS3Builder::new()
+                    .with_access_key_id(access_key_id)
+                    .with_secret_access_key(secret_access_key)
+                    .with_region(region.clone().unwrap_or_default());
+
+                if let Some(endpoint) = endpoint {
+                    if endpoint.starts_with("http://") {
+                        builder = builder.with_allow_http(true);
+                    }
+                    builder = builder.with_endpoint(endpoint);
+                    if endpoint.contains("r2.cloudflarestorage.com") {
+                        // Ensure `ObjectStore::copy_if_not_exists` is enabled on the S3 client for
+                        // Cloudflare R2 with the adequate header
+                        builder = builder.with_copy_if_not_exists(S3CopyIfNotExists::Header(
+                            "cf-copy-destination-if-none-match".to_string(),
+                            "*".to_string(),
+                        ))
+                    }
+                }

+                if let Some(bucket) = bucket {
+                    builder = builder.with_bucket_name(bucket);
+                }
+
+                Arc::new(builder.build()?)
+            }
             StorageConfig::Gcs {
                 service_account_key,
                 bucket,
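As a usage illustration (not part of the diff), the new `StorageConfig::S3` variant could be built like this for a local MinIO endpoint; the plain-`http://` endpoint is what triggers the `with_allow_http(true)` branch above. The credentials shown are MinIO's well-known defaults, used here as placeholders.

```rust
use object_store_util::conf::StorageConfig;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // MinIO speaks the S3 API; a local endpoint is typically plain HTTP.
    let conf = StorageConfig::S3 {
        access_key_id: "minioadmin".to_string(),
        secret_access_key: "minioadmin".to_string(),
        region: None, // falls back to an empty default region
        endpoint: Some("http://localhost:9000".to_string()),
        bucket: Some("glaredb-data".to_string()),
    };
    let _store = conf.new_object_store()?;
    Ok(())
}
```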
32 changes: 25 additions & 7 deletions crates/object_store_util/src/shared.rs
@@ -1,7 +1,10 @@
+use anyhow::anyhow;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
-use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use object_store::{
+    path::Path, Error as ObjectStoreError, GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
 use object_store::{GetOptions, MultipartId};
 use std::ops::Range;
 use std::sync::Arc;
@@ -86,14 +89,29 @@ impl ObjectStore for SharedObjectStore {
     }

     async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.inner.copy_if_not_exists(from, to).await
+        match self.inner.copy_if_not_exists(from, to).await {
+            Ok(_) => Ok(()),
+            Err(ObjectStoreError::NotSupported { .. }) => {
+                // Go with the poor man's copy-if-not-exists: try a regular copy if the path doesn't exist
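+                // (The head-then-copy fallback below is not atomic: a concurrent writer
+                // could create the object between the existence check and the copy.)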
+                match self.head(to).await {
+                    Ok(_) => {
+                        return Err(ObjectStoreError::AlreadyExists {
+                            path: to.to_string(),
+                            source: anyhow!(
+                                "Object at path {to} already exists, can't perform copy-if-not-exists"
+                            )
+                            .into(),
+                        })
+                    }
+                    Err(ObjectStoreError::NotFound { .. }) => self.copy(from, to).await,
+                    Err(e) => Err(e),
+                }
+            }
+            Err(e) => Err(e),
+        }
     }

     async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
         self.inner.rename(from, to).await
     }

     async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        self.inner.rename_if_not_exists(from, to).await
+        self.inner.rename(from, to).await
     }
 }