Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: buffered reading of transaction logs #1549

Merged
merged 32 commits into from
Sep 10, 2023

Conversation

eeroel
Copy link
Contributor

@eeroel eeroel commented Jul 20, 2023

Description

This PR introduces two changes to how the commit log is read:

  • When reading the commit log, get requests are made concurrently in a buffered stream. This decreases latency when there are many files in the log since the last checkpoint.
  • Before scanning the log, the latest table version is found by listing with list_with_offset. This helps avoid a potential infinite loop in a slow reader/fast writer scenario, and makes it possible to buffer GET requests without making any unnecessary ones. This listing logic is now also used for the time-travel reading.

Related Issue(s)

Documentation

@github-actions github-actions bot added binding/rust Issues for the Rust crate rust labels Jul 20, 2023
@github-actions
Copy link

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

@rtyler rtyler marked this pull request as draft July 20, 2023 13:43

let buf_size = 50; // TODO

// why is mut needed here and is it OK?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To answer the question in the comment here, the mut is needed because of the log_buffer.next() below which will consume from the buffer

// why is mut needed here and is it OK?
let mut log_buffer = {
match max_version {
Some(n) => log_stream.take((n - self.version() + 2).try_into().unwrap()).buffered(buf_size),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Some(n) => log_stream.take((n - self.version() + 2).try_into().unwrap()).buffered(buf_size),
Some(n) => log_stream.take((n - self.version() + 2).try_into()?).buffered(buf_size),

I'm not sure off the top of my head, but I think the ? will work and help ensure that errors get propagated up to the DeltaResult returned by the function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This didn't work directly, but I added explicit handling for the error cases

};

while let Some((new_version, actions)) = {
let next_commit = log_buffer.next().await.unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let next_commit = log_buffer.next().await.unwrap();
let next_commit = log_buffer.next().await?;

This might be warranted as well, unwraps will panic the caller on errors which is not something we want

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case the await returned an Option so I just handled it as such in the match below. Now the case when the stream ends before expected is explicitly an error

rust/tests/read_delta_test.rs Outdated Show resolved Hide resolved
rust/tests/read_delta_test.rs Outdated Show resolved Hide resolved
Comment on lines 941 to 954
/// TODO
pub async fn open_table_with_version_and_log_buffer(
table_uri: impl AsRef<str>,
version: i64,
log_buffer_size: usize,
) -> Result<DeltaTable, DeltaTableError> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_version(version)
.with_log_buffer_size(log_buffer_size)
.load()
.await?;
Ok(table)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been discouraging more use of these top level methods, I would prefer this function not be added and instead users who are interested in buffering to use the table builder method

@@ -153,6 +169,12 @@ impl DeltaTableBuilder {
self
}

/// Sets `log_buffer_size` to the builder
pub fn with_log_buffer_size(mut self, log_buffer_size: usize) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn with_log_buffer_size(mut self, log_buffer_size: usize) -> Self {
pub fn with_buffer(mut self, log_buffer_size: usize) -> Self {

IMHO this can be more concise

@rtyler rtyler changed the title (WIP) Feat/buffered log reading feat: buffered reading of transaction logs Jul 21, 2023
@rtyler rtyler self-assigned this Jul 21, 2023
@eeroel eeroel force-pushed the feat/buffered_log_reading branch from e3a913c to 6c64eb2 Compare July 21, 2023 15:43
@github-actions github-actions bot added the binding/python Issues for the Python package label Jul 22, 2023
@eeroel eeroel force-pushed the feat/buffered_log_reading branch from c8e5a79 to e32d71f Compare July 22, 2023 06:28
@eeroel eeroel requested a review from rtyler July 22, 2023 08:06
@eeroel
Copy link
Contributor Author

eeroel commented Jul 22, 2023

Did some refactoring and revisions according to suggestions, and also exposed the parameter to Python. Confirmed the speed-up locally against a table in S3, from_data_catalog takes 13s with no concurrency and <2s with log_buffer_size=30 (87 files in the log since the last checkpoint)

@@ -365,7 +388,7 @@ lazy_static::lazy_static! {
/// Extra slashes will be removed from the end path as well.
///
/// Will return an error if the location is not valid. For example,
pub(crate) fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
pub fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This I figured might not be a good idea, but is there some other way of using the function in the test?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep it private, we can keep the tests within this file, in the tests section at the bottom.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, then I have some trouble with the mod fs_common where I put the SlowStore implementation. And the tests are not really testing builder per se. Would it make sense to just make a copy of this function in fs_common for the purposes of the test?

Some((v, Ok(x))) => Ok(Some((v, self.get_actions(v, x.bytes().await?).await?))),
Some((_, Err(ObjectStoreError::NotFound { .. }))) => Ok(None),
Some((_, Err(err))) => Err(DeltaTableError::GenericError { source: Box::new(err) }), // TODO ??
None => Err(DeltaTableError::Generic(String::from("Log stream closed unexpectedly!")))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is perhaps a bit hazy, but isn't a None at the end of the stream an expected condition? Won't this always happen once the stream is at the end?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you're right, this is expected. I made this an Ok case and also updated the .take call to take max_version - self.version() elements instead of +2. Had some confusion there earlier due to this error.

Comment on lines 131 to 133
pub struct SlowStore {
inner: DeltaObjectStore
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😃

let max_iter = 10;
let buf_size = 10;

let location = deltalake::builder::ensure_table_uri(path).unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence about letting this API leak out here, but let's roll with it for now

@eeroel eeroel force-pushed the feat/buffered_log_reading branch from 4490249 to caa18f0 Compare July 23, 2023 04:31
while let PeekCommit::New(new_version, actions) =
self.peek_next_commit(self.version()).await?
let max_version = max_version.and_then(|x| {
if x <= self.version() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rtyler I refactored this max_version handling a bit so it's clearer. Here I made explicit the behavior that's on main currently: i.e. the table will be updated to the latest version if max_version is smaller or equal to the current version.

@eeroel
Copy link
Contributor Author

eeroel commented Jul 23, 2023

Did some refactoring and fixes and added a test to check that results are correct with different buffer sizes. I will still think a bit about how to limit the number of requests, as now there can be potentially a lot of unnecessary ones if max_version is not set

@eeroel eeroel force-pushed the feat/buffered_log_reading branch 2 times, most recently from a7608d2 to b187374 Compare July 23, 2023 14:23
@@ -540,14 +550,78 @@ impl DeltaTable {
self.version(),
);

while let PeekCommit::New(new_version, actions) =
self.peek_next_commit(self.version()).await?
// update to latest version if given max_version is not larger than current version
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rtyler I ended up adding this bigger change to list the files to find out what the latest version is. The code is mostly copied over from another module so it needs refactoring. It's somewhat orthogonal to the buffered reading changes, but unless I'm missing something

  • it's necessary in order to make sure the load actually completes
  • it lets get requests to be buffered without creating any wasted requests (and thus the user doesn't need to be made aware of potential extra costs)

I understand though that this is a more major change because listing can be costly with big logs and in particular with no prefix filtering. And as far as I understood the protocol doesn't prescribe how to do this, but it does sort of hint at listing first being the way to read tables.

@eeroel eeroel force-pushed the feat/buffered_log_reading branch from 6da464f to c591971 Compare July 25, 2023 06:13
auto-merge was automatically disabled September 8, 2023 07:10

Head branch was pushed to by a user without write access

@eeroel
Copy link
Contributor Author

eeroel commented Sep 8, 2023

Rebased on main

@wjones127 wjones127 enabled auto-merge (squash) September 9, 2023 23:23
@wjones127 wjones127 merged commit eb36ddf into delta-io:main Sep 10, 2023
21 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
binding/python Issues for the Python package binding/rust Issues for the Rust crate rust
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants