Skip to content

Commit

Permalink
Async GeoParquet writer (#587)
Browse files Browse the repository at this point in the history
Rust only for now
  • Loading branch information
kylebarron authored Mar 27, 2024
1 parent 541fd47 commit c3bd251
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 1 deletion.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ geos = ["dep:geos"]
geozero = ["dep:geozero"]
ipc_compression = ["arrow-ipc/lz4", "arrow-ipc/zstd"]
parquet = ["dep:parquet"]
parquet_async = ["parquet", "parquet/async", "dep:futures"]
parquet_async = ["parquet", "parquet/async", "dep:futures", "dep:tokio"]
parquet_compression = [
"parquet/snap",
"parquet/brotli",
Expand Down Expand Up @@ -88,6 +88,7 @@ sqlx = { version = "0.7", optional = true, default-features = false, features =
"tls-rustls",
] }
thiserror = "1"
tokio = { version = "1", default-features = false, optional = true }


[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions python/core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,5 @@ pub use reader::{read_geoparquet_async, ParquetDataset, ParquetFile, ParquetRead
pub use writer::{
write_geoparquet, GeoParquetWriter, GeoParquetWriterEncoding, GeoParquetWriterOptions,
};
#[cfg(feature = "parquet_async")]
pub use writer::{write_geoparquet_async, GeoParquetWriterAsync};
67 changes: 67 additions & 0 deletions src/io/parquet/writer/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use crate::error::Result;
use crate::io::parquet::writer::encode::encode_record_batch;
use crate::io::parquet::writer::metadata::GeoParquetMetadataBuilder;
use crate::io::parquet::writer::options::GeoParquetWriterOptions;
use crate::table::GeoTable;
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use parquet::arrow::AsyncArrowWriter;
use parquet::file::metadata::KeyValue;
use tokio::io::AsyncWrite;

pub async fn write_geoparquet_async<W: AsyncWrite + Unpin + Send>(
table: &mut GeoTable,
writer: W,
options: &GeoParquetWriterOptions,
) -> Result<()> {
let mut parquet_writer = GeoParquetWriterAsync::try_new(writer, table.schema(), options)?;

for batch in table.batches() {
parquet_writer.write_batch(batch).await?;
}

parquet_writer.finish().await?;
Ok(())
}

pub struct GeoParquetWriterAsync<W: AsyncWrite + Unpin + Send> {
writer: AsyncArrowWriter<W>,
metadata_builder: GeoParquetMetadataBuilder,
}

impl<W: AsyncWrite + Unpin + Send> GeoParquetWriterAsync<W> {
pub fn try_new(writer: W, schema: &Schema, options: &GeoParquetWriterOptions) -> Result<Self> {
let metadata_builder = GeoParquetMetadataBuilder::try_new(schema, options)?;

let writer = AsyncArrowWriter::try_new(
writer,
metadata_builder.output_schema.clone(),
options.writer_properties.clone(),
)?;

Ok(Self {
writer,
metadata_builder,
})
}

pub async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
let encoded_batch = encode_record_batch(batch, &mut self.metadata_builder)?;
self.writer.write(&encoded_batch).await?;
Ok(())
}

pub fn writer(&self) -> &AsyncArrowWriter<W> {
&self.writer
}

pub async fn finish(mut self) -> Result<()> {
if let Some(geo_meta) = self.metadata_builder.finish() {
let kv_metadata = KeyValue::new("geo".to_string(), serde_json::to_string(&geo_meta)?);
self.writer.append_key_value_metadata(kv_metadata);
}

self.writer.close().await?;
Ok(())
}
}
3 changes: 3 additions & 0 deletions src/io/parquet/writer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
mod r#async;
mod encode;
mod metadata;
mod options;
mod sync;

pub use options::{GeoParquetWriterEncoding, GeoParquetWriterOptions};
#[cfg(feature = "parquet_async")]
pub use r#async::{write_geoparquet_async, GeoParquetWriterAsync};
pub use sync::{write_geoparquet, GeoParquetWriter};
4 changes: 4 additions & 0 deletions src/io/parquet/writer/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ impl<W: Write + Send> GeoParquetWriter<W> {
Ok(())
}

pub fn writer(&self) -> &ArrowWriter<W> {
&self.writer
}

pub fn finish(mut self) -> Result<()> {
if let Some(geo_meta) = self.metadata_builder.finish() {
let kv_metadata = KeyValue::new("geo".to_string(), serde_json::to_string(&geo_meta)?);
Expand Down

0 comments on commit c3bd251

Please sign in to comment.