Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration of storage layer to delta-rs #307

Merged
merged 21 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7362d0e
Initial migration of create and insert table paths to delta-rs
gruuya Feb 23, 2023
88b3e53
Revise the schema provider implementation to fit with non-cloneable D…
gruuya Feb 24, 2023
9707fdf
Use root internal object_store instead of storage options for managin…
gruuya Mar 1, 2023
97de330
Enable time-travel for Delta tables
gruuya Mar 1, 2023
6dce354
Implement deletion of table objects on DROP TABLE
gruuya Mar 1, 2023
6ae318f
Enable ALTER TABLE on object stores by table object renaming
gruuya Mar 2, 2023
dc2018f
Fix tests or ignore them if the implementation is still missing
gruuya Mar 3, 2023
77179e6
Remove testing crutches used during development
gruuya Mar 3, 2023
13ce11c
Format Cargo.toml as per tomlfmt
gruuya Mar 3, 2023
6838409
Pivot to using a uuid as delta table root directory
gruuya Mar 6, 2023
aead9eb
Test legacy tables by bootstraping from an existing catalog
gruuya Mar 7, 2023
c3faf29
Extend legacy table test to check migration works ok
gruuya Mar 7, 2023
513ee02
Re-implement DELETE in accordance with delta-rs
gruuya Mar 9, 2023
fa65b16
Enable UPDATE statements for delta tables
gruuya Mar 9, 2023
dd09c7b
Skip eager deletion of schema files used initially
gruuya Mar 9, 2023
6b54c1d
Implement repository changes needed for lazy cleanup of dropped tables
gruuya Mar 13, 2023
b3e71dd
Add dropped_tables system table and extend tests
gruuya Mar 13, 2023
0cbfab9
Persist new Delta table versions in Seafowl catalog
gruuya Mar 13, 2023
0249337
Bump to Datafusion 19
gruuya Mar 14, 2023
0e2543e
Move copied delta-rs methods to a separate module
gruuya Mar 14, 2023
9f56b12
Make table building logic more explicit
gruuya Mar 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 152 additions & 127 deletions Cargo.lock

Large diffs are not rendered by default.

40 changes: 24 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,19 @@ include = [

[features]
catalog-postgres = ["sqlx/postgres"]
default = ["catalog-postgres", "delta-tables", "frontend-postgres", "object-store-s3", "remote-tables"]
delta-tables = ["dep:deltalake", "dep:dynamodb_lock"]
default = ["catalog-postgres", "frontend-postgres", "object-store-s3", "remote-tables"]
frontend-postgres = ["convergence", "convergence-arrow"]
object-store-s3 = ["object_store/aws"]
remote-tables = ["dep:datafusion-remote-tables"]

[dependencies]
arrow = "32.0.0"
arrow-buffer = "32.0.0"
arrow = "33.0.0"
arrow-buffer = "33.0.0"
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "32.0.0"
arrow-schema = "32.0.0"
arrow-integration-test = "33.0.0"
arrow-schema = "33.0.0"
async-trait = "0.1.64"
base64 = "0.21.0"

Expand All @@ -45,18 +44,20 @@ clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.1"

# PG wire protocol support
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-18-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-18-upgrade", package = "convergence-arrow", optional = true }
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-19-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-19-upgrade", package = "convergence-arrow", optional = true }

datafusion = "18.0.0"
datafusion-common = "18.0.0"
datafusion-expr = "18.0.0"
datafusion-proto = "18.0.0"
datafusion = "19.0.0"
datafusion-common = "19.0.0"
datafusion-expr = "19.0.0"
datafusion-proto = "19.0.0"

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "datafusion-18-upgrade", features = ["s3-native-tls", "datafusion-ext"], optional = true }
dynamodb_lock = { git = "https://github.com/splitgraph/delta-rs", branch = "datafusion-18-upgrade", package = "dynamodb_lock", default_features = false, features = ["native-tls"], optional = true }

# Pick up unique delta object store url: https://github.com/delta-io/delta-rs/pull/1212
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "cf95721866c3bf8e815cfa60195181f7282ea4fd", features = ["s3-native-tls", "datafusion-ext"] }
dynamodb_lock = { git = "https://github.com/delta-io/delta-rs", package = "dynamodb_lock", rev = "cf95721866c3bf8e815cfa60195181f7282ea4fd", default_features = false, features = ["native-tls"] }

futures = "0.3"
hex = ">=0.4.0"
Expand All @@ -82,12 +83,13 @@ serde = "1.0.138"
serde_json = "1.0.93"
sha2 = ">=0.10.1"
sqlparser = "0.30.0"
sqlx = { version = "0.6.2", features = [ "runtime-tokio-rustls", "sqlite", "any" ] }
sqlx = { version = "0.6.2", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
strum = ">=0.24"
strum_macros = ">=0.24"
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
url = "2.2"
uuid = "1.2.1"
warp = "0.3"
wasi-common = "1.0.1"

Expand All @@ -96,10 +98,16 @@ wasmtime = "1.0.1"
wasmtime-wasi = "1.0.1"

[patch.crates-io]
# Post-33 version with string to us timestamp casting: https://github.com/apache/arrow-rs/pull/3752
arrow-array = { git = "https://github.com/splitgraph/arrow-rs", rev = "57f79c03a8dee9d8bf8601bf555aa271746913fe", package = "arrow-array" }
arrow-buffer = { git = "https://github.com/splitgraph/arrow-rs", rev = "57f79c03a8dee9d8bf8601bf555aa271746913fe", package = "arrow-buffer" }
arrow-cast = { git = "https://github.com/splitgraph/arrow-rs", rev = "57f79c03a8dee9d8bf8601bf555aa271746913fe", package = "arrow-cast" }
arrow-data = { git = "https://github.com/splitgraph/arrow-rs", rev = "57f79c03a8dee9d8bf8601bf555aa271746913fe", package = "arrow-data" }
arrow-schema = { git = "https://github.com/splitgraph/arrow-rs", rev = "57f79c03a8dee9d8bf8601bf555aa271746913fe", package = "arrow-schema" }

[dev-dependencies]
assert_unordered = "0.3"
datafusion-common = "18.0.0"
datafusion-common = "19.0.0"
mockall = "0.11.1"
rstest = "*"
wiremock = "0.5"
Expand Down
12 changes: 6 additions & 6 deletions datafusion_remote_tables/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = "32.0.0"
arrow-buffer = "32.0.0"
arrow-schema = "32.0.0"
arrow = "33.0.0"
arrow-buffer = "33.0.0"
arrow-schema = "33.0.0"
async-trait = "0.1.64"

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-18-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
connectorx = { git = "https://github.com/splitgraph/connector-x", rev = "df8f50b3f53606717407c6677c7c0c2cbcc7f6ce", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = "18.0.0"
datafusion-expr = "18.0.0"
datafusion = "19.0.0"
datafusion-expr = "19.0.0"
itertools = ">=0.10.0"
log = "0.4"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Add down migration script here
ALTER TABLE "table" DROP COLUMN legacy;
ALTER TABLE "table" DROP COLUMN uuid;

DROP TABLE dropped_table;
19 changes: 19 additions & 0 deletions migrations/postgres/20230221081920_delta_lake_integration.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Add up migration script here
ALTER TABLE "table" ADD COLUMN legacy BOOLEAN DEFAULT FALSE;
ALTER TABLE "table" ADD COLUMN uuid UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000';
UPDATE "table" SET legacy = TRUE;

-- Add column for tracking Delta table versions; back-populate -1 for legacy tables
-- TODO: maybe version should be part of the primary key
ALTER TABLE table_version ADD COLUMN version BIGINT NOT NULL DEFAULT 0;
UPDATE table_version SET version = -1;

-- Table for facilitating soft-dropping of tables, via deferring the actual file deletion for later.
CREATE TABLE dropped_table (
database_name VARCHAR NOT NULL,
collection_name VARCHAR NOT NULL,
table_name VARCHAR NOT NULL,
uuid UUID NOT NULL,
deletion_status VARCHAR DEFAULT 'PENDING' CHECK ( deletion_status in ('PENDING', 'RETRY', 'FAILED') ),
drop_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT(now())
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Add down migration script here
ALTER TABLE "table" DROP COLUMN legacy;
ALTER TABLE "table" DROP COLUMN uuid;

DROP TABLE dropped_table;
23 changes: 23 additions & 0 deletions migrations/sqlite/20230221081928_delta_lake_integration.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Add up migration script here
ALTER TABLE "table" ADD COLUMN legacy BOOLEAN DEFAULT FALSE;
-- The main reason to go with BLOB (which isn't human readable) instead of TEXT is not performance but instead the lack
-- of support for decoding TEXT to uuid::Uuid in SQLite by sqlx: https://github.com/launchbadge/sqlx/issues/1083
-- On the other hand, while decoding TEXT to uuid::fmt::Hyphenated is supported in SQLite it isn't in Postgres, so this is
-- the only approach that works for now.
ALTER TABLE "table" ADD COLUMN uuid BLOB NOT NULL DEFAULT x'00000000000000000000000000000000';
UPDATE "table" SET legacy = TRUE;

-- Add column for tracking Delta table versions; back-populate -1 for legacy tables
-- TODO: maybe version should be part of the primary key
ALTER TABLE table_version ADD COLUMN version INTEGER NOT NULL DEFAULT 0;
UPDATE table_version SET version = -1;

-- Table for facilitating soft-dropping of tables, via deferring the actual file deletion for later.
CREATE TABLE dropped_table (
database_name VARCHAR NOT NULL,
collection_name VARCHAR NOT NULL,
table_name VARCHAR NOT NULL,
uuid BLOB NOT NULL,
deletion_status VARCHAR DEFAULT 'PENDING' CHECK ( deletion_status in ('PENDING', 'RETRY', 'FAILED') ),
drop_time INTEGER(4) NOT NULL DEFAULT((strftime('%s','now')))
);
Loading