diff --git a/crates/datasources/src/cassandra/errors.rs b/crates/datasources/src/cassandra/errors.rs index 5c9123291..e3da0c3c7 100644 --- a/crates/datasources/src/cassandra/errors.rs +++ b/crates/datasources/src/cassandra/errors.rs @@ -8,6 +8,8 @@ pub enum CassandraError { UnsupportedDataType(String), #[error("Table not found: {0}")] TableNotFound(String), + #[error("{0}")] + String(String), } pub type Result = std::result::Result; diff --git a/crates/datasources/src/cassandra/mod.rs b/crates/datasources/src/cassandra/mod.rs index caa604e46..61a4e4234 100644 --- a/crates/datasources/src/cassandra/mod.rs +++ b/crates/datasources/src/cassandra/mod.rs @@ -23,6 +23,8 @@ use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use datafusion_ext::errors::ExtensionError; +use datafusion_ext::functions::VirtualLister; use datafusion_ext::metrics::DataSourceMetricsStreamAdapter; pub use errors::*; use futures::Stream; @@ -42,6 +44,22 @@ use std::time::Duration; use self::exec::CassandraExec; pub struct CassandraAccess { + host: String, +} +impl CassandraAccess { + pub fn new(host: String) -> Self { + Self { host } + } + pub async fn validate_access(&self) -> Result<()> { + let _access = CassandraAccessState::try_new(&self.host).await?; + + Ok(()) + } + pub async fn connect(&self) -> Result { + CassandraAccessState::try_new(&self.host).await + } +} +pub struct CassandraAccessState { session: Session, } @@ -66,8 +84,36 @@ fn try_convert_dtype(ty: &ColumnType) -> Result { _ => return Err(CassandraError::UnsupportedDataType(format!("{:?}", ty))), }) } +fn try_convert_dtype_string(ty: &str) -> Result { + Ok(match ty { + "custom" => return Err(CassandraError::UnsupportedDataType(ty.to_string())), + "ascii" => DataType::Utf8, + "date" => DataType::Date64, + "double" => DataType::Float64, + "duration" => DataType::Duration(TimeUnit::Nanosecond), + "float" => DataType::Float32, + "int" 
=> DataType::Int32, + "text" => DataType::Utf8, + "timestamp" => DataType::Timestamp(TimeUnit::Millisecond, None), + "smallint" => DataType::Int16, + "tinyint" => DataType::Int8, + "uuid" => DataType::Utf8, + "bigint" => DataType::Int64, + // list + lst if lst.contains('<') => { + // get the inner type from "list<{inner}>" + let inner = lst.split('<').nth(1).and_then(|s| s.split('>').next()); -impl CassandraAccess { + match inner { + Some(inner) => DataType::new_list(try_convert_dtype_string(inner)?, true), + None => return Err(CassandraError::UnsupportedDataType(ty.to_string())), + } + } + _ => return Err(CassandraError::UnsupportedDataType(ty.to_string())), + }) +} + +impl CassandraAccessState { pub async fn try_new(host: impl AsRef) -> Result { let session = SessionBuilder::new() .known_node(host) @@ -114,7 +160,7 @@ pub struct CassandraTableProvider { impl CassandraTableProvider { pub async fn try_new(host: String, ks: String, table: String) -> Result { - let access = CassandraAccess::try_new(host).await?; + let access = CassandraAccessState::try_new(host).await?; let schema = access.get_schema(&ks, &table).await?; Ok(Self { schema: Arc::new(schema), @@ -189,3 +235,78 @@ impl TableProvider for CassandraTableProvider { )) } } + +#[async_trait] +impl VirtualLister for CassandraAccessState { + /// List schemas for a data source. + async fn list_schemas(&self) -> Result, ExtensionError> { + let query = "SELECT keyspace_name FROM system_schema.keyspaces"; + self.session + .query(query, &[]) + .await + .map_err(CassandraError::from) + .and_then(|res| { + res.rows_or_empty() + .into_iter() + .map(|row| match row.columns[0] { + Some(CqlValue::Text(ref s)) => Ok(s.clone()), + _ => Err(CassandraError::String("invalid schema".to_string())), + }) + .collect::>>() + }) + .map_err(ExtensionError::access) + } + + /// List tables for a data source. 
+ async fn list_tables(&self, schema: &str) -> Result, ExtensionError> { + let query = format!( + "SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{}'", + schema + ); + self.session + .query(query, &[]) + .await + .map_err(CassandraError::from) + .and_then(|res| { + res.rows_or_empty() + .into_iter() + .map(|row| match row.columns[0] { + Some(CqlValue::Text(ref s)) => Ok(s.clone()), + _ => Err(CassandraError::String("invalid table".to_string())), + }) + .collect::>>() + }) + .map_err(ExtensionError::access) + } + + /// List columns for a specific table in the datasource. + async fn list_columns(&self, schema: &str, table: &str) -> Result { + let query = format!( + "SELECT column_name, type FROM system_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}'", + schema, table + ); + self.session + .query(query, &[]) + .await + .map_err(CassandraError::from) + .and_then(|res| { + res.rows_or_empty() + .into_iter() + .map(|row| { + let name = match row.columns[0] { + Some(CqlValue::Text(ref s)) => s.clone(), + _ => return Err(CassandraError::String("invalid column".to_string())), + }; + let ty = match row.columns[1] { + Some(CqlValue::Text(ref s)) => s.clone(), + _ => return Err(CassandraError::String("invalid column".to_string())), + }; + let dtype = try_convert_dtype_string(&ty)?; + + Ok(Field::new(name, dtype, true)) + }) + .collect::>() + }) + .map_err(ExtensionError::access) + } +} diff --git a/crates/protogen/proto/metastore/options.proto b/crates/protogen/proto/metastore/options.proto index 02f0cdb78..2a5376665 100644 --- a/crates/protogen/proto/metastore/options.proto +++ b/crates/protogen/proto/metastore/options.proto @@ -47,6 +47,7 @@ message DatabaseOptions { DatabaseOptionsDeltaLake delta = 8; DatabaseOptionsSqlServer sql_server = 9; DatabaseOptionsClickhouse clickhouse = 10; + DatabaseOptionsCassandra cassandra = 11; } // next: 11 } @@ -80,6 +81,10 @@ message DatabaseOptionsClickhouse { string connection_string = 1; } +message 
DatabaseOptionsCassandra { + string host = 1; +} + message DatabaseOptionsSnowflake { string account_name = 1; string login_name = 2; diff --git a/crates/protogen/src/metastore/types/options.rs b/crates/protogen/src/metastore/types/options.rs index 7cc781c53..0ee7907a3 100644 --- a/crates/protogen/src/metastore/types/options.rs +++ b/crates/protogen/src/metastore/types/options.rs @@ -93,6 +93,7 @@ pub enum DatabaseOptions { Delta(DatabaseOptionsDeltaLake), SqlServer(DatabaseOptionsSqlServer), Clickhouse(DatabaseOptionsClickhouse), + Cassandra(DatabaseOptionsCassandra), } impl DatabaseOptions { @@ -106,6 +107,7 @@ impl DatabaseOptions { pub const DELTA: &'static str = "delta"; pub const SQL_SERVER: &'static str = "sql_server"; pub const CLICKHOUSE: &'static str = "clickhouse"; + pub const CASSANDRA: &'static str = "cassandra"; pub fn as_str(&self) -> &'static str { match self { @@ -119,6 +121,7 @@ impl DatabaseOptions { DatabaseOptions::Delta(_) => Self::DELTA, DatabaseOptions::SqlServer(_) => Self::SQL_SERVER, DatabaseOptions::Clickhouse(_) => Self::CLICKHOUSE, + DatabaseOptions::Cassandra(_) => Self::CASSANDRA, } } } @@ -157,6 +160,9 @@ impl TryFrom for DatabaseOptions { options::database_options::Options::Clickhouse(v) => { DatabaseOptions::Clickhouse(v.try_into()?) } + options::database_options::Options::Cassandra(v) => { + DatabaseOptions::Cassandra(v.try_into()?) 
+ } }) } } @@ -187,6 +193,9 @@ impl From for options::database_options::Options { DatabaseOptions::Clickhouse(v) => { options::database_options::Options::Clickhouse(v.into()) } + DatabaseOptions::Cassandra(v) => { + options::database_options::Options::Cassandra(v.into()) + } } } } @@ -365,6 +374,23 @@ impl From for options::DatabaseOptionsClickhouse { } } } +#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)] +pub struct DatabaseOptionsCassandra { + pub host: String, +} + +impl TryFrom for DatabaseOptionsCassandra { + type Error = ProtoConvError; + fn try_from(value: options::DatabaseOptionsCassandra) -> Result { + Ok(DatabaseOptionsCassandra { host: value.host }) + } +} + +impl From for options::DatabaseOptionsCassandra { + fn from(value: DatabaseOptionsCassandra) -> Self { + options::DatabaseOptionsCassandra { host: value.host } + } +} #[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)] pub struct DatabaseOptionsSnowflake { diff --git a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs index 620b30ced..012137b12 100644 --- a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs +++ b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs @@ -12,6 +12,7 @@ use datafusion_ext::functions::{ FuncParamValue, IdentValue, TableFuncContextProvider, VirtualLister, }; use datasources::bigquery::BigQueryAccessor; +use datasources::cassandra::CassandraAccess; use datasources::clickhouse::ClickhouseAccess; use datasources::debug::DebugVirtualLister; use datasources::mongodb::MongoDbAccessor; @@ -21,9 +22,9 @@ use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection}; use datasources::sqlserver::SqlServerAccess; use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; use protogen::metastore::types::options::{ - DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, DatabaseOptionsMongoDb, - DatabaseOptionsMysql, DatabaseOptionsPostgres, 
DatabaseOptionsSnowflake, - DatabaseOptionsSqlServer, + DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsCassandra, DatabaseOptionsClickhouse, + DatabaseOptionsMongoDb, DatabaseOptionsMysql, DatabaseOptionsPostgres, + DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, }; use super::TableFunc; @@ -350,6 +351,13 @@ pub(crate) async fn get_virtual_lister_for_external_db( .map_err(ExtensionError::access)?; Box::new(state) } + DatabaseOptions::Cassandra(DatabaseOptionsCassandra { host }) => { + let state = CassandraAccess::new(host.to_string()) + .connect() + .await + .map_err(ExtensionError::access)?; + Box::new(state) + } DatabaseOptions::Delta(_) => { return Err(ExtensionError::Unimplemented( "deltalake information listing", diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs index f9724d086..fba270955 100644 --- a/crates/sqlexec/src/dispatch/external.rs +++ b/crates/sqlexec/src/dispatch/external.rs @@ -34,8 +34,8 @@ use datasources::sqlserver::{ }; use protogen::metastore::types::catalog::{CatalogEntry, DatabaseEntry, FunctionEntry, TableEntry}; use protogen::metastore::types::options::{ - DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, DatabaseOptionsDebug, - DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql, + DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsCassandra, DatabaseOptionsClickhouse, + DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql, DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, TableOptions, TableOptionsBigQuery, TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsInternal, TableOptionsLocal, TableOptionsMongoDb, @@ -229,6 +229,15 @@ impl<'a> ExternalDispatcher<'a> { let table = ClickhouseTableProvider::try_new(access, table_ref).await?; Ok(Arc::new(table)) } + DatabaseOptions::Cassandra(DatabaseOptionsCassandra { host }) => { + let 
table = CassandraTableProvider::try_new( + host.clone(), + schema.to_string(), + name.to_string(), + ) + .await?; + Ok(Arc::new(table)) + } } } diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index 53029bfba..4f73e8d0b 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -14,7 +14,7 @@ use datafusion::sql::TableReference; use datafusion_ext::planner::SqlQueryPlanner; use datafusion_ext::AsyncContextProvider; use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess}; -use datasources::cassandra::CassandraAccess; +use datasources::cassandra::{CassandraAccess, CassandraAccessState}; use datasources::clickhouse::{ClickhouseAccess, ClickhouseTableRef}; use datasources::common::ssh::{key::SshKey, SshConnection, SshConnectionParameters}; use datasources::common::url::{DatasourceUrl, DatasourceUrlType}; @@ -43,14 +43,15 @@ use protogen::metastore::types::options::{ CopyToDestinationOptionsLocal, CopyToDestinationOptionsS3, CopyToFormatOptions, CopyToFormatOptionsCsv, CopyToFormatOptionsJson, CopyToFormatOptionsParquet, CredentialsOptions, CredentialsOptionsAws, CredentialsOptionsAzure, CredentialsOptionsDebug, - CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, - DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql, - DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog, - DeltaLakeUnityCatalog, StorageOptions, TableOptions, TableOptionsBigQuery, - TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, - TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, - TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, - TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh, + CredentialsOptionsGcp, DatabaseOptions, 
DatabaseOptionsBigQuery, DatabaseOptionsCassandra, + DatabaseOptionsClickhouse, DatabaseOptionsDebug, DatabaseOptionsDeltaLake, + DatabaseOptionsMongoDb, DatabaseOptionsMysql, DatabaseOptionsPostgres, + DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog, DeltaLakeUnityCatalog, + StorageOptions, TableOptions, TableOptionsBigQuery, TableOptionsCassandra, + TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsLocal, + TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres, + TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions, + TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh, }; use protogen::metastore::types::service::{AlterDatabaseOperation, AlterTableOperation}; use sqlbuiltins::builtins::{CURRENT_SESSION_SCHEMA, DEFAULT_CATALOG}; @@ -318,6 +319,14 @@ impl<'a> SessionPlanner<'a> { DatabaseOptions::Clickhouse(DatabaseOptionsClickhouse { connection_string }) } + DatabaseOptions::CASSANDRA => { + let host: String = m.remove_required("host")?; + + let access = CassandraAccess::new(host.clone()); + access.validate_access().await?; + + DatabaseOptions::Cassandra(DatabaseOptionsCassandra { host }) + } DatabaseOptions::DEBUG => { datasources::debug::validate_tunnel_connections(tunnel_options.as_ref())?; DatabaseOptions::Debug(DatabaseOptionsDebug {}) @@ -538,7 +547,7 @@ impl<'a> SessionPlanner<'a> { let host: String = m.remove_required("host")?; let keyspace: String = m.remove_required("keyspace")?; let table: String = m.remove_required("table")?; - let access = CassandraAccess::try_new(host.clone()).await?; + let access = CassandraAccessState::try_new(host.clone()).await?; access.validate_table_access(&keyspace, &table).await?; TableOptions::Cassandra(TableOptionsCassandra { diff --git a/testdata/sqllogictests_cassandra/external_database.slt b/testdata/sqllogictests_cassandra/external_database.slt new file mode 100644 index 000000000..546717068 --- /dev/null +++ 
# Basic tests for external database.

# The host comes from the environment, the same way external_table.slt reads
# it, so both suites run against whichever Cassandra instance the harness
# provides.
statement ok
CREATE EXTERNAL DATABASE EXTERNAL_DB
    FROM CASSANDRA
    OPTIONS (
        HOST = '${CASSANDRA_CONN_STRING}',
    );

query I
SELECT count(*) FROM external_db.test.bikeshare_stations;
----
102

# Ensure we can query into the virtual schema.

query T
SELECT * FROM list_schemas(external_db) WHERE schema_name = 'test';
----
test

query T
SELECT table_name
    FROM list_tables(external_db, test)
    WHERE table_name = 'bikeshare_stations';
----
bikeshare_stations

query TTT rowsort
SELECT column_name, data_type, nullable
    FROM list_columns(external_db, test, bikeshare_stations)
    WHERE data_type = 'Int32';
----
city_asset_number Int32 t
council_district Int32 t
footprint_length Int32 t
number_of_docks Int32 t
station_id Int32 t

# TODO: Fix the error message:
halt

# Try to query non-existent table.
statement error failed to find table: 'test.doesnotexist'
SELECT * FROM external_db.test.doesnotexist

statement ok
DROP DATABASE external_db;