Skip to content

Commit

Permalink
feat: cassandra external database (#2374)
Browse files Browse the repository at this point in the history
I realized I branched off `slt-ci` instead of main for this. So this
depends on #2363

closes #2346
  • Loading branch information
universalmind303 authored Jan 8, 2024
1 parent 44c28f8 commit 4b17c4a
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 18 deletions.
2 changes: 2 additions & 0 deletions crates/datasources/src/cassandra/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub enum CassandraError {
UnsupportedDataType(String),
#[error("Table not found: {0}")]
TableNotFound(String),
#[error("{0}")]
String(String),
}

pub type Result<T, E = CassandraError> = std::result::Result<T, E>;
125 changes: 123 additions & 2 deletions crates/datasources/src/cassandra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datafusion_ext::errors::ExtensionError;
use datafusion_ext::functions::VirtualLister;
use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
pub use errors::*;
use futures::Stream;
Expand All @@ -42,6 +44,22 @@ use std::time::Duration;
use self::exec::CassandraExec;

pub struct CassandraAccess {
host: String,
}
impl CassandraAccess {
pub fn new(host: String) -> Self {
Self { host }
}
pub async fn validate_access(&self) -> Result<()> {
let _access = CassandraAccessState::try_new(&self.host).await?;

Ok(())
}
pub async fn connect(&self) -> Result<CassandraAccessState> {
CassandraAccessState::try_new(&self.host).await
}
}
pub struct CassandraAccessState {
session: Session,
}

Expand All @@ -66,8 +84,36 @@ fn try_convert_dtype(ty: &ColumnType) -> Result<DataType> {
_ => return Err(CassandraError::UnsupportedDataType(format!("{:?}", ty))),
})
}
fn try_convert_dtype_string(ty: &str) -> Result<DataType> {
Ok(match ty {
"custom" => return Err(CassandraError::UnsupportedDataType(ty.to_string())),
"ascii" => DataType::Utf8,
"date" => DataType::Date64,
"double" => DataType::Float64,
"duration" => DataType::Duration(TimeUnit::Nanosecond),
"float" => DataType::Float32,
"int" => DataType::Int32,
"text" => DataType::Utf8,
"timestamp" => DataType::Timestamp(TimeUnit::Millisecond, None),
"smallint" => DataType::Int16,
"tinyint" => DataType::Int8,
"uuid" => DataType::Utf8,
"bigint" => DataType::Int64,
// list<T>
lst if lst.contains('<') => {
// get the inner type from "list<{inner}>"
let inner = lst.split('<').nth(1).and_then(|s| s.split('>').next());

impl CassandraAccess {
match inner {
Some(inner) => DataType::new_list(try_convert_dtype_string(inner)?, true),
None => return Err(CassandraError::UnsupportedDataType(ty.to_string())),
}
}
_ => return Err(CassandraError::UnsupportedDataType(ty.to_string())),
})
}

impl CassandraAccessState {
pub async fn try_new(host: impl AsRef<str>) -> Result<Self> {
let session = SessionBuilder::new()
.known_node(host)
Expand Down Expand Up @@ -114,7 +160,7 @@ pub struct CassandraTableProvider {

impl CassandraTableProvider {
pub async fn try_new(host: String, ks: String, table: String) -> Result<Self> {
let access = CassandraAccess::try_new(host).await?;
let access = CassandraAccessState::try_new(host).await?;
let schema = access.get_schema(&ks, &table).await?;
Ok(Self {
schema: Arc::new(schema),
Expand Down Expand Up @@ -189,3 +235,78 @@ impl TableProvider for CassandraTableProvider {
))
}
}

#[async_trait]
impl VirtualLister for CassandraAccessState {
/// List schemas for a data source.
async fn list_schemas(&self) -> Result<Vec<String>, ExtensionError> {
let query = "SELECT keyspace_name FROM system_schema.keyspaces";
self.session
.query(query, &[])
.await
.map_err(CassandraError::from)
.and_then(|res| {
res.rows_or_empty()
.into_iter()
.map(|row| match row.columns[0] {
Some(CqlValue::Text(ref s)) => Ok(s.clone()),
_ => Err(CassandraError::String("invalid schema".to_string())),
})
.collect::<Result<Vec<String>>>()
})
.map_err(ExtensionError::access)
}

/// List tables for a data source.
async fn list_tables(&self, schema: &str) -> Result<Vec<String>, ExtensionError> {
let query = format!(
"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{}'",
schema
);
self.session
.query(query, &[])
.await
.map_err(CassandraError::from)
.and_then(|res| {
res.rows_or_empty()
.into_iter()
.map(|row| match row.columns[0] {
Some(CqlValue::Text(ref s)) => Ok(s.clone()),
_ => Err(CassandraError::String("invalid table".to_string())),
})
.collect::<Result<Vec<String>>>()
})
.map_err(ExtensionError::access)
}

/// List columns for a specific table in the datasource.
async fn list_columns(&self, schema: &str, table: &str) -> Result<Fields, ExtensionError> {
let query = format!(
"SELECT column_name, type FROM system_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}'",
schema, table
);
self.session
.query(query, &[])
.await
.map_err(CassandraError::from)
.and_then(|res| {
res.rows_or_empty()
.into_iter()
.map(|row| {
let name = match row.columns[0] {
Some(CqlValue::Text(ref s)) => s.clone(),
_ => return Err(CassandraError::String("invalid column".to_string())),
};
let ty = match row.columns[1] {
Some(CqlValue::Text(ref s)) => s.clone(),
_ => return Err(CassandraError::String("invalid column".to_string())),
};
let dtype = try_convert_dtype_string(&ty)?;

Ok(Field::new(name, dtype, true))
})
.collect::<Result<_>>()
})
.map_err(ExtensionError::access)
}
}
5 changes: 5 additions & 0 deletions crates/protogen/proto/metastore/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ message DatabaseOptions {
DatabaseOptionsDeltaLake delta = 8;
DatabaseOptionsSqlServer sql_server = 9;
DatabaseOptionsClickhouse clickhouse = 10;
DatabaseOptionsCassandra cassandra = 11;
}
// next: 11
}
Expand Down Expand Up @@ -80,6 +81,10 @@ message DatabaseOptionsClickhouse {
string connection_string = 1;
}

message DatabaseOptionsCassandra {
string host = 1;
}

message DatabaseOptionsSnowflake {
string account_name = 1;
string login_name = 2;
Expand Down
26 changes: 26 additions & 0 deletions crates/protogen/src/metastore/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub enum DatabaseOptions {
Delta(DatabaseOptionsDeltaLake),
SqlServer(DatabaseOptionsSqlServer),
Clickhouse(DatabaseOptionsClickhouse),
Cassandra(DatabaseOptionsCassandra),
}

impl DatabaseOptions {
Expand All @@ -106,6 +107,7 @@ impl DatabaseOptions {
pub const DELTA: &'static str = "delta";
pub const SQL_SERVER: &'static str = "sql_server";
pub const CLICKHOUSE: &'static str = "clickhouse";
pub const CASSANDRA: &'static str = "cassandra";

pub fn as_str(&self) -> &'static str {
match self {
Expand All @@ -119,6 +121,7 @@ impl DatabaseOptions {
DatabaseOptions::Delta(_) => Self::DELTA,
DatabaseOptions::SqlServer(_) => Self::SQL_SERVER,
DatabaseOptions::Clickhouse(_) => Self::CLICKHOUSE,
DatabaseOptions::Cassandra(_) => Self::CASSANDRA,
}
}
}
Expand Down Expand Up @@ -157,6 +160,9 @@ impl TryFrom<options::database_options::Options> for DatabaseOptions {
options::database_options::Options::Clickhouse(v) => {
DatabaseOptions::Clickhouse(v.try_into()?)
}
options::database_options::Options::Cassandra(v) => {
DatabaseOptions::Cassandra(v.try_into()?)
}
})
}
}
Expand Down Expand Up @@ -187,6 +193,9 @@ impl From<DatabaseOptions> for options::database_options::Options {
DatabaseOptions::Clickhouse(v) => {
options::database_options::Options::Clickhouse(v.into())
}
DatabaseOptions::Cassandra(v) => {
options::database_options::Options::Cassandra(v.into())
}
}
}
}
Expand Down Expand Up @@ -365,6 +374,23 @@ impl From<DatabaseOptionsClickhouse> for options::DatabaseOptionsClickhouse {
}
}
}
#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)]
pub struct DatabaseOptionsCassandra {
pub host: String,
}

impl TryFrom<options::DatabaseOptionsCassandra> for DatabaseOptionsCassandra {
type Error = ProtoConvError;
fn try_from(value: options::DatabaseOptionsCassandra) -> Result<Self, Self::Error> {
Ok(DatabaseOptionsCassandra { host: value.host })
}
}

impl From<DatabaseOptionsCassandra> for options::DatabaseOptionsCassandra {
fn from(value: DatabaseOptionsCassandra) -> Self {
options::DatabaseOptionsCassandra { host: value.host }
}
}

#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)]
pub struct DatabaseOptionsSnowflake {
Expand Down
14 changes: 11 additions & 3 deletions crates/sqlbuiltins/src/functions/table/virtual_listing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use datafusion_ext::functions::{
FuncParamValue, IdentValue, TableFuncContextProvider, VirtualLister,
};
use datasources::bigquery::BigQueryAccessor;
use datasources::cassandra::CassandraAccess;
use datasources::clickhouse::ClickhouseAccess;
use datasources::debug::DebugVirtualLister;
use datasources::mongodb::MongoDbAccessor;
Expand All @@ -21,9 +22,9 @@ use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection};
use datasources::sqlserver::SqlServerAccess;
use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};
use protogen::metastore::types::options::{
DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, DatabaseOptionsMongoDb,
DatabaseOptionsMysql, DatabaseOptionsPostgres, DatabaseOptionsSnowflake,
DatabaseOptionsSqlServer,
DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsCassandra, DatabaseOptionsClickhouse,
DatabaseOptionsMongoDb, DatabaseOptionsMysql, DatabaseOptionsPostgres,
DatabaseOptionsSnowflake, DatabaseOptionsSqlServer,
};

use super::TableFunc;
Expand Down Expand Up @@ -350,6 +351,13 @@ pub(crate) async fn get_virtual_lister_for_external_db(
.map_err(ExtensionError::access)?;
Box::new(state)
}
DatabaseOptions::Cassandra(DatabaseOptionsCassandra { host }) => {
let state = CassandraAccess::new(host.to_string())
.connect()
.await
.map_err(ExtensionError::access)?;
Box::new(state)
}
DatabaseOptions::Delta(_) => {
return Err(ExtensionError::Unimplemented(
"deltalake information listing",
Expand Down
13 changes: 11 additions & 2 deletions crates/sqlexec/src/dispatch/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use datasources::sqlserver::{
};
use protogen::metastore::types::catalog::{CatalogEntry, DatabaseEntry, FunctionEntry, TableEntry};
use protogen::metastore::types::options::{
DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, DatabaseOptionsDebug,
DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsCassandra, DatabaseOptionsClickhouse,
DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, TableOptions,
TableOptionsBigQuery, TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug,
TableOptionsGcs, TableOptionsInternal, TableOptionsLocal, TableOptionsMongoDb,
Expand Down Expand Up @@ -229,6 +229,15 @@ impl<'a> ExternalDispatcher<'a> {
let table = ClickhouseTableProvider::try_new(access, table_ref).await?;
Ok(Arc::new(table))
}
DatabaseOptions::Cassandra(DatabaseOptionsCassandra { host }) => {
let table = CassandraTableProvider::try_new(
host.clone(),
schema.to_string(),
name.to_string(),
)
.await?;
Ok(Arc::new(table))
}
}
}

Expand Down
29 changes: 19 additions & 10 deletions crates/sqlexec/src/planner/session_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use datafusion::sql::TableReference;
use datafusion_ext::planner::SqlQueryPlanner;
use datafusion_ext::AsyncContextProvider;
use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess};
use datasources::cassandra::CassandraAccess;
use datasources::cassandra::{CassandraAccess, CassandraAccessState};
use datasources::clickhouse::{ClickhouseAccess, ClickhouseTableRef};
use datasources::common::ssh::{key::SshKey, SshConnection, SshConnectionParameters};
use datasources::common::url::{DatasourceUrl, DatasourceUrlType};
Expand Down Expand Up @@ -43,14 +43,15 @@ use protogen::metastore::types::options::{
CopyToDestinationOptionsLocal, CopyToDestinationOptionsS3, CopyToFormatOptions,
CopyToFormatOptionsCsv, CopyToFormatOptionsJson, CopyToFormatOptionsParquet,
CredentialsOptions, CredentialsOptionsAws, CredentialsOptionsAzure, CredentialsOptionsDebug,
CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse,
DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog,
DeltaLakeUnityCatalog, StorageOptions, TableOptions, TableOptionsBigQuery,
TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs,
TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore,
TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer,
TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsCassandra,
DatabaseOptionsClickhouse, DatabaseOptionsDebug, DatabaseOptionsDeltaLake,
DatabaseOptionsMongoDb, DatabaseOptionsMysql, DatabaseOptionsPostgres,
DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog, DeltaLakeUnityCatalog,
StorageOptions, TableOptions, TableOptionsBigQuery, TableOptionsCassandra,
TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsLocal,
TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres,
TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions,
TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
};
use protogen::metastore::types::service::{AlterDatabaseOperation, AlterTableOperation};
use sqlbuiltins::builtins::{CURRENT_SESSION_SCHEMA, DEFAULT_CATALOG};
Expand Down Expand Up @@ -318,6 +319,14 @@ impl<'a> SessionPlanner<'a> {

DatabaseOptions::Clickhouse(DatabaseOptionsClickhouse { connection_string })
}
DatabaseOptions::CASSANDRA => {
let host: String = m.remove_required("host")?;

let access = CassandraAccess::new(host.clone());
access.validate_access().await?;

DatabaseOptions::Cassandra(DatabaseOptionsCassandra { host })
}
DatabaseOptions::DEBUG => {
datasources::debug::validate_tunnel_connections(tunnel_options.as_ref())?;
DatabaseOptions::Debug(DatabaseOptionsDebug {})
Expand Down Expand Up @@ -538,7 +547,7 @@ impl<'a> SessionPlanner<'a> {
let host: String = m.remove_required("host")?;
let keyspace: String = m.remove_required("keyspace")?;
let table: String = m.remove_required("table")?;
let access = CassandraAccess::try_new(host.clone()).await?;
let access = CassandraAccessState::try_new(host.clone()).await?;
access.validate_table_access(&keyspace, &table).await?;

TableOptions::Cassandra(TableOptionsCassandra {
Expand Down
Loading

0 comments on commit 4b17c4a

Please sign in to comment.