Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: copy to support for lance #2342

Merged
merged 26 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions crates/datasources/src/common/sink/lance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatchIterator;
use datafusion::common::Result as DfResult;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::DisplayAs;
use datafusion::physical_plan::{DisplayFormatType, SendableRecordBatchStream};
use futures::StreamExt;
use lance::dataset::{WriteMode, WriteParams};
use lance::io::writer::FileWriterOptions;
use lance::Dataset;
use object_store::{path::Path as ObjectPath, ObjectStore};

/// Options controlling how a [`LanceSink`] collects column statistics and
/// resolves its destination URL.
#[derive(Debug, Clone)]
pub struct LanceSinkOpts {
    /// When `Some(true)`, disable statistics collection for all columns.
    // NOTE(review): currently never read by the sink's write path — confirm
    // whether this is intended to be wired up.
    pub disable_all_column_stats: Option<bool>,
    /// When `Some(true)`, collect statistics for every column in the schema.
    pub collect_all_column_stats: Option<bool>,
    /// Explicit list of column names to collect statistics for; consulted
    /// only when `collect_all_column_stats` is not `Some(true)`.
    pub column_stats: Option<Vec<String>>,
    /// Base URL that the sink's location is joined against to build the
    /// dataset URI; when `None`, the location itself is parsed as a URL.
    pub url: Option<url::Url>,
}

/// Writes lance files to object storage.
///
/// Incoming record batch streams are chunked and written to a Lance
/// dataset at the URL resolved from `opts.url` and `loc`.
#[derive(Debug, Clone)]
pub struct LanceSink {
    // Object store backing the destination. Only used for `Display` here;
    // the actual write path resolves a URL from `opts`/`loc` instead.
    store: Arc<dyn ObjectStore>,
    // Location of the dataset within the store.
    loc: ObjectPath,
    // Write-time options (statistics selection, base URL).
    opts: LanceSinkOpts,
}

impl Default for LanceSinkOpts {
fn default() -> Self {
LanceSinkOpts {
collect_all_column_stats: Some(true),
column_stats: None,
disable_all_column_stats: None,
url: None,
}
}
}

impl fmt::Display for LanceSink {
    /// Renders as `LanceSink(<store>:<path>)` for logging and plan output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("LanceSink({}:{})", self.store, self.loc))
    }
}

impl DisplayAs for LanceSink {
    /// Both the default and verbose formats defer to the `Display` impl.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => write!(f, "{self}"),
        }
    }
}

impl LanceSink {
pub fn try_from_obj_store(
store: Arc<dyn ObjectStore>,
loc: impl Into<ObjectPath>,
opts: Option<LanceSinkOpts>,
) -> Result<Self, DataFusionError> {
Ok(LanceSink {
store,
loc: loc.into(),
opts: match opts {
Some(o) => o,
None => LanceSinkOpts::default(),
},
})
}

async fn stream_into_inner(
&self,
stream: SendableRecordBatchStream,
mut ds: Option<Dataset>,
) -> DfResult<Option<Dataset>> {
tychoish marked this conversation as resolved.
Show resolved Hide resolved
let mut opts = FileWriterOptions::default();
tychoish marked this conversation as resolved.
Show resolved Hide resolved
if self.opts.collect_all_column_stats.is_some_and(|val| val) {
tychoish marked this conversation as resolved.
Show resolved Hide resolved
opts.collect_stats_for_fields = stream
.schema()
.fields
.iter()
.enumerate()
.map(|v| v.0 as i32)
.collect();
} else if self.opts.column_stats.is_some() {
let colls: &Vec<String> = self.opts.column_stats.as_ref().unwrap();
let mut set = HashSet::with_capacity(colls.len());

for c in colls.iter() {
set.replace(c);
}

opts.collect_stats_for_fields = stream
.schema()
.fields
.iter()
.map(|f| f.name().to_owned())
.filter(|f| set.contains(f))
.enumerate()
.map(|v| v.0 as i32)
.collect();
}
let table = if self.opts.url.is_some() {
self.opts
.url
.clone()
.unwrap()
.join(self.loc.as_ref())
.map_err(|e| DataFusionError::External(Box::new(e)))?
} else {
url::Url::parse(self.loc.as_ref())
.map_err(|e| DataFusionError::External(Box::new(e)))?
};

let schema = stream.schema().clone();
let mut chunks = stream.chunks(32);
let write_opts = WriteParams {
mode: WriteMode::Overwrite,
..Default::default()
};
while let Some(batches) = chunks.next().await {
let batch_iter =
RecordBatchIterator::new(batches.into_iter().map(|item| Ok(item?)), schema.clone());

match ds.clone() {
Some(mut d) => {
d.append(batch_iter, Some(write_opts.clone())).await?;
}
None => {
ds.replace(
Dataset::write(batch_iter, table.as_str(), Some(write_opts.clone()))
.await?,
);
}
}
}

Ok(ds)
}
}

#[async_trait]
impl DataSink for LanceSink {
    /// Writes every input stream into a single Lance dataset and returns the
    /// total row count of the resulting dataset.
    ///
    /// The dataset handle produced by the first stream is threaded through so
    /// later streams append rather than recreate. Returns 0 when no stream
    /// produced any batches.
    async fn write_all(
        &self,
        data: Vec<SendableRecordBatchStream>,
        _context: &Arc<TaskContext>,
    ) -> DfResult<u64> {
        let mut ds: Option<Dataset> = None;
        for stream in data {
            ds = self.stream_into_inner(stream, ds).await?;
        }
        match ds {
            Some(ds) => Ok(ds.count_rows().await? as u64),
            None => Ok(0),
        }
    }
}
1 change: 1 addition & 0 deletions crates/datasources/src/common/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod bson;
pub mod csv;
pub mod json;
pub mod lance;
pub mod parquet;

use std::io::{self, Write};
Expand Down
5 changes: 2 additions & 3 deletions crates/datasources/src/lance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use lance::{dataset::builder::DatasetBuilder, Dataset};
use protogen::metastore::types::options::StorageOptions;

/// Loads a Lance dataset from `location`, forwarding storage options
/// (e.g. object-store credentials) to the dataset builder.
pub async fn scan_lance_table(location: &str, options: StorageOptions) -> Result<Dataset> {
    Ok(DatasetBuilder::from_uri(location)
        .with_storage_options(options.inner.into_iter().collect())
        .load()
        .await?)
}
10 changes: 10 additions & 0 deletions crates/protogen/src/metastore/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1536,6 +1536,7 @@ pub struct CopyToDestinationOptionsAzure {
pub enum CopyToFormatOptions {
Csv(CopyToFormatOptionsCsv),
Parquet(CopyToFormatOptionsParquet),
Lance(CopyToFormatOptionsLance),
Json(CopyToFormatOptionsJson),
Bson,
}
Expand All @@ -1554,13 +1555,15 @@ impl CopyToFormatOptions {
pub const PARQUET: &'static str = "parquet";
pub const JSON: &'static str = "json";
pub const BSON: &'static str = "bson";
pub const LANCE: &'static str = "lance";

pub fn as_str(&self) -> &'static str {
match self {
Self::Csv(_) => Self::CSV,
Self::Parquet(_) => Self::PARQUET,
Self::Json(_) => Self::JSON,
Self::Bson => Self::BSON,
Self::Lance(_) => Self::LANCE,
}
}
}
Expand All @@ -1580,3 +1583,10 @@ pub struct CopyToFormatOptionsParquet {
pub struct CopyToFormatOptionsJson {
pub array: bool,
}

/// User-facing options for `COPY TO ... FORMAT lance`.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct CopyToFormatOptionsLance {
    /// When `Some(true)`, disable statistics collection for all columns.
    pub disable_all_column_stats: Option<bool>,
    /// When `Some(true)`, collect statistics for every column.
    pub collect_all_column_stats: Option<bool>,
    /// Explicit list of column names to collect statistics for.
    pub collect_column_stats: Option<Vec<String>>,
}
tychoish marked this conversation as resolved.
Show resolved Hide resolved
44 changes: 43 additions & 1 deletion crates/protogen/src/sqlexec/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub enum CopyToFormatOptionsEnum {
Json(CopyToFormatOptionsJson),
#[prost(message, tag = "3")]
Parquet(CopyToFormatOptionsParquet),
#[prost(message, tag = "4")]
Lance(CopyToFormatOptionsLance),
}

#[derive(Clone, PartialEq, Message)]
Expand All @@ -96,6 +98,19 @@ pub struct CopyToFormatOptionsParquet {
pub row_group_size: u64,
}

/// Protobuf representation of the Lance copy-to options.
///
/// `collect_column_stats` uses an empty vec (rather than `Option`) to mean
/// "unset"; the conversion to/from the metastore type maps an empty vec to
/// `None` and back.
#[derive(Clone, PartialEq, Message)]
pub struct CopyToFormatOptionsLance {
    #[prost(bool, optional, tag = "1")]
    pub disable_all_column_stats: Option<bool>,
    #[prost(bool, optional, tag = "2")]
    pub collect_all_column_stats: Option<bool>,
    #[prost(string, repeated, tag = "3")]
    pub collect_column_stats: Vec<String>,
}

/// Protobuf representation of the BSON copy-to options (currently empty).
#[derive(Clone, PartialEq, Message)]
pub struct CopyToFormatOptionsBson {}

impl TryFrom<crate::metastore::types::options::CopyToFormatOptions> for CopyToFormatOptions {
type Error = crate::errors::ProtoConvError;
fn try_from(
Expand All @@ -105,6 +120,21 @@ impl TryFrom<crate::metastore::types::options::CopyToFormatOptions> for CopyToFo
crate::metastore::types::options::CopyToFormatOptions::Bson => {
Ok(CopyToFormatOptions::default())
}
crate::metastore::types::options::CopyToFormatOptions::Lance(opts) => {
Ok(CopyToFormatOptions {
copy_to_format_options_enum: Some(CopyToFormatOptionsEnum::Lance(
CopyToFormatOptionsLance {
disable_all_column_stats: opts.disable_all_column_stats,
collect_all_column_stats: opts.collect_all_column_stats,
collect_column_stats: if let Some(colls) = opts.collect_column_stats {
colls
} else {
Vec::new()
},
},
)),
})
}
crate::metastore::types::options::CopyToFormatOptions::Csv(csv) => {
Ok(CopyToFormatOptions {
copy_to_format_options_enum: Some(CopyToFormatOptionsEnum::Csv(
Expand Down Expand Up @@ -154,12 +184,24 @@ impl TryFrom<CopyToFormatOptions> for crate::metastore::types::options::CopyToFo
},
))
}
CopyToFormatOptionsEnum::Lance(lance) => Ok(
crate::metastore::types::options::CopyToFormatOptions::Lance(
crate::metastore::types::options::CopyToFormatOptionsLance {
disable_all_column_stats: lance.disable_all_column_stats,
collect_all_column_stats: lance.collect_all_column_stats,
collect_column_stats: if lance.collect_column_stats.is_empty() {
None
} else {
Some(lance.collect_column_stats)
},
},
),
),
CopyToFormatOptionsEnum::Json(json) => {
Ok(crate::metastore::types::options::CopyToFormatOptions::Json(
crate::metastore::types::options::CopyToFormatOptionsJson { array: json.array },
))
}

CopyToFormatOptionsEnum::Parquet(parquet) => Ok(
crate::metastore::types::options::CopyToFormatOptions::Parquet(
crate::metastore::types::options::CopyToFormatOptionsParquet {
Expand Down
11 changes: 11 additions & 0 deletions crates/sqlexec/src/parser/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ impl ParseOptionValue<String> for OptionValue {
}
}

impl ParseOptionValue<Vec<String>> for OptionValue {
    /// Parses a comma-separated literal (quoted or unquoted) into a list of
    /// strings. Entries are trimmed and empty entries are dropped, so
    /// `'a, b,'` parses as `["a", "b"]`. (A plain split would keep the
    /// leading space in `" b"` and yield `[""]` for an empty input.)
    fn parse_opt(self) -> Result<Vec<String>, ParserError> {
        match self {
            Self::QuotedLiteral(s) | Self::UnquotedLiteral(s) => Ok(s
                .split(',')
                .map(str::trim)
                .filter(|part| !part.is_empty())
                .map(str::to_string)
                .collect()),
            o => Err(unexpected_type_err!("string slice", o)),
        }
    }
}

impl ParseOptionValue<bool> for OptionValue {
fn parse_opt(self) -> Result<bool, ParserError> {
let opt = match self {
Expand Down
19 changes: 19 additions & 0 deletions crates/sqlexec/src/planner/physical_plan/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use datafusion_ext::metrics::WriteOnlyDataSourceMetricsExecAdapter;
use datasources::common::sink::bson::BsonSink;
use datasources::common::sink::csv::{CsvSink, CsvSinkOpts};
use datasources::common::sink::json::{JsonSink, JsonSinkOpts};
use datasources::common::sink::lance::{LanceSink, LanceSinkOpts};
use datasources::common::sink::parquet::{ParquetSink, ParquetSinkOpts};
use datasources::common::url::DatasourceUrl;
use datasources::object_store::gcs::GcsStoreAccess;
Expand Down Expand Up @@ -106,6 +107,13 @@ impl DisplayAs for CopyToExec {
impl CopyToExec {
async fn copy_to(self, context: Arc<TaskContext>) -> DataFusionResult<RecordBatch> {
let sink = match (self.dest, self.format) {
(CopyToDestinationOptions::Local(local_options), CopyToFormatOptions::Lance(opts)) => {
get_sink_for_obj(
CopyToFormatOptions::Lance(opts),
&LocalStoreAccess {},
&local_options.location,
)?
}
(CopyToDestinationOptions::Local(local_options), format) => {
{
// Create the path if it doesn't exist (for local).
Expand Down Expand Up @@ -177,6 +185,7 @@ fn get_sink_for_obj(
let store = access
.create_store()
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let path = access
.path(location)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Expand All @@ -197,6 +206,16 @@ fn get_sink_for_obj(
row_group_size: parquet_opts.row_group_size,
},
)),
CopyToFormatOptions::Lance(opts) => Box::new(LanceSink::try_from_obj_store(
store,
path,
Some(LanceSinkOpts {
disable_all_column_stats: opts.disable_all_column_stats,
collect_all_column_stats: opts.collect_all_column_stats,
column_stats: opts.collect_column_stats,
url: Some(url::Url::parse(access.base_url().unwrap().as_str()).unwrap()),
}),
)?),
CopyToFormatOptions::Json(json_opts) => Box::new(JsonSink::from_obj_store(
store,
path,
Expand Down
Loading