Skip to content

Commit

Permalink
Add an end-to-end test for the external store
Browse files Browse the repository at this point in the history
Also rename floc crate to clade.
  • Loading branch information
gruuya committed Dec 29, 2023
1 parent 1ff1b7d commit 6ce1526
Show file tree
Hide file tree
Showing 18 changed files with 205 additions and 46 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ jobs:
with:
path: ~/.cache/pre-commit
key: pre-commit-${{ runner.os }}-pre-commit-${{ hashFiles('**/.pre-commit-config.yaml') }}
- name: Install protoc
run: sudo apt install -y protobuf-compiler

# Use https://github.com/marketplace/actions/rust-cache

Expand Down
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["floc"]
members = ["clade"]

[package]
name = "seafowl"
Expand Down Expand Up @@ -45,6 +45,7 @@ base64 = "0.21.0"

bytes = "1.4.0"
chrono = { version = "0.4", default_features = false }
clade = { path = "clade" }
clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.3"

Expand All @@ -61,7 +62,6 @@ datafusion-expr = "32.0.0"
datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "2b913b37e71ed96212dcec8c3fc8e865754ced82", features = ["s3-native-tls", "datafusion-ext"] }
floc = { path = "floc" }

futures = "0.3"
hex = ">=0.4.0"
Expand Down
2 changes: 1 addition & 1 deletion floc/Cargo.toml → clade/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "floc"
name = "clade"
version = "0.1.0"
edition = "2021"

Expand Down
File renamed without changes.
8 changes: 2 additions & 6 deletions floc/proto/catalog.proto → clade/proto/catalog.proto
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
syntax = "proto3";

package floc.catalog;

message UUID {
string value = 1;
}
package clade.catalog;

message CatalogReference {
string catalog_name = 1;
Expand All @@ -24,7 +20,7 @@ message TableReference {
message TableObject {
SchemaReference schema = 1;
string name = 2;
UUID uuid = 3; // Will become location
string location = 3;
}


2 changes: 1 addition & 1 deletion floc/proto/schema.proto → clade/proto/schema.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
syntax = "proto3";

package floc.schema;
package clade.schema;

import "catalog.proto";
import "google/protobuf/empty.proto";
Expand Down
7 changes: 7 additions & 0 deletions clade/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Code generated from the `clade.catalog` protobuf package, pulled in via tonic.
pub mod catalog {
tonic::include_proto!("clade.catalog");
}

// Code generated from the `clade.schema` protobuf package, pulled in via tonic.
pub mod schema {
tonic::include_proto!("clade.schema");
}
7 changes: 0 additions & 7 deletions floc/src/lib.rs

This file was deleted.

19 changes: 13 additions & 6 deletions src/catalog/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,27 @@ use crate::repository::interface::{
};
use crate::wasm_udf::data_types::CreateFunctionDetails;
use arrow_schema::Schema;
use floc::catalog::CatalogReference;
use floc::schema::schema_store_service_client::SchemaStoreServiceClient;
use floc::schema::ListSchemaResponse;
use tonic::transport::channel::Channel;
use clade::catalog::CatalogReference;
use clade::schema::schema_store_service_client::SchemaStoreServiceClient;
use clade::schema::ListSchemaResponse;
use tonic::transport::{channel::Channel, Error};
use tonic::Request;
use uuid::Uuid;

// An external store, facilitated via a remote floc server implementation
// An external store, facilitated via a remote clade server implementation
#[derive(Clone)]
pub struct ExternalStore {
// Generated tonic client for the schema store service, running over a transport `Channel`.
client: SchemaStoreServiceClient<Channel>,
}

impl ExternalStore {
/// Create a new external store implementing the clade interface.
///
/// `dsn` is the endpoint handed to the generated client's `connect` call.
///
/// # Errors
/// Returns the tonic transport `Error` produced by `connect` when the client
/// cannot be set up for the given `dsn`.
pub async fn new(dsn: String) -> Result<Self, Error> {
    // The DSN is intentionally not printed: it may carry credentials, and a
    // library constructor should not write to stdout unconditionally.
    let client = SchemaStoreServiceClient::connect(dsn).await?;
    Ok(Self { client })
}

// Tonic client implementations always end up needing mut references, and apparently the way
// to go is cloning a client instance instead of introducing synchronization primitives:
// https://github.com/hyperium/tonic/issues/33#issuecomment-538154015
Expand Down Expand Up @@ -177,7 +184,7 @@ impl FunctionStore for ExternalStore {
&self,
_catalog_name: &str,
) -> CatalogResult<Vec<AllDatabaseFunctionsResult>> {
not_impl()
Ok(vec![])
}

async fn delete(
Expand Down
20 changes: 18 additions & 2 deletions src/catalog/metastore.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::catalog::external::ExternalStore;
use crate::catalog::repository::RepositoryStore;
use crate::catalog::{
CatalogError, CatalogResult, CatalogStore, CreateFunctionError, FunctionStore,
Expand All @@ -11,10 +12,10 @@ use crate::wasm_udf::data_types::{
CreateFunctionDataType, CreateFunctionDetails, CreateFunctionLanguage,
CreateFunctionVolatility,
};
use clade::schema::SchemaObject;
use datafusion::catalog::schema::MemorySchemaProvider;
use datafusion::datasource::TableProvider;
use deltalake::DeltaTable;
use floc::schema::SchemaObject;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::str::FromStr;
Expand Down Expand Up @@ -51,6 +52,21 @@ impl Metastore {
}
}

/// Build a metastore in which the catalog, schema, table and function stores
/// are all served by the same external store instance.
pub fn new_from_external(
    external_store: Arc<ExternalStore>,
    object_store: Arc<InternalObjectStore>,
) -> Self {
    Self {
        catalogs: external_store.clone(),
        schemas: external_store.clone(),
        tables: external_store.clone(),
        // Final use of the handle, so it is moved instead of cloned.
        functions: external_store,
        // Fresh in-memory provider for the staging schema.
        staging_schema: Arc::new(MemorySchemaProvider::new()),
        object_store,
    }
}

pub async fn build_catalog(
&self,
catalog_name: &str,
Expand Down Expand Up @@ -84,7 +100,7 @@ impl Metastore {
let tables = schema
.tables
.into_iter()
.map(|table| self.build_table(table.name, &table.uuid.unwrap().value))
.map(|table| self.build_table(table.name, &table.location))
.collect::<HashMap<_, _>>();

(
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use crate::repository::interface::{
use crate::wasm_udf::data_types::CreateFunctionDetails;
use arrow_schema::Schema;
use async_trait::async_trait;
use clade::schema::ListSchemaResponse;
use datafusion_common::DataFusionError;
use floc::schema::ListSchemaResponse;
use tonic::Status;
use uuid::Uuid;

mod external;
pub(crate) mod metastore;
pub mod external;
pub mod metastore;
mod repository;

pub const DEFAULT_DB: &str = "default";
Expand Down
10 changes: 4 additions & 6 deletions src/catalog/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use async_trait::async_trait;
use itertools::Itertools;
use uuid::Uuid;

use floc::catalog::{TableObject, Uuid as FlocUuid};
use floc::schema::{ListSchemaResponse, SchemaObject};
use clade::catalog::TableObject;
use clade::schema::{ListSchemaResponse, SchemaObject};

use crate::catalog::{
CatalogError, CatalogResult, CatalogStore, FunctionStore, SchemaStore, TableStore,
Expand All @@ -21,7 +21,7 @@ use crate::repository::interface::{
};
use crate::wasm_udf::data_types::CreateFunctionDetails;

// The native, in-process catalog implementation for Seafowl.
// The native catalog implementation for Seafowl.
pub struct RepositoryStore {
// Shared, dynamically dispatched `Repository` handle held by this store.
pub repository: Arc<dyn Repository>,
}
Expand Down Expand Up @@ -128,9 +128,7 @@ impl SchemaStore for RepositoryStore {
Some(TableObject {
schema: None,
name: name.clone(),
uuid: Some(FlocUuid {
value: uuid.to_string(),
}),
location: uuid.to_string(),
})
} else {
None
Expand Down
10 changes: 9 additions & 1 deletion src/config/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
#[cfg(feature = "catalog-postgres")]
use crate::repository::postgres::PostgresRepository;

use crate::catalog::{metastore::Metastore, CatalogError};
use crate::catalog::{external::ExternalStore, metastore::Metastore, CatalogError};
use crate::object_store::http::add_http_object_store;
use crate::object_store::wrapped::InternalObjectStore;
#[cfg(feature = "remote-tables")]
Expand Down Expand Up @@ -58,6 +58,14 @@ async fn build_metastore(
.await
.expect("Error setting up the database"),
),
schema::Catalog::Clade(schema::Clade { dsn }) => {
let external = Arc::new(
ExternalStore::new(dsn.clone())
.await
.expect("Error setting up remote store"),
);
return Metastore::new_from_external(external, object_store);
}
};

Metastore::new_from_repository(repository, object_store)
Expand Down
9 changes: 8 additions & 1 deletion src/config/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
time::Duration,
};

use crate::catalog::DEFAULT_SCHEMA;
use crate::object_store::cache::{
CachingObjectStore, DEFAULT_CACHE_CAPACITY, DEFAULT_CACHE_ENTRY_TTL,
DEFAULT_MIN_FETCH_SIZE,
Expand Down Expand Up @@ -221,6 +222,7 @@ pub enum Catalog {
#[cfg(feature = "catalog-postgres")]
Postgres(Postgres),
Sqlite(Sqlite),
Clade(Clade),
}

#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
Expand Down Expand Up @@ -254,8 +256,13 @@ pub struct Sqlite {
pub read_only: bool,
}

// Configuration for the `Clade` catalog variant, i.e. a remote clade store.
#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct Clade {
// Address of the remote store; passed to `ExternalStore::new` when the metastore is built.
pub dsn: String,
}

fn default_schema() -> String {
"public".to_string()
DEFAULT_SCHEMA.to_string()
}

#[derive(Deserialize, Debug, PartialEq, Eq, Default, Clone)]
Expand Down
Loading

0 comments on commit 6ce1526

Please sign in to comment.