diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 56c55fcc..fd0bebab 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -35,6 +35,8 @@ jobs:
         with:
           path: ~/.cache/pre-commit
           key: pre-commit-${{ runner.os }}-pre-commit-${{ hashFiles('**/.pre-commit-config.yaml') }}
+      - name: Install protoc
+        run: sudo apt install -y protobuf-compiler
 
       # Use https://github.com/marketplace/actions/rust-cache
diff --git a/Cargo.lock b/Cargo.lock
index d49f1f27..e38911a5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1034,6 +1034,15 @@ dependencies = [
  "phf_codegen",
 ]
 
+[[package]]
+name = "clade"
+version = "0.1.0"
+dependencies = [
+ "prost",
+ "tonic",
+ "tonic-build",
+]
+
 [[package]]
 name = "clang-sys"
 version = "1.6.1"
@@ -5211,6 +5220,7 @@ dependencies = [
  "base64 0.21.5",
  "bytes",
  "chrono",
+ "clade",
  "clap",
  "config",
  "convergence",
@@ -5250,8 +5260,10 @@ dependencies = [
  "strum",
  "strum_macros",
  "tempfile",
+ "thiserror",
  "tokio",
  "tonic",
+ "tonic-reflection",
  "url",
  "uuid 1.5.0",
  "vergen",
@@ -6312,6 +6324,32 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "tonic-build"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889"
+dependencies = [
+ "prettyplease",
+ "proc-macro2",
+ "prost-build",
+ "quote",
+ "syn 2.0.39",
+]
+
+[[package]]
+name = "tonic-reflection"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3fa37c513df1339d197f4ba21d28c918b9ef1ac1768265f11ecb6b7f1cba1b76"
+dependencies = [
+ "prost",
+ "prost-types",
+ "tokio",
+ "tokio-stream",
+ "tonic",
+]
+
 [[package]]
 name = "tower"
 version = "0.4.13"
diff --git a/Cargo.toml b/Cargo.toml
index adecb626..f54c0d7c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,4 +1,5 @@
 [workspace]
+members = ["clade"]
 
 [package]
 name = "seafowl"
@@ -44,6 +45,7 @@ base64 = "0.21.0"
 bytes = "1.4.0"
 chrono = { version = "0.4", default_features = false }
+clade = { path = "clade" }
 clap = { version = "3.2.19", features = [ "derive" ] }
 config = "0.13.3"
@@ -90,6 +92,7 @@ sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any"
 strum = ">=0.24"
 strum_macros = ">=0.24"
 tempfile = "3"
+thiserror = "1"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
 tonic = { version = "0.10.0", optional = true }
 url = "2.2"
@@ -106,6 +109,7 @@ assert_cmd = "2"
 assert_unordered = "0.3"
 rstest = "*"
 serial_test = "2"
+tonic-reflection = "0.10"
 wiremock = "0.5"
 
 [build-dependencies]
diff --git a/clade/Cargo.toml b/clade/Cargo.toml
new file mode 100644
index 00000000..d3220833
--- /dev/null
+++ b/clade/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "clade"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+prost = "0.12"
+tonic = "0.10"
+
+[build-dependencies]
+tonic-build = "0.10"
diff --git a/clade/build.rs b/clade/build.rs
new file mode 100644
index 00000000..f45c2809
--- /dev/null
+++ b/clade/build.rs
@@ -0,0 +1,13 @@
+use std::env;
+use std::path::PathBuf;
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
+    tonic_build::configure()
+        .file_descriptor_set_path(out_dir.join("clade_descriptor.bin"))
+        .build_server(true)
+        .build_client(true)
+        .compile(&["proto/schema.proto"], &["proto"])?;
+
+    Ok(())
+}
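Note: `tonic-build` 0.10 shells out to `protoc` at compile time, which is why the CI step above installs `protobuf-compiler`. The `file_descriptor_set_path` call exists only to feed the gRPC reflection service registered in the tests; for reference, a hypothetical build script without reflection support could be reduced to a one-liner:

```rust
// Hypothetical minimal variant: no descriptor set, hence no reflection support.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Generates both client and server code for proto/schema.proto into OUT_DIR.
    tonic_build::compile_protos("proto/schema.proto")?;
    Ok(())
}
```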
diff --git a/clade/proto/schema.proto b/clade/proto/schema.proto
new file mode 100644
index 00000000..93466aa0
--- /dev/null
+++ b/clade/proto/schema.proto
@@ -0,0 +1,26 @@
+syntax = "proto3";
+
+package clade.schema;
+
+message SchemaObject {
+  string name = 1;
+  repeated TableObject tables = 2;
+}
+
+message TableObject {
+  string name = 1;
+  string location = 2;
+}
+
+message ListSchemaRequest {
+  string catalog_name = 1;
+}
+
+message ListSchemaResponse {
+  repeated SchemaObject schemas = 1;
+}
+
+service SchemaStoreService {
+  // List the available schemas
+  rpc ListSchemas(ListSchemaRequest) returns (ListSchemaResponse);
+}
diff --git a/clade/src/lib.rs b/clade/src/lib.rs
new file mode 100644
index 00000000..85931ae3
--- /dev/null
+++ b/clade/src/lib.rs
@@ -0,0 +1,5 @@
+pub mod schema {
+    tonic::include_proto!("clade.schema");
+    pub const FILE_DESCRIPTOR_SET: &[u8] =
+        tonic::include_file_descriptor_set!("clade_descriptor");
+}
diff --git a/src/catalog.rs b/src/catalog.rs
deleted file mode 100644
index e5d2f2e5..00000000
--- a/src/catalog.rs
+++ /dev/null
@@ -1,801 +0,0 @@
-use std::str::FromStr;
-use std::{collections::HashMap, fmt::Debug, sync::Arc};
-
-use arrow_schema::Schema;
-use async_trait::async_trait;
-use datafusion::catalog::schema::MemorySchemaProvider;
-use datafusion::datasource::TableProvider;
-use datafusion::error::DataFusionError;
-use deltalake::DeltaTable;
-use itertools::Itertools;
-use parking_lot::RwLock;
-use uuid::Uuid;
-
-use crate::object_store::wrapped::InternalObjectStore;
-use crate::provider::SeafowlFunction;
-use crate::repository::interface::{
-    DatabaseRecord, DroppedTableDeletionStatus, DroppedTablesResult, TableRecord,
-};
-use crate::system_tables::SystemSchemaProvider;
-use crate::wasm_udf::data_types::{
-    CreateFunctionDataType, CreateFunctionDetails, CreateFunctionLanguage,
-    CreateFunctionVolatility,
-};
-use crate::{
-    provider::{SeafowlDatabase, SeafowlSchema},
-    repository::interface::{
-        AllDatabaseColumnsResult, AllDatabaseFunctionsResult, CollectionId,
-        CollectionRecord, DatabaseId, Error as RepositoryError, FunctionId, Repository,
-        TableId, TableVersionId, TableVersionsResult,
-    },
-};
-
-pub const DEFAULT_DB: &str = "default";
-pub const DEFAULT_SCHEMA: &str = "public";
-pub const STAGING_SCHEMA: &str = "staging";
-
-#[derive(Debug)]
-pub enum Error {
-    CatalogDoesNotExist { name: String },
-    SchemaDoesNotExist { name: String },
-    TableDoesNotExist { name: String },
-    TableUuidDoesNotExist { uuid: Uuid },
-    TableAlreadyExists { name: String },
-    CatalogAlreadyExists { name: String },
-    SchemaAlreadyExists { name: String },
-    FunctionAlreadyExists { name: String },
-    FunctionDeserializationError { reason: String },
-    FunctionNotFound { names: String },
-    // Creating a table in / dropping the staging schema
-    UsedStagingSchema,
-    SqlxError(sqlx::Error),
-}
-
-// TODO janky, we want to:
-// - use the ? operator to avoid a lot of map_err
-// - but there are 2 distinct error types, so we have to be able to convert them into a single type
-// - don't want to impl From<serde_json::Error> for Error (since serde parse errors
-//   might not just be for FunctionDeserializationError)
-//
-// Currently, we have a struct that we automatically convert both errors into (storing their messages)
-// and then use one map_err to make the final Error::FunctionDeserializationError.
-//
-// - could use Box<dyn Error>?
-// - should maybe avoid just passing the to_string() of the error reason, but this is for internal -// use right now anyway (we made a mistake serializing the function into the DB, it's our fault) - -struct CreateFunctionError { - message: String, -} - -impl From for CreateFunctionError { - fn from(val: strum::ParseError) -> Self { - Self { - message: val.to_string(), - } - } -} - -impl From for CreateFunctionError { - fn from(val: serde_json::Error) -> Self { - Self { - message: val.to_string(), - } - } -} - -pub type Result = std::result::Result; - -/// Implement a global converter into a DataFusionError from the catalog error type. -/// These might be raised from different parts of query execution and in different contexts, -/// but we want roughly the same message in each case anyway, so we can take advantage of -/// the ? operator and automatic error conversion. -impl From for DataFusionError { - fn from(val: Error) -> Self { - match val { - Error::CatalogDoesNotExist { name } => { - DataFusionError::Plan(format!("Database {name:?} doesn't exist")) - } - Error::SchemaDoesNotExist { name } => { - DataFusionError::Plan(format!("Schema {name:?} doesn't exist")) - } - Error::TableDoesNotExist { name } => { - DataFusionError::Plan(format!("Table {name:?} doesn't exist")) - } - Error::TableUuidDoesNotExist { uuid } => { - DataFusionError::Plan(format!("Table with UUID {uuid} doesn't exist")) - } - Error::FunctionDeserializationError { reason } => DataFusionError::Internal( - format!("Error deserializing function: {reason:?}"), - ), - - // Errors that are the user's fault. - - // Even though these are "execution" errors, we raise them from the plan stage, - // where we manipulate data in the catalog because that's the only chance we get at - // being async, so we follow DataFusion's convention and return these as Plan errors. - Error::TableAlreadyExists { name } => { - DataFusionError::Plan(format!("Table {name:?} already exists")) - } - Error::CatalogAlreadyExists { name } => { - DataFusionError::Plan(format!("Database {name:?} already exists")) - } - Error::SchemaAlreadyExists { name } => { - DataFusionError::Plan(format!("Schema {name:?} already exists")) - } - Error::FunctionAlreadyExists { name } => { - DataFusionError::Plan(format!("Function {name:?} already exists")) - } - Error::FunctionNotFound { names } => { - DataFusionError::Plan(format!("Function {names:?} not found")) - } - Error::UsedStagingSchema => DataFusionError::Plan( - "The staging schema can only be referenced via CREATE EXTERNAL TABLE" - .to_string(), - ), - // Miscellaneous sqlx error. We want to log it but it's not worth showing to the user. - Error::SqlxError(e) => { - DataFusionError::Plan(format!("Internal SQL error: {:?}", e.to_string())) - } - } - } -} - -// This is the main entrypoint to all individual catalogs for various objects types. -// The intention is to make it extensible and de-coupled from the underlying metastore -// persistence mechanism (such as the presently used `Repository`). 
-#[derive(Clone)] -pub struct Metastore { - pub catalogs: Arc, - pub schemas: Arc, - pub tables: Arc, - pub functions: Arc, - staging_schema: Arc, - object_store: Arc, -} - -pub struct RepositoryStore { - pub repository: Arc, -} - -impl Metastore { - pub fn new_from_repository( - repository: Arc, - object_store: Arc, - ) -> Self { - let repository_store = Arc::new(RepositoryStore { repository }); - - let staging_schema = Arc::new(MemorySchemaProvider::new()); - Self { - catalogs: repository_store.clone(), - schemas: repository_store.clone(), - tables: repository_store.clone(), - functions: repository_store, - staging_schema, - object_store, - } - } - - pub async fn build_catalog(&self, catalog_name: &str) -> Result { - let all_columns = self.schemas.list(catalog_name).await?; - - // NB we can't distinguish between a database without tables and a database - // that doesn't exist at all due to our query. - - // Turn the list of all collections, tables and their columns into a nested map. - - let schemas: HashMap, Arc> = all_columns - .iter() - .group_by(|col| &col.collection_name) - .into_iter() - .map(|(cn, cc)| self.build_schema(cn, cc)) - .collect(); - - let name: Arc = Arc::from(catalog_name); - - Ok(SeafowlDatabase { - name: name.clone(), - schemas, - staging_schema: self.staging_schema.clone(), - system_schema: Arc::new(SystemSchemaProvider::new(name, self.tables.clone())), - }) - } - - fn build_schema<'a, I>( - &self, - collection_name: &str, - collection_columns: I, - ) -> (Arc, Arc) - where - I: Iterator, - { - let tables = collection_columns - .filter_map(|col| { - if let Some(table_name) = &col.table_name - && let Some(table_uuid) = col.table_uuid - { - Some(self.build_table(table_name, table_uuid)) - } else { - None - } - }) - .collect::>(); - - ( - Arc::from(collection_name.to_string()), - Arc::new(SeafowlSchema { - name: Arc::from(collection_name.to_string()), - tables: RwLock::new(tables), - }), - ) - } - - fn build_table( - &self, - table_name: &str, - table_uuid: Uuid, - ) -> (Arc, Arc) { - // Build a delta table but don't load it yet; we'll do that only for tables that are - // actually referenced in a statement, via the async `table` method of the schema provider. - // TODO: this means that any `information_schema.columns` query will serially load all - // delta tables present in the database. The real fix for this is to make DF use `TableSource` - // for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs. 
- let table_log_store = self.object_store.get_log_store(table_uuid); - - let table = DeltaTable::new(table_log_store, Default::default()); - (Arc::from(table_name.to_string()), Arc::new(table) as _) - } - - pub async fn build_functions( - &self, - catalog_name: &str, - ) -> Result> { - let functions = self.functions.list(catalog_name).await?; - - functions - .iter() - .map(|item| { - Self::parse_create_function_details(item) - .map(|details| SeafowlFunction { - function_id: item.id, - name: item.name.to_owned(), - details, - }) - .map_err(|e| Error::FunctionDeserializationError { - reason: e.message, - }) - }) - .collect::>>() - } - - fn parse_create_function_details( - item: &AllDatabaseFunctionsResult, - ) -> std::result::Result { - let AllDatabaseFunctionsResult { - id: _, - name: _, - entrypoint, - language, - input_types, - return_type, - data, - volatility, - } = item; - - Ok(CreateFunctionDetails { - entrypoint: entrypoint.to_string(), - language: CreateFunctionLanguage::from_str(language.as_str())?, - input_types: serde_json::from_str::>( - input_types, - )?, - return_type: CreateFunctionDataType::from_str( - &return_type.as_str().to_ascii_uppercase(), - )?, - data: data.to_string(), - volatility: CreateFunctionVolatility::from_str(volatility.as_str())?, - }) - } -} - -#[async_trait] -pub trait CatalogStore: Sync + Send { - async fn create(&self, name: &str) -> Result; - - async fn get(&self, name: &str) -> Result; - - async fn delete(&self, name: &str) -> Result<()>; -} - -#[async_trait] -pub trait SchemaStore: Sync + Send { - async fn create(&self, catalog_name: &str, schema_name: &str) - -> Result; - - async fn list( - &self, - catalog_name: &str, - ) -> Result, Error>; - - async fn get( - &self, - catalog_name: &str, - schema_name: &str, - ) -> Result; - - async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()>; -} - -#[async_trait] -pub trait TableStore: Sync + Send { - async fn create( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - schema: &Schema, - uuid: Uuid, - ) -> Result<(TableId, TableVersionId)>; - - async fn get( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result; - - async fn create_new_version( - &self, - uuid: Uuid, - version: i64, - ) -> Result; - - async fn delete_old_versions( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result; - - async fn get_all_versions( - &self, - catalog_name: &str, - table_names: Option>, - ) -> Result>; - - async fn update( - &self, - old_catalog_name: &str, - old_schema_name: &str, - old_table_name: &str, - new_catalog_name: &str, - new_schema_name: &str, - new_table_name: &str, - ) -> Result<()>; - - async fn delete( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result<()>; - - async fn get_dropped_tables( - &self, - catalog_name: Option, - ) -> Result>; - - async fn update_dropped_table( - &self, - uuid: Uuid, - deletion_status: DroppedTableDeletionStatus, - ) -> Result<(), Error>; - - async fn delete_dropped_table(&self, uuid: Uuid) -> Result<()>; -} - -#[async_trait] -pub trait FunctionStore: Sync + Send { - async fn create( - &self, - catalog_name: &str, - function_name: &str, - or_replace: bool, - details: &CreateFunctionDetails, - ) -> Result; - - async fn list(&self, catalog_name: &str) -> Result>; - - async fn delete( - &self, - catalog_name: &str, - if_exists: bool, - func_names: &[String], - ) -> Result<()>; -} - -impl From for Error { - fn from(err: RepositoryError) -> Error { 
- Error::SqlxError(match err { - RepositoryError::UniqueConstraintViolation(e) => e, - RepositoryError::FKConstraintViolation(e) => e, - RepositoryError::SqlxError(e) => e, - }) - } -} - -#[async_trait] - -impl CatalogStore for RepositoryStore { - async fn create(&self, name: &str) -> Result { - self.repository - .create_database(name) - .await - .map_err(|e| match e { - RepositoryError::UniqueConstraintViolation(_) => { - Error::CatalogAlreadyExists { - name: name.to_string(), - } - } - e => e.into(), - }) - } - - async fn get(&self, name: &str) -> Result { - self.repository - .get_database(name) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::CatalogDoesNotExist { - name: name.to_string(), - } - } - e => e.into(), - }) - } - - async fn delete(&self, name: &str) -> Result<()> { - let database = CatalogStore::get(self, name).await?; - - self.repository - .delete_database(database.id) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::CatalogDoesNotExist { - name: name.to_string(), - } - } - e => e.into(), - }) - } -} - -#[async_trait] -impl SchemaStore for RepositoryStore { - async fn create( - &self, - catalog_name: &str, - schema_name: &str, - ) -> Result { - if schema_name == STAGING_SCHEMA { - return Err(Error::UsedStagingSchema); - } - - let database = CatalogStore::get(self, catalog_name).await?; - - self.repository - .create_collection(database.id, schema_name) - .await - .map_err(|e| match e { - RepositoryError::UniqueConstraintViolation(_) => { - Error::SchemaAlreadyExists { - name: schema_name.to_string(), - } - } - e => e.into(), - }) - } - - async fn list( - &self, - catalog_name: &str, - ) -> Result, Error> { - Ok(self.repository.list_collections(catalog_name).await?) 
- } - - async fn get( - &self, - catalog_name: &str, - schema_name: &str, - ) -> Result { - if schema_name == STAGING_SCHEMA { - return Err(Error::UsedStagingSchema); - } - - self.repository - .get_collection(catalog_name, schema_name) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::SchemaDoesNotExist { - name: schema_name.to_string(), - } - } - e => e.into(), - }) - } - - async fn delete(&self, catalog_name: &str, schema_name: &str) -> Result<()> { - let schema = SchemaStore::get(self, catalog_name, schema_name).await?; - - self.repository - .delete_collection(schema.id) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::SchemaDoesNotExist { - name: schema_name.to_string(), - } - } - e => e.into(), - }) - } -} - -#[async_trait] -impl TableStore for RepositoryStore { - async fn create( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - schema: &Schema, - uuid: Uuid, - ) -> Result<(TableId, TableVersionId)> { - let collection = SchemaStore::get(self, catalog_name, schema_name).await?; - - self.repository - .create_table(collection.id, table_name, schema, uuid) - .await - .map_err(|e| match e { - RepositoryError::UniqueConstraintViolation(_) => { - Error::TableAlreadyExists { - name: table_name.to_string(), - } - } - RepositoryError::FKConstraintViolation(_) => Error::SchemaDoesNotExist { - name: schema_name.to_string(), - }, - RepositoryError::SqlxError(e) => Error::SqlxError(e), - }) - } - - async fn get( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result { - self.repository - .get_table(catalog_name, schema_name, table_name) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::TableDoesNotExist { - name: table_name.to_string(), - } - } - e => e.into(), - }) - } - - async fn create_new_version( - &self, - uuid: Uuid, - version: i64, - ) -> Result { - self.repository - .create_new_version(uuid, version) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::TableUuidDoesNotExist { uuid } - } - e => e.into(), - }) - } - - async fn delete_old_versions( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result { - let table = TableStore::get(self, catalog_name, schema_name, table_name).await?; - - Ok(self.repository.delete_old_versions(table.id).await?) - } - - async fn get_all_versions( - &self, - catalog_name: &str, - table_names: Option>, - ) -> Result> { - Ok(self - .repository - .get_all_versions(catalog_name, table_names) - .await?) 
- } - - async fn update( - &self, - old_catalog_name: &str, - old_schema_name: &str, - old_table_name: &str, - new_catalog_name: &str, - new_schema_name: &str, - new_table_name: &str, - ) -> Result<()> { - assert_eq!( - old_catalog_name, new_catalog_name, - "Moving across catalogs not yet supported" - ); - - let table = - TableStore::get(self, old_catalog_name, old_schema_name, old_table_name) - .await?; - let new_schema_id = if new_schema_name != old_schema_name { - let schema = - SchemaStore::get(self, old_catalog_name, new_schema_name).await?; - Some(schema.id) - } else { - None - }; - - self.repository - .rename_table(table.id, new_table_name, new_schema_id) - .await - .map_err(|e| match e { - RepositoryError::FKConstraintViolation(_) => { - // We only FK on collection_id, so this will be Some - Error::SchemaDoesNotExist { - name: new_schema_name.to_string(), - } - } - RepositoryError::UniqueConstraintViolation(_) => { - Error::TableAlreadyExists { - name: new_table_name.to_string(), - } - } - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::TableDoesNotExist { - name: old_table_name.to_string(), - } - } - e => e.into(), - }) - } - - async fn delete( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result<()> { - let table = TableStore::get(self, catalog_name, schema_name, table_name).await?; - - self.repository - .delete_table(table.id) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::TableDoesNotExist { - name: table_name.to_string(), - } - } - e => e.into(), - }) - } - - async fn get_dropped_tables( - &self, - catalog_name: Option, - ) -> Result> { - Ok(self.repository.get_dropped_tables(catalog_name).await?) - } - - async fn update_dropped_table( - &self, - uuid: Uuid, - deletion_status: DroppedTableDeletionStatus, - ) -> Result<(), Error> { - self.repository - .update_dropped_table(uuid, deletion_status) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::TableUuidDoesNotExist { uuid } - } - e => e.into(), - }) - } - - async fn delete_dropped_table(&self, uuid: Uuid) -> Result<()> { - self.repository - .delete_dropped_table(uuid) - .await - .map_err(|e| match e { - RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => { - Error::TableUuidDoesNotExist { uuid } - } - e => e.into(), - }) - } -} - -#[async_trait] -impl FunctionStore for RepositoryStore { - async fn create( - &self, - catalog_name: &str, - function_name: &str, - or_replace: bool, - details: &CreateFunctionDetails, - ) -> Result { - let database = CatalogStore::get(self, catalog_name).await?; - - self.repository - .create_function(database.id, function_name, or_replace, details) - .await - .map_err(|e| match e { - RepositoryError::FKConstraintViolation(_) => Error::CatalogDoesNotExist { - name: catalog_name.to_string(), - }, - RepositoryError::UniqueConstraintViolation(_) => { - Error::FunctionAlreadyExists { - name: function_name.to_string(), - } - } - e => e.into(), - }) - } - - async fn list(&self, catalog_name: &str) -> Result> { - let database = CatalogStore::get(self, catalog_name).await?; - - Ok(self - .repository - .get_all_functions_in_database(database.id) - .await?) 
-    }
-
-    async fn delete(
-        &self,
-        catalog_name: &str,
-        if_exists: bool,
-
-        func_names: &[String],
-    ) -> Result<()> {
-        let database = CatalogStore::get(self, catalog_name).await?;
-
-        match self.repository.drop_function(database.id, func_names).await {
-            Ok(id) => Ok(id),
-            Err(RepositoryError::FKConstraintViolation(_)) => {
-                Err(Error::CatalogDoesNotExist {
-                    name: catalog_name.to_string(),
-                })
-            }
-            Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => {
-                if if_exists {
-                    Ok(())
-                } else {
-                    Err(Error::FunctionNotFound {
-                        names: func_names.join(", "),
-                    })
-                }
-            }
-            Err(e) => Err(e.into()),
-        }
-    }
-}
diff --git a/src/catalog/external.rs b/src/catalog/external.rs
new file mode 100644
index 00000000..0c58d739
--- /dev/null
+++ b/src/catalog/external.rs
@@ -0,0 +1,197 @@
+use crate::catalog::{
+    not_impl, CatalogResult, CatalogStore, FunctionStore, SchemaStore, TableStore,
+};
+use crate::repository::interface::{
+    AllDatabaseFunctionsResult, CollectionRecord, DatabaseRecord,
+    DroppedTableDeletionStatus, DroppedTablesResult, TableId, TableRecord,
+    TableVersionId, TableVersionsResult,
+};
+use crate::wasm_udf::data_types::CreateFunctionDetails;
+use arrow_schema::Schema;
+use clade::schema::schema_store_service_client::SchemaStoreServiceClient;
+use clade::schema::{ListSchemaRequest, ListSchemaResponse};
+use tonic::transport::{channel::Channel, Endpoint, Error};
+use tonic::Request;
+use uuid::Uuid;
+
+// An external store, facilitated via a remote clade server implementation
+#[derive(Clone)]
+pub struct ExternalStore {
+    client: SchemaStoreServiceClient<Channel>,
+}
+
+impl ExternalStore {
+    // Create a new external store implementing the clade interface
+    pub async fn new(dsn: String) -> Result<Self, Error> {
+        let endpoint = Endpoint::new(dsn)?.connect_lazy();
+        let client = SchemaStoreServiceClient::new(endpoint);
+        Ok(Self { client })
+    }
+
+    // Tonic client implementations always end up needing mut references, and apparently the way
+    // to go is cloning a client instance instead of introducing synchronization primitives:
+    // https://github.com/hyperium/tonic/issues/33#issuecomment-538154015
+    fn client(&self) -> SchemaStoreServiceClient<Channel> {
+        self.client.clone()
+    }
+}
+
+#[tonic::async_trait]
+impl CatalogStore for ExternalStore {
+    async fn create(&self, _name: &str) -> CatalogResult<()> {
+        not_impl()
+    }
+
+    async fn get(&self, _name: &str) -> CatalogResult<DatabaseRecord> {
+        not_impl()
+    }
+
+    async fn delete(&self, _name: &str) -> CatalogResult<()> {
+        not_impl()
+    }
+}
+
+#[tonic::async_trait]
+impl SchemaStore for ExternalStore {
+    async fn create(&self, _catalog_name: &str, _schema_name: &str) -> CatalogResult<()> {
+        not_impl()
+    }
+
+    async fn list(&self, catalog_name: &str) -> CatalogResult<ListSchemaResponse> {
+        let req = Request::new(ListSchemaRequest {
+            catalog_name: catalog_name.to_string(),
+        });
+
+        let response = self.client().list_schemas(req).await?;
+        Ok(response.into_inner())
+    }
+
+    async fn get(
+        &self,
+        _catalog_name: &str,
+        _schema_name: &str,
+    ) -> CatalogResult<CollectionRecord> {
+        not_impl()
+    }
+
+    async fn delete(&self, _catalog_name: &str, _schema_name: &str) -> CatalogResult<()> {
+        not_impl()
+    }
+}
+
+#[tonic::async_trait]
+impl TableStore for ExternalStore {
+    async fn create(
+        &self,
+        _catalog_name: &str,
+        _schema_name: &str,
+        _table_name: &str,
+        _schema: &Schema,
+        _uuid: Uuid,
+    ) -> CatalogResult<(TableId, TableVersionId)> {
+        not_impl()
+    }
+
+    async fn get(
+        &self,
+        _catalog_name: &str,
+        _schema_name: &str,
+        _table_name: &str,
+    ) -> CatalogResult<TableRecord> {
+        not_impl()
+    }
+
+    async fn create_new_version(
+        &self,
+        _uuid: Uuid,
+        _version: i64,
+    ) -> CatalogResult<TableVersionId> {
+        not_impl()
+    }
+
+    async fn delete_old_versions(
+        &self,
+        _catalog_name: &str,
+        _schema_name: &str,
+        _table_name: &str,
+    ) -> CatalogResult<u64> {
+        not_impl()
+    }
+
+    async fn get_all_versions(
+        &self,
+        _catalog_name: &str,
+        _table_names: Option<Vec<String>>,
+    ) -> CatalogResult<Vec<TableVersionsResult>> {
+        not_impl()
+    }
+
+    async fn update(
+        &self,
+        _old_catalog_name: &str,
+        _old_schema_name: &str,
+        _old_table_name: &str,
+        _new_catalog_name: &str,
+        _new_schema_name: &str,
+        _new_table_name: &str,
+    ) -> CatalogResult<()> {
+        not_impl()
+    }
+
+    async fn delete(
+        &self,
+        _catalog_name: &str,
+        _schema_name: &str,
+        _table_name: &str,
+    ) -> CatalogResult<()> {
+        not_impl()
+    }
+
+    async fn get_dropped_tables(
+        &self,
+        _catalog_name: Option<String>,
+    ) -> CatalogResult<Vec<DroppedTablesResult>> {
+        not_impl()
+    }
+
+    async fn update_dropped_table(
+        &self,
+        _uuid: Uuid,
+        _deletion_status: DroppedTableDeletionStatus,
+    ) -> CatalogResult<()> {
+        not_impl()
+    }
+
+    async fn delete_dropped_table(&self, _uuid: Uuid) -> CatalogResult<()> {
+        not_impl()
+    }
+}
+
+#[tonic::async_trait]
+impl FunctionStore for ExternalStore {
+    async fn create(
+        &self,
+        _catalog_name: &str,
+        _function_name: &str,
+        _or_replace: bool,
+        _details: &CreateFunctionDetails,
+    ) -> CatalogResult<()> {
+        not_impl()
+    }
+
+    async fn list(
+        &self,
+        _catalog_name: &str,
+    ) -> CatalogResult<Vec<AllDatabaseFunctionsResult>> {
+        Ok(vec![])
+    }
+
+    async fn delete(
+        &self,
+        _catalog_name: &str,
+        _if_exists: bool,
+        _func_names: &[String],
+    ) -> CatalogResult<()> {
+        not_impl()
+    }
+}
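For reference, `ExternalStore` is a thin wrapper over the `tonic`-generated client. A hedged sketch of driving that client directly (the address and catalog name are placeholders, and this assumes a clade-compatible server is running):

```rust
use clade::schema::schema_store_service_client::SchemaStoreServiceClient;
use clade::schema::ListSchemaRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder address: any server implementing clade's SchemaStoreService works.
    let mut client = SchemaStoreServiceClient::connect("http://127.0.0.1:50051").await?;

    let response = client
        .list_schemas(ListSchemaRequest {
            catalog_name: "default".to_string(),
        })
        .await?;

    for schema in response.into_inner().schemas {
        println!("schema {} has {} table(s)", schema.name, schema.tables.len());
    }
    Ok(())
}
```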
diff --git a/src/catalog/metastore.rs b/src/catalog/metastore.rs
new file mode 100644
index 00000000..76706ffc
--- /dev/null
+++ b/src/catalog/metastore.rs
@@ -0,0 +1,180 @@
+use crate::catalog::external::ExternalStore;
+use crate::catalog::repository::RepositoryStore;
+use crate::catalog::{
+    CatalogError, CatalogResult, CatalogStore, CreateFunctionError, FunctionStore,
+    SchemaStore, TableStore,
+};
+use crate::object_store::wrapped::InternalObjectStore;
+use crate::provider::{SeafowlDatabase, SeafowlFunction, SeafowlSchema};
+use crate::repository::interface::{AllDatabaseFunctionsResult, Repository};
+use crate::system_tables::SystemSchemaProvider;
+use crate::wasm_udf::data_types::{
+    CreateFunctionDataType, CreateFunctionDetails, CreateFunctionLanguage,
+    CreateFunctionVolatility,
+};
+use clade::schema::SchemaObject;
+use datafusion::catalog::schema::MemorySchemaProvider;
+use datafusion::datasource::TableProvider;
+use deltalake::DeltaTable;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
+// This is the main entrypoint to all individual catalogs for various object types.
+// The intention is to make it extensible and decoupled from the underlying metastore
+// persistence mechanism (such as the presently used `Repository`).
+#[derive(Clone)]
+pub struct Metastore {
+    pub catalogs: Arc<dyn CatalogStore>,
+    pub schemas: Arc<dyn SchemaStore>,
+    pub tables: Arc<dyn TableStore>,
+    pub functions: Arc<dyn FunctionStore>,
+    staging_schema: Arc<MemorySchemaProvider>,
+    object_store: Arc<InternalObjectStore>,
+}
+
+impl Metastore {
+    pub fn new_from_repository(
+        repository: Arc<dyn Repository>,
+        object_store: Arc<InternalObjectStore>,
+    ) -> Self {
+        let repository_store = Arc::new(RepositoryStore { repository });
+
+        let staging_schema = Arc::new(MemorySchemaProvider::new());
+        Self {
+            catalogs: repository_store.clone(),
+            schemas: repository_store.clone(),
+            tables: repository_store.clone(),
+            functions: repository_store,
+            staging_schema,
+            object_store,
+        }
+    }
+
+    pub fn new_from_external(
+        external_store: Arc<ExternalStore>,
+        object_store: Arc<InternalObjectStore>,
+    ) -> Self {
+        let staging_schema = Arc::new(MemorySchemaProvider::new());
+        Self {
+            catalogs: external_store.clone(),
+            schemas: external_store.clone(),
+            tables: external_store.clone(),
+            functions: external_store,
+            staging_schema,
+            object_store,
+        }
+    }
+
+    pub async fn build_catalog(
+        &self,
+        catalog_name: &str,
+    ) -> CatalogResult<SeafowlDatabase> {
+        let catalog_schemas = self.schemas.list(catalog_name).await?;
+
+        // NB we can't distinguish between a database without tables and a database
+        // that doesn't exist at all due to our query.
+
+        // Turn the list of all collections, tables and their columns into a nested map.
+        let schemas: HashMap<Arc<str>, Arc<SeafowlSchema>> = catalog_schemas
+            .schemas
+            .into_iter()
+            .map(|schema| self.build_schema(schema))
+            .collect();
+
+        let name: Arc<str> = Arc::from(catalog_name);
+
+        Ok(SeafowlDatabase {
+            name: name.clone(),
+            schemas,
+            staging_schema: self.staging_schema.clone(),
+            system_schema: Arc::new(SystemSchemaProvider::new(name, self.tables.clone())),
+        })
+    }
+
+    fn build_schema(&self, schema: SchemaObject) -> (Arc<str>, Arc<SeafowlSchema>) {
+        let schema_name = schema.name;
+
+        let tables = schema
+            .tables
+            .into_iter()
+            .map(|table| self.build_table(table.name, &table.location))
+            .collect::<HashMap<_, _>>();
+
+        (
+            Arc::from(schema_name.clone()),
+            Arc::new(SeafowlSchema {
+                name: Arc::from(schema_name),
+                tables: RwLock::new(tables),
+            }),
+        )
+    }
+
+    fn build_table(
+        &self,
+        table_name: String,
+        table_uuid: &str,
+    ) -> (Arc<str>, Arc<dyn TableProvider>) {
+        // Build a delta table but don't load it yet; we'll do that only for tables that are
+        // actually referenced in a statement, via the async `table` method of the schema provider.
+        // TODO: this means that any `information_schema.columns` query will serially load all
+        // delta tables present in the database. The real fix for this is to make DF use `TableSource`
+        // for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs.
+        let table_log_store = self.object_store.get_log_store(table_uuid);
+
+        let table = DeltaTable::new(table_log_store, Default::default());
+        (Arc::from(table_name), Arc::new(table) as _)
+    }
+
+    pub async fn build_functions(
+        &self,
+        catalog_name: &str,
+    ) -> CatalogResult<Vec<SeafowlFunction>> {
+        let functions = self.functions.list(catalog_name).await?;
+
+        functions
+            .iter()
+            .map(|item| {
+                Self::parse_create_function_details(item)
+                    .map(|details| SeafowlFunction {
+                        function_id: item.id,
+                        name: item.name.to_owned(),
+                        details,
+                    })
+                    .map_err(|e| CatalogError::FunctionDeserializationError {
+                        reason: e.message,
+                    })
+            })
+            .collect::<CatalogResult<Vec<SeafowlFunction>>>()
+    }
+
+    fn parse_create_function_details(
+        item: &AllDatabaseFunctionsResult,
+    ) -> Result<CreateFunctionDetails, CreateFunctionError> {
+        let AllDatabaseFunctionsResult {
+            id: _,
+            name: _,
+            entrypoint,
+            language,
+            input_types,
+            return_type,
+            data,
+            volatility,
+        } = item;
+
+        Ok(CreateFunctionDetails {
+            entrypoint: entrypoint.to_string(),
+            language: CreateFunctionLanguage::from_str(language.as_str())?,
+            input_types: serde_json::from_str::<Vec<CreateFunctionDataType>>(
+                input_types,
+            )?,
+            return_type: CreateFunctionDataType::from_str(
+                &return_type.as_str().to_ascii_uppercase(),
+            )?,
+            data: data.to_string(),
+            volatility: CreateFunctionVolatility::from_str(volatility.as_str())?,
+        })
+    }
+}
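To make the contract concrete: `build_catalog` consumes a `ListSchemaResponse`, and `build_table` treats each `TableObject.location` as a prefix under the root of Seafowl's configured object store (in the native catalog this is the table UUID; the integration test below uses a plain directory name). A hypothetical response an external server might return:

```rust
use clade::schema::{ListSchemaResponse, SchemaObject, TableObject};

// Hypothetical payload; the schema/table names and the location prefix are
// illustrative only.
fn example_response() -> ListSchemaResponse {
    ListSchemaResponse {
        schemas: vec![SchemaObject {
            name: "analytics".to_string(),
            tables: vec![TableObject {
                name: "events".to_string(),
                // Resolved relative to the object store root by Metastore::build_table.
                location: "6bb9913e-0341-446d-bb58-b865803ce0ff".to_string(),
            }],
        }],
    }
}
```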
diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
new file mode 100644
index 00000000..4d74733a
--- /dev/null
+++ b/src/catalog/mod.rs
@@ -0,0 +1,242 @@
+use crate::repository::interface::{
+    AllDatabaseFunctionsResult, CollectionRecord, DatabaseRecord,
+    DroppedTableDeletionStatus, DroppedTablesResult, TableId, TableRecord,
+    TableVersionId, TableVersionsResult,
+};
+use crate::wasm_udf::data_types::CreateFunctionDetails;
+use arrow_schema::Schema;
+use async_trait::async_trait;
+use clade::schema::ListSchemaResponse;
+use datafusion_common::DataFusionError;
+use tonic::Status;
+use uuid::Uuid;
+
+pub mod external;
+pub mod metastore;
+mod repository;
+
+pub const DEFAULT_DB: &str = "default";
+pub const DEFAULT_SCHEMA: &str = "public";
+pub const STAGING_SCHEMA: &str = "staging";
+
+#[derive(Debug, thiserror::Error)]
+pub enum CatalogError {
+    // Catalog errors
+    #[error("Catalog {name:?} doesn't exist")]
+    CatalogDoesNotExist { name: String },
+
+    #[error("Catalog {name:?} already exists")]
+    CatalogAlreadyExists { name: String },
+
+    // Schema errors
+    #[error("Schema {name:?} doesn't exist")]
+    SchemaDoesNotExist { name: String },
+
+    #[error("Schema {name:?} already exists")]
+    SchemaAlreadyExists { name: String },
+
+    // Table errors
+    #[error("Table {name:?} doesn't exist")]
+    TableDoesNotExist { name: String },
+
+    #[error("Table with UUID {uuid} doesn't exist")]
+    TableUuidDoesNotExist { uuid: Uuid },
+
+    #[error("Table {name:?} already exists")]
+    TableAlreadyExists { name: String },
+
+    // Function errors
+    #[error("Function {name:?} already exists")]
+    FunctionAlreadyExists { name: String },
+
+    #[error("Error deserializing function: {reason}")]
+    FunctionDeserializationError { reason: String },
+
+    #[error("Function {names:?} not found")]
+    FunctionNotFound { names: String },
+
+    // Creating a table in / dropping the staging schema
+    #[error("The staging schema can only be referenced via CREATE EXTERNAL TABLE")]
+    UsedStagingSchema,
+
+    #[error("Catalog method not implemented: {reason}")]
+    NotImplemented { reason: String },
+
+    // Metastore implementation errors
+    #[error("Internal SQL error: {0:?}")]
+    SqlxError(sqlx::Error),
+
+    #[error(transparent)]
+    TonicStatus(#[from] Status),
+}
+
+/// Implement a global converter into a DataFusionError from the catalog error type.
+/// These might be raised from different parts of query execution and in different contexts,
+/// but we want roughly the same message in each case anyway, so we can take advantage of
+/// the ? operator and automatic error conversion.
+impl From<CatalogError> for DataFusionError {
+    fn from(val: CatalogError) -> Self {
+        match val {
+            CatalogError::NotImplemented { reason } => {
+                DataFusionError::NotImplemented(reason)
+            }
+            _ => DataFusionError::Plan(val.to_string()),
+        }
+    }
+}
+
+fn not_impl<T>() -> CatalogResult<T> {
+    Err(CatalogError::NotImplemented {
+        reason: "Metastore method not supported".to_string(),
+    })
+}
+
+// TODO janky, we want to:
+// - use the ? operator to avoid a lot of map_err
+// - but there are 2 distinct error types, so we have to be able to convert them into a single type
+// - don't want to impl From<serde_json::Error> for Error (since serde parse errors
+//   might not just be for FunctionDeserializationError)
+//
+// Currently, we have a struct that we automatically convert both errors into (storing their messages)
+// and then use one map_err to make the final Error::FunctionDeserializationError.
+//
+// - could use Box<dyn Error>?
+// - should maybe avoid just passing the to_string() of the error reason, but this is for internal
+//   use right now anyway (we made a mistake serializing the function into the DB, it's our fault)
+pub(super) struct CreateFunctionError {
+    message: String,
+}
+
+impl From<strum::ParseError> for CreateFunctionError {
+    fn from(val: strum::ParseError) -> Self {
+        Self {
+            message: val.to_string(),
+        }
+    }
+}
+
+impl From<serde_json::Error> for CreateFunctionError {
+    fn from(val: serde_json::Error) -> Self {
+        Self {
+            message: val.to_string(),
+        }
+    }
+}
+
+pub type CatalogResult<T> = Result<T, CatalogError>;
+
+#[async_trait]
+pub trait CatalogStore: Sync + Send {
+    async fn create(&self, name: &str) -> CatalogResult<()>;
+
+    async fn get(&self, name: &str) -> CatalogResult<DatabaseRecord>;
+
+    async fn delete(&self, name: &str) -> CatalogResult<()>;
+}
+
+#[async_trait]
+pub trait SchemaStore: Sync + Send {
+    async fn create(&self, catalog_name: &str, schema_name: &str) -> CatalogResult<()>;
+
+    async fn list(&self, catalog_name: &str) -> CatalogResult<ListSchemaResponse>;
+
+    async fn get(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+    ) -> CatalogResult<CollectionRecord>;
+
+    async fn delete(&self, catalog_name: &str, schema_name: &str) -> CatalogResult<()>;
+}
+
+#[async_trait]
+pub trait TableStore: Sync + Send {
+    async fn create(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+        schema: &Schema,
+        uuid: Uuid,
+    ) -> CatalogResult<(TableId, TableVersionId)>;
+
+    async fn get(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+    ) -> CatalogResult<TableRecord>;
+
+    async fn create_new_version(
+        &self,
+        uuid: Uuid,
+        version: i64,
+    ) -> CatalogResult<TableVersionId>;
+
+    async fn delete_old_versions(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+    ) -> CatalogResult<u64>;
+
+    async fn get_all_versions(
+        &self,
+        catalog_name: &str,
+        table_names: Option<Vec<String>>,
+    ) -> CatalogResult<Vec<TableVersionsResult>>;
+
+    async fn update(
+        &self,
+        old_catalog_name: &str,
+        old_schema_name: &str,
+        old_table_name: &str,
+        new_catalog_name: &str,
+        new_schema_name: &str,
+        new_table_name: &str,
+    ) -> CatalogResult<()>;
+
+    async fn delete(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+    ) -> CatalogResult<()>;
+
+    async fn get_dropped_tables(
+        &self,
+        catalog_name: Option<String>,
+    ) -> CatalogResult<Vec<DroppedTablesResult>>;
+
+    async fn update_dropped_table(
+        &self,
+        uuid: Uuid,
+        deletion_status: DroppedTableDeletionStatus,
+    ) -> CatalogResult<()>;
+
+    async fn delete_dropped_table(&self, uuid: Uuid) -> CatalogResult<()>;
+}
+
+#[async_trait]
+pub trait FunctionStore: Sync + Send {
+    async fn create(
+        &self,
+        catalog_name: &str,
+        function_name: &str,
+        or_replace: bool,
+        details: &CreateFunctionDetails,
+    ) -> CatalogResult<()>;
+
+    async fn list(
+        &self,
+        catalog_name: &str,
+    ) -> CatalogResult<Vec<AllDatabaseFunctionsResult>>;
+
+    async fn delete(
+        &self,
+        catalog_name: &str,
+        if_exists: bool,
+        func_names: &[String],
+    ) -> CatalogResult<()>;
+}
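The `thiserror` derive plus the `From<CatalogError> for DataFusionError` impl is what lets call sites bubble catalog failures into plan errors with just `?`. A hedged illustration (the function itself is hypothetical, not part of this change):

```rust
use clade::schema::ListSchemaResponse;
use datafusion_common::DataFusionError;

use crate::catalog::SchemaStore;

// Hypothetical call site: CatalogError converts into DataFusionError
// automatically, so no map_err chain is needed.
async fn resolve_schemas(
    schemas: &dyn SchemaStore,
    catalog: &str,
) -> Result<ListSchemaResponse, DataFusionError> {
    Ok(schemas.list(catalog).await?)
}
```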
diff --git a/src/catalog/repository.rs b/src/catalog/repository.rs
new file mode 100644
index 00000000..1cb6c529
--- /dev/null
+++ b/src/catalog/repository.rs
@@ -0,0 +1,444 @@
+use std::sync::Arc;
+
+use arrow_schema::Schema;
+use async_trait::async_trait;
+use itertools::Itertools;
+use uuid::Uuid;
+
+use clade::schema::{ListSchemaResponse, SchemaObject, TableObject};
+
+use crate::catalog::{
+    CatalogError, CatalogResult, CatalogStore, FunctionStore, SchemaStore, TableStore,
+    STAGING_SCHEMA,
+};
+use crate::repository::interface::{
+    AllDatabaseFunctionsResult, CollectionRecord, Error as RepositoryError, Repository,
+    TableId, TableVersionId, TableVersionsResult,
+};
+use crate::repository::interface::{
+    DatabaseRecord, DroppedTableDeletionStatus, DroppedTablesResult, TableRecord,
+};
+use crate::wasm_udf::data_types::CreateFunctionDetails;
+
+// The native catalog implementation for Seafowl.
+pub struct RepositoryStore {
+    pub repository: Arc<dyn Repository>,
+}
+
+impl From<RepositoryError> for CatalogError {
+    fn from(err: RepositoryError) -> CatalogError {
+        CatalogError::SqlxError(match err {
+            RepositoryError::UniqueConstraintViolation(e) => e,
+            RepositoryError::FKConstraintViolation(e) => e,
+            RepositoryError::SqlxError(e) => e,
+        })
+    }
+}
+
+#[async_trait]
+impl CatalogStore for RepositoryStore {
+    async fn create(&self, name: &str) -> CatalogResult<()> {
+        self.repository
+            .create_database(name)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::UniqueConstraintViolation(_) => {
+                    CatalogError::CatalogAlreadyExists {
+                        name: name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })?;
+
+        Ok(())
+    }
+
+    async fn get(&self, name: &str) -> CatalogResult<DatabaseRecord> {
+        self.repository
+            .get_database(name)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::CatalogDoesNotExist {
+                        name: name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
+    }
+
+    async fn delete(&self, name: &str) -> CatalogResult<()> {
+        let database = CatalogStore::get(self, name).await?;
+
+        self.repository
+            .delete_database(database.id)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::CatalogDoesNotExist {
+                        name: name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
+    }
+}
+
+#[async_trait]
+impl SchemaStore for RepositoryStore {
+    async fn create(&self, catalog_name: &str, schema_name: &str) -> CatalogResult<()> {
+        if schema_name == STAGING_SCHEMA {
+            return Err(CatalogError::UsedStagingSchema);
+        }
+
+        let database = CatalogStore::get(self, catalog_name).await?;
+
+        self.repository
+            .create_collection(database.id, schema_name)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::UniqueConstraintViolation(_) => {
+                    CatalogError::SchemaAlreadyExists {
+                        name: schema_name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })?;
+
+        Ok(())
+    }
+
+    async fn list(&self, catalog_name: &str) -> CatalogResult<ListSchemaResponse> {
+        let cols = self.repository.list_collections(catalog_name).await?;
+
+        let schemas = cols
+            .iter()
+            .group_by(|col| &col.collection_name)
+            .into_iter()
+            .map(|(cn, ct)| SchemaObject {
+                name: cn.clone(),
+                tables: ct
+                    .into_iter()
+                    .filter_map(|t| {
+                        if let Some(name) = &t.table_name
+                            && let Some(uuid) = t.table_uuid
+                        {
+                            Some(TableObject {
+                                name: name.clone(),
+                                location: uuid.to_string(),
+                            })
+                        } else {
+                            None
+                        }
+                    })
+                    .collect(),
+            })
+            .collect();
+
+        Ok(ListSchemaResponse { schemas })
+    }
+
+    async fn get(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+    ) -> CatalogResult<CollectionRecord> {
+        if schema_name == STAGING_SCHEMA {
+            return Err(CatalogError::UsedStagingSchema);
+        }
+
+        self.repository
+            .get_collection(catalog_name, schema_name)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::SchemaDoesNotExist {
+                        name: schema_name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
+    }
+
+    async fn delete(&self, catalog_name: &str, schema_name: &str) -> CatalogResult<()> {
+        let schema = SchemaStore::get(self, catalog_name, schema_name).await?;
+
+        self.repository
+            .delete_collection(schema.id)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::SchemaDoesNotExist {
+                        name: schema_name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
+    }
+}
+
+#[async_trait]
+impl TableStore for RepositoryStore {
+    async fn create(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+        schema: &Schema,
+        uuid: Uuid,
+    ) -> CatalogResult<(TableId, TableVersionId)> {
+        let collection = SchemaStore::get(self, catalog_name, schema_name).await?;
+
+        self.repository
+            .create_table(collection.id, table_name, schema, uuid)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::UniqueConstraintViolation(_) => {
+                    CatalogError::TableAlreadyExists {
+                        name: table_name.to_string(),
+                    }
+                }
+                RepositoryError::FKConstraintViolation(_) => {
+                    CatalogError::SchemaDoesNotExist {
+                        name: schema_name.to_string(),
+                    }
+                }
+                RepositoryError::SqlxError(e) => CatalogError::SqlxError(e),
+            })
+    }
+
+    async fn get(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+    ) -> CatalogResult<TableRecord> {
+        self.repository
+            .get_table(catalog_name, schema_name, table_name)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::TableDoesNotExist {
+                        name: table_name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
+    }
+
+    async fn create_new_version(
+        &self,
+        uuid: Uuid,
+        version: i64,
+    ) -> CatalogResult<TableVersionId> {
+        self.repository
+            .create_new_version(uuid, version)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::TableUuidDoesNotExist { uuid }
+                }
+                e => e.into(),
+            })
+    }
+
+    async fn delete_old_versions(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+    ) -> CatalogResult<u64> {
+        let table = TableStore::get(self, catalog_name, schema_name, table_name).await?;
+
+        Ok(self.repository.delete_old_versions(table.id).await?)
+    }
+
+    async fn get_all_versions(
+        &self,
+        catalog_name: &str,
+        table_names: Option<Vec<String>>,
+    ) -> CatalogResult<Vec<TableVersionsResult>> {
+        Ok(self
+            .repository
+            .get_all_versions(catalog_name, table_names)
+            .await?)
+    }
+
+    async fn update(
+        &self,
+        old_catalog_name: &str,
+        old_schema_name: &str,
+        old_table_name: &str,
+        new_catalog_name: &str,
+        new_schema_name: &str,
+        new_table_name: &str,
+    ) -> CatalogResult<()> {
+        assert_eq!(
+            old_catalog_name, new_catalog_name,
+            "Moving across catalogs not yet supported"
+        );
+
+        let table =
+            TableStore::get(self, old_catalog_name, old_schema_name, old_table_name)
+                .await?;
+        let new_schema_id = if new_schema_name != old_schema_name {
+            let schema =
+                SchemaStore::get(self, old_catalog_name, new_schema_name).await?;
+            Some(schema.id)
+        } else {
+            None
+        };
+
+        self.repository
+            .rename_table(table.id, new_table_name, new_schema_id)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::FKConstraintViolation(_) => {
+                    // We only FK on collection_id, so this will be Some
+                    CatalogError::SchemaDoesNotExist {
+                        name: new_schema_name.to_string(),
+                    }
+                }
+                RepositoryError::UniqueConstraintViolation(_) => {
+                    CatalogError::TableAlreadyExists {
+                        name: new_table_name.to_string(),
+                    }
+                }
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::TableDoesNotExist {
+                        name: old_table_name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
+    }
+
+    async fn delete(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+    ) -> CatalogResult<()> {
+        let table = TableStore::get(self, catalog_name, schema_name, table_name).await?;
+
+        self.repository
+            .delete_table(table.id)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::TableDoesNotExist {
+                        name: table_name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })
+    }
+
+    async fn get_dropped_tables(
+        &self,
+        catalog_name: Option<String>,
+    ) -> CatalogResult<Vec<DroppedTablesResult>> {
+        Ok(self.repository.get_dropped_tables(catalog_name).await?)
+    }
+
+    async fn update_dropped_table(
+        &self,
+        uuid: Uuid,
+        deletion_status: DroppedTableDeletionStatus,
+    ) -> CatalogResult<()> {
+        self.repository
+            .update_dropped_table(uuid, deletion_status)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::TableUuidDoesNotExist { uuid }
+                }
+                e => e.into(),
+            })
+    }
+
+    async fn delete_dropped_table(&self, uuid: Uuid) -> CatalogResult<()> {
+        self.repository
+            .delete_dropped_table(uuid)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::SqlxError(sqlx::error::Error::RowNotFound) => {
+                    CatalogError::TableUuidDoesNotExist { uuid }
+                }
+                e => e.into(),
+            })
+    }
+}
+
+#[async_trait]
+impl FunctionStore for RepositoryStore {
+    async fn create(
+        &self,
+        catalog_name: &str,
+        function_name: &str,
+        or_replace: bool,
+        details: &CreateFunctionDetails,
+    ) -> CatalogResult<()> {
+        let database = CatalogStore::get(self, catalog_name).await?;
+
+        self.repository
+            .create_function(database.id, function_name, or_replace, details)
+            .await
+            .map_err(|e| match e {
+                RepositoryError::FKConstraintViolation(_) => {
+                    CatalogError::CatalogDoesNotExist {
+                        name: catalog_name.to_string(),
+                    }
+                }
+                RepositoryError::UniqueConstraintViolation(_) => {
+                    CatalogError::FunctionAlreadyExists {
+                        name: function_name.to_string(),
+                    }
+                }
+                e => e.into(),
+            })?;
+
+        Ok(())
+    }
+
+    async fn list(
+        &self,
+        catalog_name: &str,
+    ) -> CatalogResult<Vec<AllDatabaseFunctionsResult>> {
+        let database = CatalogStore::get(self, catalog_name).await?;
+
+        Ok(self
+            .repository
+            .get_all_functions_in_database(database.id)
+            .await?)
+ } + + async fn delete( + &self, + catalog_name: &str, + if_exists: bool, + + func_names: &[String], + ) -> CatalogResult<()> { + let database = CatalogStore::get(self, catalog_name).await?; + + match self.repository.drop_function(database.id, func_names).await { + Ok(id) => Ok(id), + Err(RepositoryError::FKConstraintViolation(_)) => { + Err(CatalogError::CatalogDoesNotExist { + name: catalog_name.to_string(), + }) + } + Err(RepositoryError::SqlxError(sqlx::error::Error::RowNotFound)) => { + if if_exists { + Ok(()) + } else { + Err(CatalogError::FunctionNotFound { + names: func_names.join(", "), + }) + } + } + Err(e) => Err(e.into()), + } + } +} diff --git a/src/config/context.rs b/src/config/context.rs index e43bde77..cfbd3796 100644 --- a/src/config/context.rs +++ b/src/config/context.rs @@ -17,7 +17,7 @@ use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; #[cfg(feature = "catalog-postgres")] use crate::repository::postgres::PostgresRepository; -use crate::catalog::{Error, Metastore}; +use crate::catalog::{external::ExternalStore, metastore::Metastore, CatalogError}; use crate::object_store::http::add_http_object_store; use crate::object_store::wrapped::InternalObjectStore; #[cfg(feature = "remote-tables")] @@ -58,6 +58,14 @@ async fn build_metastore( .await .expect("Error setting up the database"), ), + schema::Catalog::Clade(schema::Clade { dsn }) => { + let external = Arc::new( + ExternalStore::new(dsn.clone()) + .await + .expect("Error setting up remote store"), + ); + return Metastore::new_from_external(external, object_store); + } }; Metastore::new_from_repository(repository, object_store) @@ -178,13 +186,13 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result String { - "public".to_string() + DEFAULT_SCHEMA.to_string() } #[derive(Deserialize, Debug, PartialEq, Eq, Default, Clone)] diff --git a/src/context/delta.rs b/src/context/delta.rs index 37fe2b0f..d24ffcf1 100644 --- a/src/context/delta.rs +++ b/src/context/delta.rs @@ -307,8 +307,9 @@ impl SeafowlContext { // with the returned uuid (and delete the catalog entry if the object store creation fails). // On the other hand that would complicate etag testing logic. 
let table_uuid = get_uuid(); - let table_log_store = - self.internal_object_store.get_log_store(table_uuid); + let table_log_store = self + .internal_object_store + .get_log_store(&table_uuid.to_string()); let delta_schema = DeltaSchema::try_from(&schema)?; let table = CreateBuilder::new() @@ -330,8 +331,9 @@ impl SeafowlContext { "Unable to parse the UUID path of the table: {e}" )) })?; - let table_log_store = - self.internal_object_store.get_log_store(table_uuid); + let table_log_store = self + .internal_object_store + .get_log_store(&table_uuid.to_string()); let table = ConvertToDeltaBuilder::new() .with_log_store(table_log_store) .with_table_name(&*table_name) @@ -372,8 +374,9 @@ impl SeafowlContext { plan: &Arc, ) -> Result { let table_uuid = self.get_table_uuid(name).await?; - let table_log_store = self.internal_object_store.get_log_store(table_uuid); - let table_prefix = self.internal_object_store.table_prefix(table_uuid); + let prefix = table_uuid.to_string(); + let table_log_store = self.internal_object_store.get_log_store(&prefix); + let table_prefix = self.internal_object_store.table_prefix(&prefix); // Upload partition files to table's root directory let adds = plan_to_object_store( @@ -509,7 +512,7 @@ mod tests { &ctx.inner.state(), &execution_plan, object_store.clone(), - object_store.table_prefix(table_uuid), + object_store.table_prefix(&table_uuid.to_string()), 2, ) .await @@ -596,7 +599,9 @@ mod tests { ); assert_uploaded_objects( - object_store.get_log_store(table_uuid).object_store(), + object_store + .get_log_store(&table_uuid.to_string()) + .object_store(), vec![ Path::from(PART_0_FILE_NAME.to_string()), Path::from(PART_1_FILE_NAME.to_string()), diff --git a/src/context/logical.rs b/src/context/logical.rs index 1eae9f79..f44147e6 100644 --- a/src/context/logical.rs +++ b/src/context/logical.rs @@ -324,7 +324,9 @@ impl SeafowlContext { // We only support datetime DeltaTable version specification for start let table_uuid = self.get_table_uuid(resolved_ref.clone()).await?; - let table_log_store = self.internal_object_store.get_log_store(table_uuid); + let table_log_store = self + .internal_object_store + .get_log_store(&table_uuid.to_string()); let datetime = TableVersionProcessor::version_to_datetime(version)?; let mut delta_table = DeltaTable::new(table_log_store, Default::default()); diff --git a/src/context/mod.rs b/src/context/mod.rs index b9fa2815..19f50d8e 100644 --- a/src/context/mod.rs +++ b/src/context/mod.rs @@ -2,7 +2,7 @@ pub mod delta; pub mod logical; pub mod physical; -use crate::catalog::Metastore; +use crate::catalog::metastore::Metastore; use crate::catalog::{DEFAULT_SCHEMA, STAGING_SCHEMA}; use crate::config::context::build_state_with_table_factories; use crate::object_store::wrapped::InternalObjectStore; diff --git a/src/context/physical.rs b/src/context/physical.rs index 4c996c82..c0f3deb2 100644 --- a/src/context/physical.rs +++ b/src/context/physical.rs @@ -329,7 +329,8 @@ impl SeafowlContext { ); // Write the new files with updated data - let table_prefix = self.internal_object_store.table_prefix(uuid); + let table_prefix = + self.internal_object_store.table_prefix(&uuid.to_string()); let adds = plan_to_object_store( &state, &update_plan, @@ -450,8 +451,9 @@ impl SeafowlContext { Arc::new(FilterExec::try_new(filter_expr, base_scan)?); // Write the filtered out data - let table_prefix = - self.internal_object_store.table_prefix(uuid); + let table_prefix = self + .internal_object_store + .table_prefix(&uuid.to_string()); let adds = 
plan_to_object_store( &state, &filter_plan, diff --git a/src/frontend/http.rs b/src/frontend/http.rs index 65939903..5ccd397d 100644 --- a/src/frontend/http.rs +++ b/src/frontend/http.rs @@ -1011,7 +1011,7 @@ pub mod tests { assert_eq!(resp.status(), StatusCode::BAD_REQUEST); assert_eq!( resp.body(), - "Error during planning: Database \"missing_db\" doesn't exist" + "Error during planning: Catalog \"missing_db\" doesn't exist" ); } diff --git a/src/object_store/wrapped.rs b/src/object_store/wrapped.rs index 8ec89bae..b8fdb73e 100644 --- a/src/object_store/wrapped.rs +++ b/src/object_store/wrapped.rs @@ -19,7 +19,6 @@ use std::path::Path as StdPath; use std::str::FromStr; use std::sync::Arc; use url::Url; -use uuid::Uuid; /// Wrapper around the object_store crate that holds on to the original config /// in order to provide a more efficient "upload" for the local object store (since it's @@ -82,7 +81,7 @@ impl InternalObjectStore { // Get the table prefix relative to the root of the internal object store. // This is either just a UUID, or potentially UUID prepended by some path. - pub fn table_prefix(&self, table_uuid: Uuid) -> Path { + pub fn table_prefix(&self, table_prefix: &str) -> Path { match self.config.clone() { schema::ObjectStore::S3(S3 { prefix: Some(prefix), @@ -91,8 +90,8 @@ impl InternalObjectStore { | schema::ObjectStore::GCS(GCS { prefix: Some(prefix), .. - }) => Path::from(format!("{prefix}/{table_uuid}")), - _ => Path::from(table_uuid.to_string()), + }) => Path::from(format!("{prefix}/{table_prefix}")), + _ => Path::from(table_prefix), } } @@ -103,15 +102,15 @@ impl InternalObjectStore { // 2. We want to override `rename_if_not_exists` for AWS S3 // This means we have 2 layers of indirection before we hit the "real" object store: // (Delta `LogStore` -> `PrefixStore` -> `InternalObjectStore` -> `inner`). 
-    pub fn get_log_store(&self, table_uuid: Uuid) -> Arc<dyn LogStore> {
-        let prefix = self.table_prefix(table_uuid);
+    pub fn get_log_store(&self, table_prefix: &str) -> Arc<dyn LogStore> {
+        let prefix = self.table_prefix(table_prefix);
         let prefixed_store: PrefixStore<InternalObjectStore> =
             PrefixStore::new(self.clone(), prefix);
 
         Arc::from(DefaultLogStore::new(
             Arc::from(prefixed_store),
             LogStoreConfig {
-                location: self.root_uri.join(&table_uuid.to_string()).unwrap(),
+                location: self.root_uri.join(table_prefix).unwrap(),
                 options: Default::default(),
             },
         ))
@@ -291,23 +290,22 @@ impl ObjectStore for InternalObjectStore {
 mod tests {
     use crate::config::context::build_object_store;
     use crate::config::schema::{ObjectStore, S3};
-    use crate::frontend::http::tests::deterministic_uuid;
     use crate::object_store::wrapped::InternalObjectStore;
 
     use datafusion::common::Result;
     use deltalake::logstore::LogStore;
     use rstest::rstest;
 
     #[rstest]
-    #[case::bucket_root("test-bucket", None, "01020304-0506-4708-890a-0b0c0d0e0f10")]
+    #[case::bucket_root("test-bucket", None, "6bb9913e-0341-446d-bb58-b865803ce0ff")]
     #[case::path_no_delimiter(
         "test-bucket",
         Some("some/path/no/delimiter"),
-        "some/path/no/delimiter/01020304-0506-4708-890a-0b0c0d0e0f10"
+        "some/path/no/delimiter/6bb9913e-0341-446d-bb58-b865803ce0ff"
     )]
     #[case::path_with_delimiter(
         "test-bucket",
         Some("some/path/with/delimiter/"),
-        "some/path/with/delimiter/01020304-0506-4708-890a-0b0c0d0e0f10"
+        "some/path/with/delimiter/6bb9913e-0341-446d-bb58-b865803ce0ff"
     )]
     #[test]
     fn test_table_location_s3(
@@ -333,7 +331,7 @@ mod tests {
 
         let store = InternalObjectStore::new(inner_store, config);
 
-        let uuid = deterministic_uuid();
+        let uuid = "6bb9913e-0341-446d-bb58-b865803ce0ff";
         let prefix = store.table_prefix(uuid);
         let uri = store.get_log_store(uuid).root_uri();
diff --git a/src/utils.rs b/src/utils.rs
index b75479d7..cfa560c6 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -65,7 +65,9 @@ pub async fn gc_databases(context: &SeafowlContext, database_name: Option<String>
diff --git a/tests/clade/mod.rs b/tests/clade/mod.rs
new file mode 100644
--- /dev/null
+++ b/tests/clade/mod.rs
+struct TestCladeMetastore {
+    catalog: String,
+    schemas: ListSchemaResponse,
+}
+
+#[tonic::async_trait]
+impl SchemaStoreService for TestCladeMetastore {
+    async fn list_schemas(
+        &self,
+        request: Request<ListSchemaRequest>,
+    ) -> Result<Response<ListSchemaResponse>, Status> {
+        let catalog = request.into_inner().catalog_name;
+        if self.catalog == catalog {
+            Ok(Response::new(self.schemas.clone()))
+        } else {
+            Err(Status::not_found(format!(
+                "Catalog {catalog} does not exist",
+            )))
+        }
+    }
+}
+
+async fn start_clade_server() -> Arc<SeafowlContext> {
+    // let the OS choose a free port
+    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+    let addr = listener.local_addr().unwrap();
+
+    let config_text = format!(
+        r#"
+[object_store]
+type = "local"
+data_dir = "tests/data"
+
+[catalog]
+type = "clade"
+dsn = "http://{addr}""#,
+    );
+
+    let config = load_config_from_string(&config_text, false, None).unwrap();
+    let context = Arc::from(build_context(&config).await.unwrap());
+
+    let clade = run_clade_server(addr);
+    tokio::task::spawn(clade);
+
+    context
+}
+
+async fn run_clade_server(addr: SocketAddr) {
+    let metastore = TestCladeMetastore {
+        catalog: DEFAULT_DB.to_string(),
+        schemas: ListSchemaResponse {
+            schemas: vec![SchemaObject {
+                name: "some_schema".to_string(),
+                tables: vec![TableObject {
+                    name: "some_table".to_string(),
+                    location: "delta-0.8.0-partitioned".to_string(),
+                }],
+            }],
+        },
+    };
+
+    let svc = SchemaStoreServiceServer::new(metastore);
+
+    let reflection = tonic_reflection::server::Builder::configure()
+        .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+        .build()
+        .unwrap();
+
+    Server::builder()
+        .add_service(svc)
+        .add_service(reflection)
+        .serve(addr)
+        .await
+        .unwrap();
+}
diff --git a/tests/clade/query.rs b/tests/clade/query.rs
new file mode 100644
index 00000000..2fb4116e
--- /dev/null
+++ b/tests/clade/query.rs
@@ -0,0 +1,34 @@
+use crate::clade::*;
+
+#[tokio::test]
+async fn test_basic_select() -> Result<(), Box<dyn std::error::Error>> {
+    let context = start_clade_server().await;
+
+    // Before proceeding with the test, swallow up a single initial
+    // ConnectError("tcp connect error", Os { code: 61, kind: ConnectionRefused, message: "Connection refused" })
+    // TODO: why does this happen?
+    let _r = context.metastore.schemas.list(DEFAULT_DB).await;
+
+    let plan = context
+        .plan_query("SELECT * FROM some_schema.some_table ORDER BY value")
+        .await
+        .unwrap();
+    let results = context.collect(plan).await.unwrap();
+
+    let expected = [
+        "+-------+------+-------+-----+",
+        "| value | year | month | day |",
+        "+-------+------+-------+-----+",
+        "| 1     | 2020 | 1     | 1   |",
+        "| 2     | 2020 | 2     | 3   |",
+        "| 3     | 2020 | 2     | 5   |",
+        "| 4     | 2021 | 4     | 5   |",
+        "| 5     | 2021 | 12    | 4   |",
+        "| 6     | 2021 | 12    | 20  |",
+        "| 7     | 2021 | 12    | 20  |",
+        "+-------+------+-------+-----+",
+    ];
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
diff --git a/tests/main.rs b/tests/main.rs
index c746400c..e94fd2cc 100644
--- a/tests/main.rs
+++ b/tests/main.rs
@@ -1,6 +1,7 @@
 // Single main.rs for all integration tests
 // https://endler.dev/2020/rust-compile-times/#combine-all-integration-tests-in-a-single-binary
 
+mod clade;
 mod cli;
 mod flight;
 mod http;
diff --git a/tests/statements/convert.rs b/tests/statements/convert.rs
index 26d03664..1a3a1e6b 100644
--- a/tests/statements/convert.rs
+++ b/tests/statements/convert.rs
@@ -60,7 +60,7 @@ async fn test_convert_from_flat_parquet_table() -> Result<()> {
     testutils::assert_uploaded_objects(
         context
             .internal_object_store
-            .get_log_store(table_uuid)
+            .get_log_store(&table_uuid.to_string())
             .object_store(),
         vec![
             Path::from("_delta_log/00000000000000000000.json"),
diff --git a/tests/statements/ddl.rs b/tests/statements/ddl.rs
index 81e4921e..95c78873 100644
--- a/tests/statements/ddl.rs
+++ b/tests/statements/ddl.rs
@@ -354,7 +354,7 @@ async fn test_create_table_drop_schema(
     testutils::assert_uploaded_objects(
         context
             .internal_object_store
-            .get_log_store(table_uuid)
+            .get_log_store(&table_uuid.to_string())
             .object_store(),
         vec![],
     )
@@ -374,7 +374,7 @@ async fn test_create_table_drop_schema(
     testutils::assert_uploaded_objects(
         context
             .internal_object_store
-            .get_log_store(table_uuid)
+            .get_log_store(&table_uuid.to_string())
             .object_store(),
         vec![
             Path::from("_delta_log/00000000000000000000.json"),
diff --git a/tests/statements/vacuum.rs b/tests/statements/vacuum.rs
index c5eae09a..589f8380 100644
--- a/tests/statements/vacuum.rs
+++ b/tests/statements/vacuum.rs
@@ -59,7 +59,7 @@ async fn test_vacuum_table() -> Result<()> {
     testutils::assert_uploaded_objects(
         context
             .internal_object_store
-            .get_log_store(table_1_uuid)
+            .get_log_store(&table_1_uuid.to_string())
             .object_store(),
         vec![
             Path::from("_delta_log/00000000000000000000.json"),
@@ -99,7 +99,7 @@ async fn test_vacuum_table() -> Result<()> {
     testutils::assert_uploaded_objects(
         context
             .internal_object_store
-            .get_log_store(table_1_uuid)
+            .get_log_store(&table_1_uuid.to_string())
             .object_store(),
         vec![
             Path::from("_delta_log/00000000000000000000.json"),
@@ -148,7 +148,7 @@ async fn test_vacuum_table() -> Result<()> {
     testutils::assert_uploaded_objects(
         context
             .internal_object_store
-            .get_log_store(table_2_uuid)
+            .get_log_store(&table_2_uuid.to_string())
             .object_store(),
         vec![
             Path::from("_delta_log/00000000000000000000.json"),
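
Worked example (not part of the diff above): with this commit, any process that implements the clade SchemaStoreService protocol can act as Seafowl's metastore, and any gRPC client can query one. The sketch below is a minimal standalone probe against a running schema store; the fixed address and the main() wiring are assumptions for illustration (the test server above binds an OS-chosen port), while the client type and method names follow tonic's code generation for the SchemaStoreService definition.

// Hypothetical probe: list schemas from a running clade schema store.
// Assumes dependencies on the clade crate, tonic, and tokio.
use clade::schema::schema_store_service_client::SchemaStoreServiceClient;
use clade::schema::ListSchemaRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The address is an assumption; substitute the actual server endpoint.
    let mut client =
        SchemaStoreServiceClient::connect("http://127.0.0.1:50051").await?;

    // Mirror the catalog_name check performed by TestCladeMetastore::list_schemas.
    let response = client
        .list_schemas(ListSchemaRequest {
            catalog_name: "default".to_string(),
        })
        .await?
        .into_inner();

    // Each schema carries its tables and their object store locations, which
    // Seafowl resolves as string prefixes via the reworked table_prefix/get_log_store.
    for schema in &response.schemas {
        for table in &schema.tables {
            println!("{}.{} -> {}", schema.name, table.name, table.location);
        }
    }

    Ok(())
}

Because the test server also registers a tonic-reflection service built from FILE_DESCRIPTOR_SET, generic tools should likewise be able to discover and call ListSchemas without access to the .proto file.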