Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate unity catalog with datafusion #1338

Merged
merged 28 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8649548
feat: extend unit catalog support
roeap May 6, 2023
ac9bec1
chore: draft datafusion integration
roeap May 6, 2023
ddc1098
fix: allow passing catalog options from python
roeap May 6, 2023
e7178f4
chore: clippy
roeap May 6, 2023
c57d8db
feat: add more azure credentials
roeap May 7, 2023
ab59f9a
fix: add defaults for return types
roeap May 7, 2023
05642da
fix: simpler defaults
roeap May 7, 2023
a1ea5aa
Update rust/src/data_catalog/unity/mod.rs
roeap May 13, 2023
052f60b
fix: imports
roeap May 13, 2023
18317e1
fix: add some defaults
roeap May 13, 2023
5d5fd91
test: add failing provider test
roeap May 14, 2023
697b66f
feat: list catalogs
roeap May 15, 2023
75c745a
Merge branch 'main' into unity-catalog
roeap May 15, 2023
5930da1
Merge branch 'main' into unity-catalog
roeap May 21, 2023
a04b45f
Merge branch 'main' into unity-catalog
roeap May 22, 2023
378e41c
Merge branch 'main' into unity-catalog
roeap May 27, 2023
66e83cb
merge main
roeap Jun 2, 2023
1020898
Merge branch 'main' into unity-catalog
roeap Jun 2, 2023
80b25ca
fix: remove artifact
roeap Jun 2, 2023
83dcb6d
fix: errors after merge with main
roeap Jun 3, 2023
719266e
Merge remote-tracking branch 'upstream/main' into unity-catalog
rtyler Sep 19, 2023
b2ba0d7
Correct some merge related errors with redundant package names from t…
rtyler Sep 19, 2023
b1fab03
Address some latent clippy failures after merging main
rtyler Sep 19, 2023
9119ee6
Correct the incorrect documentation for `Backoff`
rtyler Sep 19, 2023
fff05c7
Merge branch 'main' into unity-catalog
rtyler Sep 19, 2023
51db313
Merge branch 'main' into unity-catalog
rtyler Sep 19, 2023
645abb9
Merge remote-tracking branch 'upstream/main' into unity-catalog
rtyler Sep 19, 2023
9b3a0e5
Merge remote-tracking branch 'roeap/unity-catalog' into unity-catalog
rtyler Sep 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/dev_pr/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,3 @@ proofs:

tlaplus:
- tlaplus/**/*

6 changes: 4 additions & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,17 @@ impl RawDeltaTable {
}

#[classmethod]
#[pyo3(signature = (data_catalog, database_name, table_name, data_catalog_id, catalog_options = None))]
fn get_table_uri_from_data_catalog(
_cls: &PyType,
data_catalog: &str,
database_name: &str,
table_name: &str,
data_catalog_id: Option<String>,
catalog_options: Option<HashMap<String, String>>,
) -> PyResult<String> {
let data_catalog =
deltalake::data_catalog::get_data_catalog(data_catalog).map_err(|_| {
let data_catalog = deltalake::data_catalog::get_data_catalog(data_catalog, catalog_options)
.map_err(|_| {
PyValueError::new_err(format!("Catalog '{}' not available.", data_catalog))
})?;
let table_uri = rt()?
Expand Down
22 changes: 13 additions & 9 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,8 @@ tokio = { workspace = true, features = [

# other deps (these should be organized and pulled into workspace.dependencies as necessary)
cfg-if = "1"
datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
"hdfs3",
"try_spawn_blocking",
], optional = true }
errno = "0.3"
hyper = { version = "0.14", optional = true }
itertools = "0.11"
lazy_static = "1"
log = "0"
Expand All @@ -78,6 +75,14 @@ once_cell = "1.16.0"
parking_lot = "0.12"
parquet2 = { version = "0.17", optional = true }
percent-encoding = "2"
tracing = { version = "0.1", optional = true }
rand = "0.8"

# hdfs
datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
"hdfs3",
"try_spawn_blocking",
], optional = true }

# S3 lock client
rusoto_core = { version = "0.47", default-features = false, optional = true }
Expand All @@ -93,8 +98,6 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
reqwest-middleware = { version = "0.2.1", optional = true }
reqwest-retry = { version = "0.2.2", optional = true }

# Datafusion
dashmap = { version = "5", optional = true }
Expand All @@ -117,6 +120,7 @@ tempdir = "0"
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
utime = "0.3"
hyper = { version = "0.14", features = ["server"] }

[features]
azure = ["object_store/azure"]
Expand Down Expand Up @@ -145,8 +149,8 @@ datafusion = [
]
datafusion-ext = ["datafusion"]
gcs = ["object_store/gcp"]
glue = ["s3", "rusoto_glue/rustls"]
glue-native-tls = ["s3-native-tls", "rusoto_glue"]
glue = ["s3", "rusoto_glue/rustls", "tracing", "hyper"]
glue-native-tls = ["s3-native-tls", "rusoto_glue", "tracing", "hyper"]
hdfs = ["datafusion-objectstore-hdfs"]
# used only for integration testing
integration_test = ["fs_extra", "tempdir"]
Expand All @@ -168,7 +172,7 @@ s3 = [
"dynamodb_lock/rustls",
"object_store/aws",
]
unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"]
unity-experimental = ["reqwest", "tracing", "hyper"]

[[bench]]
name = "read_checkpoint"
Expand Down
135 changes: 135 additions & 0 deletions rust/src/data_catalog/client/backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//! Exponential backoff with jitter
use rand::prelude::*;
use std::time::Duration;

/// Exponential backoff with jitter
///
/// See <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
#[allow(missing_copy_implementations)]
#[derive(Debug, Clone)]
pub struct BackoffConfig {
/// The initial backoff duration
pub init_backoff: Duration,
/// The maximum backoff duration
pub max_backoff: Duration,
/// The base of the exponential to use
pub base: f64,
}

impl Default for BackoffConfig {
fn default() -> Self {
Self {
init_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(15),
base: 2.,
}
}
}

/// [`Backoff`] can be created from a [`BackoffConfig`]
///
/// Consecutive calls to [`Backoff::tick`] will return the next backoff interval
pub struct Backoff {
init_backoff: f64,
next_backoff_secs: f64,
max_backoff_secs: f64,
base: f64,
rng: Option<Box<dyn RngCore + Sync + Send>>,
}

impl std::fmt::Debug for Backoff {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Backoff")
.field("init_backoff", &self.init_backoff)
.field("next_backoff_secs", &self.next_backoff_secs)
.field("max_backoff_secs", &self.max_backoff_secs)
.field("base", &self.base)
.finish()
}
}

impl Backoff {
/// Create a new [`Backoff`] from the provided [`BackoffConfig`]
pub fn new(config: &BackoffConfig) -> Self {
Self::new_with_rng(config, None)
}

/// Creates a new `Backoff` with the optional `rng`
///
/// Used [`rand::thread_rng()`] if no rng provided
pub fn new_with_rng(
config: &BackoffConfig,
rng: Option<Box<dyn RngCore + Sync + Send>>,
) -> Self {
let init_backoff = config.init_backoff.as_secs_f64();
Self {
init_backoff,
next_backoff_secs: init_backoff,
max_backoff_secs: config.max_backoff.as_secs_f64(),
base: config.base,
rng,
}
}

/// Returns the next backoff duration to wait for
pub fn tick(&mut self) -> Duration {
let range = self.init_backoff..(self.next_backoff_secs * self.base);

let rand_backoff = match self.rng.as_mut() {
Some(rng) => rng.gen_range(range),
None => thread_rng().gen_range(range),
};

let next_backoff = self.max_backoff_secs.min(rand_backoff);
Duration::from_secs_f64(std::mem::replace(&mut self.next_backoff_secs, next_backoff))
}
}

#[cfg(test)]
mod tests {
use super::*;
use rand::rngs::mock::StepRng;

#[test]
fn test_backoff() {
let init_backoff_secs = 1.;
let max_backoff_secs = 500.;
let base = 3.;

let config = BackoffConfig {
init_backoff: Duration::from_secs_f64(init_backoff_secs),
max_backoff: Duration::from_secs_f64(max_backoff_secs),
base,
};

let assert_fuzzy_eq = |a: f64, b: f64| assert!((b - a).abs() < 0.0001, "{a} != {b}");

// Create a static rng that takes the minimum of the range
let rng = Box::new(StepRng::new(0, 0));
let mut backoff = Backoff::new_with_rng(&config, Some(rng));

for _ in 0..20 {
assert_eq!(backoff.tick().as_secs_f64(), init_backoff_secs);
}

// Create a static rng that takes the maximum of the range
let rng = Box::new(StepRng::new(u64::MAX, 0));
let mut backoff = Backoff::new_with_rng(&config, Some(rng));

for i in 0..20 {
let value = (base.powi(i) * init_backoff_secs).min(max_backoff_secs);
assert_fuzzy_eq(backoff.tick().as_secs_f64(), value);
}

// Create a static rng that takes the mid point of the range
let rng = Box::new(StepRng::new(u64::MAX / 2, 0));
let mut backoff = Backoff::new_with_rng(&config, Some(rng));

let mut value = init_backoff_secs;
for _ in 0..20 {
assert_fuzzy_eq(backoff.tick().as_secs_f64(), value);
value =
(init_backoff_secs + (value * base - init_backoff_secs) / 2.).min(max_backoff_secs);
}
}
}
94 changes: 94 additions & 0 deletions rust/src/data_catalog/client/mock_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::collections::VecDeque;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use parking_lot::Mutex;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> Response<Body> + Send>;

/// A mock server
pub struct MockServer {
responses: Arc<Mutex<VecDeque<ResponseFn>>>,
shutdown: oneshot::Sender<()>,
handle: JoinHandle<()>,
url: String,
}

impl Default for MockServer {
fn default() -> Self {
Self::new()
}
}

impl MockServer {
pub fn new() -> Self {
let responses: Arc<Mutex<VecDeque<ResponseFn>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(10)));

let r = Arc::clone(&responses);
let make_service = make_service_fn(move |_conn| {
let r = Arc::clone(&r);
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let r = Arc::clone(&r);
async move {
Ok::<_, Infallible>(match r.lock().pop_front() {
Some(r) => r(req),
None => Response::new(Body::from("Hello World")),
})
}
}))
}
});

let (shutdown, rx) = oneshot::channel::<()>();
let server = Server::bind(&SocketAddr::from(([127, 0, 0, 1], 0))).serve(make_service);

let url = format!("http://{}", server.local_addr());

let handle = tokio::spawn(async move {
server
.with_graceful_shutdown(async {
rx.await.ok();
})
.await
.unwrap()
});

Self {
responses,
shutdown,
handle,
url,
}
}

/// The url of the mock server
pub fn url(&self) -> &str {
&self.url
}

/// Add a response
pub fn push(&self, response: Response<Body>) {
self.push_fn(|_| response)
}

/// Add a response function
pub fn push_fn<F>(&self, f: F)
where
F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
{
self.responses.lock().push_back(Box::new(f))
}

/// Shutdown the mock server
pub async fn shutdown(self) {
let _ = self.shutdown.send(());
self.handle.await.unwrap()
}
}
Loading
Loading