Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: Add ParquetMetaDataReader #6392

Draft
wants to merge 27 commits into
base: master
Choose a base branch
from
Draft

Conversation

etseidl
Copy link
Contributor

@etseidl etseidl commented Sep 13, 2024

Which issue does this PR close?

Relates to #6002

Rationale for this change

This is an attempt to consolidate Parquet footer/page index reading/parsing into a single place.

What changes are included in this PR?

The new ParquetMetaDataReader basically takes the code in parquet/src/file/footer.rs and parquet/src/arrow/async_reader/metadata.rs and mashes them together into a single API. Using this, the read_metadata_from_file call from #6081 would become:

fn read_metadata_from_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let reader = ParquetMetaDataReader::new()
        .with_page_indexes(true);
    let mut file = std::fs::File::open(file).unwrap();
    reader.try_parse(file).unwrap();
    // return ParquetMetaData with page indexes populated
    reader.finish().unwrap()
}

Also included are two async functions try_load() and try_load_from_tail(). The former is a combination of MetadataLoader::load() and MetadataLoader::load_page_index. The latter is an attempt at addressing the issue of loading the footer when the file size is not known, so it requires being able to seek from the end of the file.

This implementation is very rough, with not enough safety checking and documentation. At this point I'm hoping for feedback on the approach. If this seems at all useful, then a path forward would be to first add ParquetMetaDataReader alone, and then in subsequent PRs begin to use it as a replacement for other functions which could then be deprecated. The idea is to get as much in without breaking changes, and then introduce the breaking changes once 54.0.0 is open.

Are there any user-facing changes?

Eventually, yes.

@github-actions github-actions bot added the parquet Changes to the parquet crate label Sep 13, 2024
@etseidl
Copy link
Contributor Author

etseidl commented Sep 13, 2024

@alamb @adriangb I'm hoping you'll have time to take a look and see if this is a step in the right direction.

@alamb
Copy link
Contributor

alamb commented Sep 15, 2024

Thank you @etseidl this looks amazing. I hope to give it a look over the next few days

Copy link
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This generally looks great to me! The new API with the sync / async method is perfect. Amazing work!

The TODO with the ability to load from a byte range that only has the metadata is the one piece missing for this to satisfy our use case, but we can easily work around that by passing in a ChunkReader that lies about the offsets it's loading from and errors if you try to access an offset outside of it's range.

@etseidl
Copy link
Contributor Author

etseidl commented Sep 17, 2024

Thanks @adriangb. I've added the functionality from the TODO (try_parse_range()), will this work for you?

@adriangb
Copy link
Contributor

adriangb commented Sep 17, 2024

Amazing speed, yes that's perfect!

Copy link
Member

@Xuanwo Xuanwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for working on this. Here are some suggestions from my side.

prefetch_hint: Option<usize>,
}

impl Default for ParquetMetaDataReader {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we can derive(Default) and use Self::default() in fn new()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course. Done.

Comment on lines 164 to 167
if self.metadata.is_none() {
return Err(general_err!("could not parse parquet metadata"));
}
Ok(self.metadata.take().unwrap())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I know the unwrap() here is safe, but what about replacing it with

self.metadata
    .take()
    .ok_or_else(|| general_err!("could not parse parquet metadata"))

I believe it can eliminate an extra check and improve readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, still learning Rust idioms :)


// Get bounds needed for page indexes (if any are present in the file).
let range = self.range_for_page_index();
let range = match range {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using:

let Some(range) = self.range_for_page_index() else {
    return Ok(());
};

/// least two fetches, regardless of the value of `prefetch_hint`, if the page indexes are
/// requested.
#[cfg(feature = "async")]
pub async fn try_load_from_tail<R: AsyncFileReader + AsyncRead + AsyncSeek + Unpin + Send>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, requiring an additional bound of AsyncRead + AsyncSeek is a bit confusing. Could you provide more context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm echoing the bounds from here. AsyncFileReader isn't really necessary, though.

Copy link
Member

@Xuanwo Xuanwo Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this design is correct. How about removing this API until we resolve #6157? We can bring this API back while we have native suffix read support.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, removed. TBH I don't fully understand the issue in #6157 and thought the approach in AsyncFileReader::get_metadata could be an alternative solution.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much @etseidl -- I basically agree with @adriangb and @Xuanwo that this is wonderful work

This is an attempt to consolidate Parquet footer/page index reading/parsing into a single place.

It is amazing

This implementation is very rough, with not enough safety checking and documentation. At this point I'm hoping for feedback on the approach.

My feedback is it is great -- thank you to @adriangb and @Xuanwo for their earlier feedback.

If this seems at all useful, then a path forward would be to first add ParquetMetaDataReader alone, and then in subsequent PRs begin to use it as a replacement for other functions which could then be deprecated. The idea is to get as much in without breaking changes, and then introduce the breaking changes once 54.0.0 is open.

I think this plan makes a lot of sense to me (and I think we can avoid most breaking changes -- deprecating is not a breaking change in my perspective)

@@ -61,6 +66,8 @@ impl std::fmt::Display for ParquetError {
write!(fmt, "Index {index} out of bound: {bound}")
}
ParquetError::External(e) => write!(fmt, "External: {e}"),
ParquetError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nitpick is that these seems pretty similar . I wonder if it would make sense to combine them somehow 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't intending two, it just turned out that way. I could make the second usize optional with the understanding that a range is being requested.

Also, does adding to the enum make this a breaking change? If so, I could go back to my tortured use of IndexOutOfBound until it's open season on breaking changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, does adding to the enum make this a breaking change? If so, I could go back to my tortured use of IndexOutOfBound until it's open season on breaking changes.

Yes, unfortunately, it does make it a breaking change

https://github.com/apache/arrow-rs/blob/master/parquet/src/errors.rs#L29

We should probably mark the error type as "non exhaustive" which would make it a non breaking change in the future

@@ -237,8 +236,10 @@ where
Fut: Future<Output = Result<Bytes>> + Send,
{
let fetch = MetadataFetchFn(fetch);
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
Ok(loader.finish())
// TODO(ets): should add option to read page index to this function
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative perhaps would be to deprecate fetch_parquet_metadata entirely and suggest people use ParquetMetaDataReader which s more complete and full featured -- I think we could deprecate this function in a minor release (we can't remover it until a major release)

/// arguments).
///
/// [Page Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn parquet_metadata_from_file<R: ChunkReader>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nitpick is maybe we can call this "parquet_metadata_from_reader`

Also I wonder if instead of a new API it would make sense to always directly use ParquetMetaDataReader directly. That would certainly be more verbose, but it also might be more explicit.

For the common case that the wrapping code won't retry (aka all the callsites of parquet_metadata_from_file, we could also add some sort of consuming API too that combines try_parse and finish to make it less verbose. Something like

    let metadata = ParquetMetaDataReader::new()
        .with_column_indexes(column_index)
        .with_offset_indexes(offset_index)
        .parse(file)?;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that seems reasonable. And yes, I struggle with naming things 😄.

}

/// Same as [`Self::try_parse()`], but only `file_range` bytes of the original file are
/// available. `file_range.end` must point to the end of the file.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was a little confused about how file_range works in this case (given that it seems to me that ChunkReader would in theory allow reading arbitrary ranges)

Is the idea that try_parse_range limits the requests to the reader so they are only within file_range?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for the case of impl ChunkReader for Bytes. Say you've speculatively read the last 1000 bytes of a file into a buffer, and let's say that's actually sufficient for the page indexes. But the locations of the page indexes are absolute (in this case they're in the range 1100..1500), so you can't actually find the indexes in the buffer unless you know the file offset of the beginning of the buffer (trying to seek to 1100 in a 1000 byte buffer clearly won't work...you need to subtract 1000 from the absolute offsets and read 100..500 from the buffer).

I also wanted different error behavior for the File vs Bytes use cases. If a File is passed, there's no sense in asking for a larger file; if the metadata can't be read there's either some I/O error going on or a corrupted file.

Perhaps we could instead just pass in the file size, while still mandating that if Bytes are passed they must include the footer. We could then infer the range as file_size-reader.len()..file_size, and then errors could simply return the number of bytes needed from the tail of the file. This would perhaps solve the above issue with two new and very similar errors types.

}

/// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
/// given a [`MetadataFetch`]. The file size must be known to use this function.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also be good to note here that try_load will attempt to minimize the number of calls to fetch by prefetching but may make potentially multiple requests depending on how the data is laid out.

As an aside (and not changed in this PR), I found the use of MetadataFetch as basically an async version of ChunkReader confusing when trying to understand this API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also be good to note here that try_load will attempt to minimize the number of calls to fetch by prefetching but may make potentially multiple requests depending on how the data is laid out.

I can add a reference back to with_prefetch_hint where this is explained already.

As an aside (and not changed in this PR), I found the use of MetadataFetch as basically an async version of ChunkReader confusing when trying to understand this API

I'll admit to not being well versed in the subtleties of async code. And I am trying for a drop-in replacement for MetadataLoader initially. Do you think using AsyncFileReader would be cleaner/clearer?

@alamb
Copy link
Contributor

alamb commented Sep 19, 2024

The TODO with the ability to load from a byte range that only has the metadata is the one piece missing for this to satisfy our use case, but we can easily work around that by passing in a ChunkReader that lies about the offsets it's loading from and errors if you try to access an offset outside of it's range.

@adriangb I also found the use of "offsets" confusing as they parquet metadata has and uses offsets that are always "absolute" offsets within the overall file. Maybe we can make / update the example in #6081. Speaking of which, I will go do that now.

@etseidl
Copy link
Contributor Author

etseidl commented Sep 19, 2024

Thank you @adriangb @Xuanwo @alamb for the helpful comments 🙏 . I'll incorporate your suggestions and then resubmit a PR with just the ParquetMetaDataReader. Once that's mergeworthy I can start deprecating things.

I'll keep this draft open for a time in case others want to chime in.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parquet Changes to the parquet crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants