Skip to content

Commit

Permalink
list files to get latest version
Browse files (browse the repository at this point in the history)
  • Loading branch information
eeroel committed Jul 23, 2023
1 parent 54a6691 commit a7608d2
Showing 1 changed file with 42 additions and 20 deletions.
62 changes: 42 additions & 20 deletions rust/src/delta.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -549,16 +549,45 @@ impl DeltaTable {
"incremental update with version({}) and max_version({max_version:?})",
self.version(),
);
println!("Starting update");

let max_version = max_version.and_then(|x| {
if x <= self.version() {
// update to latest version
None
} else {
Some(x)
// update to latest version if given max_version is not larger than current version
let max_version = max_version.filter(|x| x > &self.version());
let max_version: i64 = match max_version {
Some(x) => x,
None => {
// list files to find max version
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap();
}

lazy_static! {
static ref DELTA_LOG_PATH: Path = Path::from("_delta_log");
}

let version = async {
let mut ver: i64 = self.version(); // if no files are found, use the current version
let prefix_path = self.storage.log_path();
println!("{prefix_path}");
let prefix = Some(prefix_path);
let offset_path = commit_uri_from_version(self.version()-1);
let mut files = self.storage.list_with_offset(prefix, &offset_path).await?;
while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
if let Some(captures) = DELTA_LOG_REGEX.captures(obj_meta.location.as_ref()) {
let log_ver_str = captures.get(1).unwrap().as_str();
let log_ver: i64 = log_ver_str.parse().unwrap();
// listing is not ordered
ver = max(ver, log_ver);
}
}
Ok::<i64, DeltaTableError>(ver)
};
version.await?
}
});
};

let buf_size = self.config.log_buffer_size;

// construct stream yielding (version, bytes)
Expand All @@ -573,17 +602,10 @@ impl DeltaTable {
});

let mut log_buffer = {
match max_version {
None => {
log_stream.take(usize::MAX).buffered(buf_size)
}
Some(n) => {
let n_commits = usize::try_from(n - self.version());
match n_commits {
Ok(n) => log_stream.take(n).buffered(buf_size),
Err(err) => return Err(DeltaTableError::GenericError { source: Box::new(err) })
}
}
let n_commits = usize::try_from(max_version - self.version());
match n_commits {
Ok(n) => log_stream.take(n).buffered(buf_size),
Err(err) => return Err(DeltaTableError::GenericError { source: Box::new(err) })
}
};

Expand All @@ -601,7 +623,7 @@ impl DeltaTable {
let s = DeltaTableState::from_actions(actions, new_version)?;
self.state
.merge(s, self.config.require_tombstones, self.config.require_files);
if Some(self.version()) == max_version {
if self.version() == max_version {
return Ok(());
}
}
Expand Down

0 comments on commit a7608d2

Please sign in to comment.