Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: copy to support for lance #2342

Merged
merged 26 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ DROP DATABASE my_pg;
| Newline Delimited JSON | ✅ | ✅\* | ✅ | ✅ | ➖ |
| Apache Parquet | ✅ | ✅\* | ✅ | ✅ | ➖ |
| BSON | ✅ | ✅\* | ✅ | ✅ | ➖ |
| Lance | ✅ | ✅\* | ✅ | ✅ | ➖ |
| Delta | ✅ | 🚧 | ✅ | ✅ | ➖ |
| Iceberg | ✅ | 🚧 | ✅ | ✅ | ➖ |
| Microsoft Excel | ✅ | 🚧 | ✅ | 🚧 | ➖ |
| JSON | 🚧 | 🚧 | 🚧 | 🚧 | ➖ |
| Apache Avro | 🚧 | 🚧 | 🚧 | 🚧 | ➖ |
Expand Down
119 changes: 119 additions & 0 deletions crates/datasources/src/common/sink/lance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use std::fmt;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatchIterator;
use datafusion::common::Result as DfResult;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::DisplayAs;
use datafusion::physical_plan::{DisplayFormatType, SendableRecordBatchStream};
use futures::StreamExt;
use lance::dataset::WriteMode;
use lance::Dataset;
use object_store::{path::Path as ObjectPath, ObjectStore};

pub type LanceWriteParams = lance::dataset::WriteParams;

/// Options controlling how a lance sink writes a dataset.
///
/// The `max_*` fields mirror the same-named fields on lance's
/// `WriteParams` and are resolved from user-supplied COPY options
/// (falling back to `WriteParams::default()` when unset).
#[derive(Debug, Clone)]
pub struct LanceSinkOpts {
/// Base URL of the destination object store; when present, the sink's
/// location path is joined onto it to form the dataset URI.
pub url: Option<url::Url>,
pub max_rows_per_file: usize,
pub max_rows_per_group: usize,
pub max_bytes_per_file: usize,
/// Requested input batch size. NOTE(review): not obviously consumed by
/// the visible write path (stream is chunked with a hard-coded 32) —
/// confirm intended use.
pub input_batch_size: usize,
}

/// Writes lance files to object storage.
#[derive(Debug, Clone)]
pub struct LanceSink {
// NOTE(review): within this file the store handle is only used for
// Display; lance opens the store itself from the URL in `opts` — confirm.
store: Arc<dyn ObjectStore>,
// Path of the dataset relative to the store / base URL.
loc: ObjectPath,
opts: LanceSinkOpts,
}

impl fmt::Display for LanceSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "LanceSink({}:{})", self.store, self.loc)
}
}

impl DisplayAs for LanceSink {
/// Default and verbose EXPLAIN formats are identical: both delegate to
/// the `Display` impl.
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{self}")
}
}

impl LanceSink {
pub fn from_obj_store(
store: Arc<dyn ObjectStore>,
loc: impl Into<ObjectPath>,
opts: LanceSinkOpts,
) -> Self {
LanceSink {
store,
loc: loc.into(),
opts,
}
}

async fn stream_into_inner(
&self,
stream: SendableRecordBatchStream,
mut ds: Option<Dataset>,
) -> DfResult<Option<Dataset>> {
tychoish marked this conversation as resolved.
Show resolved Hide resolved
let table = match self.opts.url.clone() {
Some(opts_url) => opts_url.join(self.loc.as_ref()),
None => url::Url::parse(self.loc.as_ref()),
}
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let schema = stream.schema().clone();
let mut chunks = stream.chunks(32);
let write_opts = LanceWriteParams {
mode: WriteMode::Overwrite,
..Default::default()
};

while let Some(batches) = chunks.next().await {
let batch_iter =
RecordBatchIterator::new(batches.into_iter().map(|item| Ok(item?)), schema.clone());

match ds.clone() {
Some(mut d) => {
d.append(batch_iter, Some(write_opts.clone())).await?;
}
None => {
ds.replace(
Dataset::write(batch_iter, table.as_str(), Some(write_opts.clone()))
.await?,
);
}
}
}

Ok(ds)
}
}

#[async_trait]
impl DataSink for LanceSink {
/// Drain every input stream into a single lance dataset and return the
/// total number of rows in the resulting dataset.
async fn write_all(
&self,
data: Vec<SendableRecordBatchStream>,
_context: &Arc<TaskContext>,
) -> DfResult<u64> {
// Thread the dataset handle through the streams: the first stream
// creates the dataset, subsequent streams append to it.
let mut ds: Option<Dataset> = None;
for stream in data {
ds = self.stream_into_inner(stream, ds).await?;
}
match ds {
Some(ds) => Ok(ds.count_rows().await? as u64),
// No streams produced any batches: nothing was written.
None => Ok(0),
}
}
}
1 change: 1 addition & 0 deletions crates/datasources/src/common/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod bson;
pub mod csv;
pub mod json;
pub mod lance;
pub mod parquet;

use std::io::{self, Write};
Expand Down
5 changes: 2 additions & 3 deletions crates/datasources/src/lance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use lance::{dataset::builder::DatasetBuilder, Dataset};
use protogen::metastore::types::options::StorageOptions;

/// Load a lance dataset from `location`, forwarding the provided storage
/// options (credentials, endpoints, etc.) to the dataset builder.
pub async fn scan_lance_table(location: &str, options: StorageOptions) -> Result<Dataset> {
Ok(DatasetBuilder::from_uri(location)
.with_storage_options(options.inner.into_iter().collect())
.load()
.await?)
}
11 changes: 11 additions & 0 deletions crates/protogen/src/metastore/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1536,6 +1536,7 @@ pub struct CopyToDestinationOptionsAzure {
pub enum CopyToFormatOptions {
Csv(CopyToFormatOptionsCsv),
Parquet(CopyToFormatOptionsParquet),
Lance(CopyToFormatOptionsLance),
Json(CopyToFormatOptionsJson),
Bson,
}
Expand All @@ -1554,13 +1555,15 @@ impl CopyToFormatOptions {
pub const PARQUET: &'static str = "parquet";
pub const JSON: &'static str = "json";
pub const BSON: &'static str = "bson";
pub const LANCE: &'static str = "lance";

pub fn as_str(&self) -> &'static str {
match self {
Self::Csv(_) => Self::CSV,
Self::Parquet(_) => Self::PARQUET,
Self::Json(_) => Self::JSON,
Self::Bson => Self::BSON,
Self::Lance(_) => Self::LANCE,
}
}
}
Expand All @@ -1580,3 +1583,11 @@ pub struct CopyToFormatOptionsParquet {
pub struct CopyToFormatOptionsJson {
pub array: bool,
}

/// User-supplied options for `COPY TO ... FORMAT lance`.
///
/// All fields are optional; `None` falls back to lance's
/// `WriteParams::default()` when the sink is constructed.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct CopyToFormatOptionsLance {
pub max_rows_per_file: Option<usize>,
pub max_rows_per_group: Option<usize>,
pub max_bytes_per_file: Option<usize>,
pub input_batch_size: Option<usize>,
}
40 changes: 39 additions & 1 deletion crates/protogen/src/sqlexec/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub enum CopyToFormatOptionsEnum {
Json(CopyToFormatOptionsJson),
#[prost(message, tag = "3")]
Parquet(CopyToFormatOptionsParquet),
#[prost(message, tag = "4")]
Lance(CopyToFormatOptionsLance),
}

#[derive(Clone, PartialEq, Message)]
Expand All @@ -96,6 +98,21 @@ pub struct CopyToFormatOptionsParquet {
pub row_group_size: u64,
}

/// Protobuf mirror of the metastore lance COPY options.
///
/// Uses `uint64` on the wire; values are converted to/from `usize` at the
/// metastore-type boundary.
#[derive(Clone, PartialEq, Message)]
pub struct CopyToFormatOptionsLance {
#[prost(uint64, optional, tag = "1")]
pub max_rows_per_file: Option<u64>,
#[prost(uint64, optional, tag = "2")]
pub max_rows_per_group: Option<u64>,
#[prost(uint64, optional, tag = "3")]
pub max_bytes_per_file: Option<u64>,
#[prost(uint64, optional, tag = "4")]
pub input_batch_size: Option<u64>,
}

/// Protobuf message for BSON COPY options; BSON currently takes no options,
/// so the message is empty.
#[derive(Clone, PartialEq, Message)]
pub struct CopyToFormatOptionsBson {}

impl TryFrom<crate::metastore::types::options::CopyToFormatOptions> for CopyToFormatOptions {
type Error = crate::errors::ProtoConvError;
fn try_from(
Expand All @@ -105,6 +122,18 @@ impl TryFrom<crate::metastore::types::options::CopyToFormatOptions> for CopyToFo
crate::metastore::types::options::CopyToFormatOptions::Bson => {
Ok(CopyToFormatOptions::default())
}
crate::metastore::types::options::CopyToFormatOptions::Lance(opts) => {
Ok(CopyToFormatOptions {
copy_to_format_options_enum: Some(CopyToFormatOptionsEnum::Lance(
CopyToFormatOptionsLance {
max_rows_per_file: opts.max_rows_per_file.map(|v| v as u64),
max_rows_per_group: opts.max_rows_per_group.map(|v| v as u64),
max_bytes_per_file: opts.max_bytes_per_file.map(|v| v as u64),
input_batch_size: opts.input_batch_size.map(|v| v as u64),
},
)),
})
}
crate::metastore::types::options::CopyToFormatOptions::Csv(csv) => {
Ok(CopyToFormatOptions {
copy_to_format_options_enum: Some(CopyToFormatOptionsEnum::Csv(
Expand Down Expand Up @@ -154,12 +183,21 @@ impl TryFrom<CopyToFormatOptions> for crate::metastore::types::options::CopyToFo
},
))
}
CopyToFormatOptionsEnum::Lance(lance) => Ok(
crate::metastore::types::options::CopyToFormatOptions::Lance(
crate::metastore::types::options::CopyToFormatOptionsLance {
max_rows_per_file: lance.max_rows_per_file.map(|v| v as usize),
max_rows_per_group: lance.max_rows_per_group.map(|v| v as usize),
// Fix copy-paste bug: this previously read
// `lance.max_rows_per_group`, silently dropping the
// user's max_bytes_per_file setting on deserialization.
max_bytes_per_file: lance.max_bytes_per_file.map(|v| v as usize),
input_batch_size: lance.input_batch_size.map(|v| v as usize),
},
),
),
CopyToFormatOptionsEnum::Json(json) => {
Ok(crate::metastore::types::options::CopyToFormatOptions::Json(
crate::metastore::types::options::CopyToFormatOptionsJson { array: json.array },
))
}

CopyToFormatOptionsEnum::Parquet(parquet) => Ok(
crate::metastore::types::options::CopyToFormatOptions::Parquet(
crate::metastore::types::options::CopyToFormatOptionsParquet {
Expand Down
11 changes: 11 additions & 0 deletions crates/sqlexec/src/parser/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ impl ParseOptionValue<String> for OptionValue {
}
}

impl ParseOptionValue<Vec<String>> for OptionValue {
/// Parse a quoted or unquoted literal as a comma-separated list of
/// strings. Elements are not trimmed; empty segments are preserved.
fn parse_opt(self) -> Result<Vec<String>, ParserError> {
let raw = match self {
Self::QuotedLiteral(s) | Self::UnquotedLiteral(s) => s,
o => return Err(unexpected_type_err!("string slice", o)),
};
Ok(raw.split(',').map(String::from).collect())
}
}

impl ParseOptionValue<bool> for OptionValue {
fn parse_opt(self) -> Result<bool, ParserError> {
let opt = match self {
Expand Down
32 changes: 32 additions & 0 deletions crates/sqlexec/src/planner/physical_plan/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use datafusion_ext::metrics::WriteOnlyDataSourceMetricsExecAdapter;
use datasources::common::sink::bson::BsonSink;
use datasources::common::sink::csv::{CsvSink, CsvSinkOpts};
use datasources::common::sink::json::{JsonSink, JsonSinkOpts};
use datasources::common::sink::lance::{LanceSink, LanceSinkOpts, LanceWriteParams};
use datasources::common::sink::parquet::{ParquetSink, ParquetSinkOpts};
use datasources::common::url::DatasourceUrl;
use datasources::object_store::gcs::GcsStoreAccess;
Expand Down Expand Up @@ -106,6 +107,13 @@ impl DisplayAs for CopyToExec {
impl CopyToExec {
async fn copy_to(self, context: Arc<TaskContext>) -> DataFusionResult<RecordBatch> {
let sink = match (self.dest, self.format) {
(CopyToDestinationOptions::Local(local_options), CopyToFormatOptions::Lance(opts)) => {
get_sink_for_obj(
CopyToFormatOptions::Lance(opts),
&LocalStoreAccess {},
&local_options.location,
)?
}
(CopyToDestinationOptions::Local(local_options), format) => {
{
// Create the path if it doesn't exist (for local).
Expand Down Expand Up @@ -177,6 +185,7 @@ fn get_sink_for_obj(
let store = access
.create_store()
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let path = access
.path(location)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Expand All @@ -197,6 +206,29 @@ fn get_sink_for_obj(
row_group_size: parquet_opts.row_group_size,
},
)),
CopyToFormatOptions::Lance(opts) => {
let wp = LanceWriteParams::default();

Box::new(LanceSink::from_obj_store(
store,
path,
LanceSinkOpts {
url: Some(
url::Url::parse(
access
.base_url()
.map_err(|e| DataFusionError::External(Box::new(e)))?
.as_str(),
)
.map_err(|e| DataFusionError::External(Box::new(e)))?,
),
max_rows_per_file: opts.max_rows_per_file.unwrap_or(wp.max_rows_per_file),
max_rows_per_group: opts.max_rows_per_group.unwrap_or(wp.max_rows_per_group),
max_bytes_per_file: opts.max_bytes_per_file.unwrap_or(wp.max_bytes_per_file),
input_batch_size: opts.input_batch_size.unwrap_or(64),
},
))
}
CopyToFormatOptions::Json(json_opts) => Box::new(JsonSink::from_obj_store(
store,
path,
Expand Down
30 changes: 19 additions & 11 deletions crates/sqlexec/src/planner/session_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ use protogen::metastore::types::catalog::{
use protogen::metastore::types::options::{
CopyToDestinationOptions, CopyToDestinationOptionsAzure, CopyToDestinationOptionsGcs,
CopyToDestinationOptionsLocal, CopyToDestinationOptionsS3, CopyToFormatOptions,
CopyToFormatOptionsCsv, CopyToFormatOptionsJson, CopyToFormatOptionsParquet,
CredentialsOptions, CredentialsOptionsAws, CredentialsOptionsAzure, CredentialsOptionsDebug,
CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsCassandra,
DatabaseOptionsClickhouse, DatabaseOptionsDebug, DatabaseOptionsDeltaLake,
DatabaseOptionsMongoDb, DatabaseOptionsMysql, DatabaseOptionsPostgres,
DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog, DeltaLakeUnityCatalog,
StorageOptions, TableOptions, TableOptionsBigQuery, TableOptionsCassandra,
TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsLocal,
TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres,
TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions,
TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
CopyToFormatOptionsCsv, CopyToFormatOptionsJson, CopyToFormatOptionsLance,
CopyToFormatOptionsParquet, CredentialsOptions, CredentialsOptionsAws, CredentialsOptionsAzure,
CredentialsOptionsDebug, CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery,
DatabaseOptionsCassandra, DatabaseOptionsClickhouse, DatabaseOptionsDebug,
DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog,
DeltaLakeUnityCatalog, StorageOptions, TableOptions, TableOptionsBigQuery,
TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs,
TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore,
TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer,
TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
};
use protogen::metastore::types::service::{AlterDatabaseOperation, AlterTableOperation};
use sqlbuiltins::builtins::{CURRENT_SESSION_SCHEMA, DEFAULT_CATALOG};
Expand Down Expand Up @@ -1790,6 +1790,14 @@ impl<'a> SessionPlanner<'a> {
CopyToFormatOptions::Json(CopyToFormatOptionsJson { array })
}
Some(CopyToFormatOptions::BSON) => CopyToFormatOptions::Bson {},
Some(CopyToFormatOptions::LANCE) => {
CopyToFormatOptions::Lance(CopyToFormatOptionsLance {
max_rows_per_file: m.remove_optional("max_rows_per_file")?,
max_rows_per_group: m.remove_optional("max_rows_per_group")?,
max_bytes_per_file: m.remove_optional("max_bytes_per_file")?,
input_batch_size: m.remove_optional("input_batch_size")?,
})
}
Some(other) => return Err(internal!("unsupported output format: {other}")),
};

Expand Down
Loading