First version of ak._v2.from_parquet #1338
Conversation
@martindurant, I've hit a dead-end with the pyarrow Dataset API: it can't read nested columns. Here's an example of something that works:

import fsspec
import pyarrow.parquet

filename = "s3://pivarski-princeton/millionsongs/millionsongs-A-zstd.parquet"
fs, _, _ = fsspec.get_fs_token_paths(filename, mode="rb")
f = fs.open(filename, "rb")
q = pyarrow.parquet.ParquetFile(f)
t = q.read(["hdf5_filename", "analysis.bars.list.item.confidence"])

Now suppose I want to do that with the Dataset API:

import pyarrow.dataset

fs, _, paths = fsspec.get_fs_token_paths(filename, mode="rb")
q = pyarrow.dataset.dataset(paths[0], format="parquet", filesystem=fs)

I have no problem asking for a non-nested column:

q.scanner(columns=["hdf5_filename"]).to_table()

but if I want a nested column, I seem to be out of luck:

q.scanner(columns=["analysis.bars.list.item.confidence"])

fails with an error in which it has wrapped the whole string as a single (nonexistent) field name. The variants

q.scanner(columns=["analysis.bars.item.confidence"])
q.scanner(columns=["analysis.bars.confidence"])

don't work, either. This works:

q.scanner(columns=["analysis"])

but it's the whole analysis column, all nested fields included. I could go back to the ParquetFile API, but then I'd lose the Dataset features. Do you have any ideas? I'm stuck. Last-ditch attempt: maybe I can construct a…
There's quite a lot here; I will try to answer what I can. Firstly, yes, arrow does a little magic in the dataset API, but not as much as you might think. In particular, arrow only implements filesystem interfaces for local, HDFS, and S3. For everything else, you use fsspec and the same file-like API with seeks. fsspec, however, does provide caching of bytes. The normal mode of operation for a file is read-ahead (other strategies are available), so you drastically reduce the number of reads, depending on the configured blocksize. This works well when you are reading all of the columns, scanning through the file linearly, but it's still a serial process, and it performs really badly when you want a few disparate columns amongst many. ...but we were ahead of you: you should see fsspec.parquet. This takes a parquet file's metadata and a selection of columns, and prospectively fetches all of the needed pieces concurrently (merging neighbouring ranges) before constructing a file-like object that will not need to fetch any more bytes from remote. In some cases, this can provide a 5x speedup over plain pyarrow/fastparquet. It does not allow for dotted columns, but the code is simple and we can easily change it.
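For concreteness, a minimal sketch of using fsspec.parquet this way, via its open_parquet_file entry point (the column choice here is illustrative):

import fsspec.parquet
import pyarrow.parquet

# open_parquet_file prefetches the byte ranges needed for the requested
# columns concurrently, so the returned file-like object never has to go
# back to the remote store while pyarrow reads it.
with fsspec.parquet.open_parquet_file(
    "s3://pivarski-princeton/millionsongs/millionsongs-A-zstd.parquet",
    columns=["hdf5_filename"],
    engine="pyarrow",
) as f:
    table = pyarrow.parquet.ParquetFile(f).read(["hdf5_filename"])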
I guess, then, I should design around fsspec.parquet. I have fsspec version 2022.02.0. Is anything that I need too new to develop with that? Would it be reasonable/unreasonable for me to provide an interface that necessarily fetches the Form (converted from the Parquet Schema) as a separate function, and therefore a separate round-trip? If the schema is coming from a separate request anyway, having separate ak.form_from_parquet(...) and ak.from_parquet(...) functions simplifies that, at the expense of always needing two round-trips and maybe losing some of the caching you're putting into fsspec.parquet.
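In other words, something like this (a sketch of the proposed interface; neither function exists in this form yet, and the columns argument is hypothetical):

# Round-trip 1: fetch only the footer and convert the Parquet schema to a Form.
form = ak.form_from_parquet("s3://pivarski-princeton/millionsongs")

# Round-trip 2: fetch the data for a selection derived from that Form.
array = ak.from_parquet("s3://pivarski-princeton/millionsongs",
                        columns=form.columns())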
Yes, the separate call to get the metadata, and therefore the form, is probably unavoidable, and the dask variant would use this standalone anyway (because the data loading is deferred to tasks in the graph). You would want to keep hold of the file footer bytes, or the metadata object, to get the byte ranges for the specifically selected columns. Again, this needs some development in fsspec, but something similar is already possible in the dask parquet engines.
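A sketch of what "keeping hold of the metadata object" buys you, using only pyarrow's FileMetaData accessors (the column name is illustrative, and f is the file-like object from the earlier example):

import pyarrow.parquet

pf = pyarrow.parquet.ParquetFile(f)   # footer is fetched here, once
md = pf.metadata                      # keep hold of this object

# Compute the byte ranges of one column's chunks without touching the
# remote store again.
ranges = []
for rg in range(md.num_row_groups):
    for col in range(md.num_columns):
        chunk = md.row_group(rg).column(col)
        if chunk.path_in_schema == "analysis.bars.list.item.confidence":
            start = chunk.dictionary_page_offset or chunk.data_page_offset
            ranges.append((start, start + chunk.total_compressed_size))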
@martindurant I think this is a pretty good interface/implementation. Let me know what you think! First, a function named "metadata_from_parquet":

>>> import awkward as ak
>>> form, metadata = ak._v2.metadata_from_parquet("s3://pivarski-princeton/millionsongs")
/home/jpivarski/mambaforge/lib/python3.9/site-packages/fsspec/parquet.py:225: UserWarning: Not enough data was used to sample the parquet footer. Try setting footer_sample_size >= 10218222.
warnings.warn(

(I like that warning; it's almost good enough to copy-paste into the command.)

>>> form, metadata = ak._v2.metadata_from_parquet(
... "s3://pivarski-princeton/millionsongs",
... footer_sample_size=10218222,
... )

The metadata is just the standard metadata object, and this Form has a lot of columns.

>>> metadata
<pyarrow._parquet.FileMetaData object at 0x7fc196651c70>
created_by: parquet-cpp version 1.5.1-SNAPSHOT
num_columns: 72
num_rows: 1000000
num_row_groups: 1015
format_version: 1.0
serialized_size: 10218214
>>> for x in form.columns(): print(x)
...
hdf5_filename
analysis.bars.confidence
analysis.bars.start
analysis.beats.confidence
analysis.beats.start
analysis.sections.confidence
analysis.sections.start
analysis.segments.confidence
analysis.segments.start
analysis.segments.loudness_start
analysis.segments.loudness_max
analysis.segments.loudness_max_time
analysis.segments.pitches
analysis.segments.timbre
analysis.tatums.confidence
analysis.tatums.start
analysis.analysis_sample_rate
analysis.audio_md5
analysis.danceability
analysis.duration
analysis.energy
analysis.key
analysis.key_confidence
analysis.loudness
analysis.mode
analysis.mode_confidence
analysis.end_of_fade_in
analysis.start_of_fade_out
analysis.tempo
analysis.time_signature
analysis.time_signature_confidence
analysis.track_id
analysis.idx_bars_confidence
analysis.idx_bars_start
analysis.idx_beats_confidence
analysis.idx_beats_start
analysis.idx_sections_confidence
analysis.idx_sections_start
analysis.idx_segments_confidence
analysis.idx_segments_start
analysis.idx_segments_loudness_start
analysis.idx_segments_loudness_max
analysis.idx_segments_loudness_max_time
analysis.idx_segments_pitches
analysis.idx_segments_timbre
analysis.idx_tatums_confidence
analysis.idx_tatums_start
metadata.song_id
metadata.title
metadata.release
metadata.hotttnesss
metadata.artist_id
metadata.artist_name
metadata.artist_mbid
metadata.artist_playmeid
metadata.artist_familiarity
metadata.artist_hotttnesss
metadata.artist_location
metadata.artist_latitude
metadata.artist_longitude
metadata.artist_terms.term
metadata.artist_terms.freq
metadata.artist_terms.weight
metadata.artist_idx_artist_terms
metadata.artist_idx_similar_artists
metadata.artist_7digitalid
metadata.track_7digitalid
metadata.release_7digitalid
musicbrainz.artist_mbtags
musicbrainz.artist_mbtags_count
musicbrainz.idx_artist_mbtags
musicbrainz.year

We can now make subforms by passing a selector (glob patterns, brace expansion; a list is taken to mean "or"):

>>> subform = form.select_columns("analysis.sections.*")
>>> subform.columns()
['analysis.sections.confidence', 'analysis.sections.start']
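The other selector forms work the same way; as a sketch (these particular selections are illustrative, not verified output):

subform2 = form.select_columns("analysis.{bars,beats}.start")          # brace expansion
subform3 = form.select_columns(["hdf5_filename", "musicbrainz.year"])  # a list means "or"

Printing the subform shows that the nested structure is retained: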
>>> print(subform)
{
"class": "RecordArray",
"contents": {
"analysis": {
"class": "BitMaskedArray",
"mask": "u8",
"valid_when": true,
"lsb_order": true,
"content": {
"class": "RecordArray",
"contents": {
"sections": {
"class": "ListOffsetArray",
"offsets": "i32",
"content": {
"class": "RecordArray",
"contents": {
"confidence": "float64",
"start": "float64"
}
}
}
}
}
}
}
}

This selector is used by ak._v2.from_parquet to choose which columns to read:

>>> array = ak._v2.from_parquet(
... "s3://pivarski-princeton/millionsongs",
... footer_sample_size=10218222,
... columns="analysis.sections.*",
... )

About 2 minutes later, we get an array. (It's a big file, and we're slicing across all million songs.)

>>> array
<Array [{analysis: {sections: [...]}}, {...}, ..., {...}] type='1000000 * {...'>
>>> print(array.type)
1000000 * {analysis: {sections: var * {confidence: float64, start: float64}}}
>>> array.show()
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
...,
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}}]
>>> array.analysis.sections.show()
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.268, start: 203}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 0.768, start: 96}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 0.62, start: 198}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 0.52, start: 188}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 1, start: 140}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.426, start: 238}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.366, start: 194}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.161, start: 377}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.457, start: 143}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.363, start: 169}],
...,
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.767, start: 184}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.022, start: 438}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.805, start: 300}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.425, start: 315}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 0.87, start: 220}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 1, start: 247}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.707, start: 114}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.938, start: 197}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.678, start: 470}]]

At no point did we need to specify that nested lists are represented by extra list.item levels in the Parquet column paths.
Actually, that metadata function should also return the list of matched paths. And maybe the selector for row groups should take a single list of global integers, instead of a list of lists of row groups in each file.
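A sketch of the bookkeeping that "global integers" implies (the function name and shape are hypothetical; the per-file row-group counts would come from the merged metadata):

def split_row_groups(global_indices, row_groups_per_file, paths):
    # Translate global row-group indices into (path, [local indices]) pairs,
    # walking the files in order and offsetting by each file's count.
    out, start = [], 0
    for path, n in zip(paths, row_groups_per_file):
        local = [i - start for i in sorted(global_indices) if start <= i < start + n]
        if local:
            out.append((path, local))
        start += n
    return out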
I'll have a look early next week.
Since this extra nesting level is ALWAYS a "repeated" type and the only member of an "optional list" type, we probably don't need to guess. I seem to recall that we've come across one other besides item/element.
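A sketch of the normalization this permits (assuming, as above, that the synthetic level inside a list wrapper is always named item or element; a real field that happens to use one of these names would be mishandled by this naive version):

def logical_path(path_in_schema):
    # Drop the synthetic "list"/"item"/"element" levels that Parquet writers
    # insert around repeated fields, recovering the user-facing column name.
    keep = [p for p in path_in_schema.split(".") if p not in ("list", "item", "element")]
    return ".".join(keep)

logical_path("analysis.bars.list.item.confidence")  # -> "analysis.bars.confidence"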
Just a couple of interface improvements before merging: the metadata function now also returns the list of matched paths, and from_parquet's row_groups argument takes a flat set of global row-group indices, as suggested above. Examples:

>>> import awkward as ak
>>> meta = ak._v2.metadata_from_parquet("s3://pivarski-princeton/millionsongs")
/home/jpivarski/mambaforge/lib/python3.9/site-packages/fsspec/parquet.py:225: UserWarning: Not enough data was used to sample the parquet footer. Try setting footer_sample_size >= 10218222.
warnings.warn(
>>> meta.paths
['pivarski-princeton/millionsongs/millionsongs-A-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-B-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-C-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-D-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-E-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-F-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-G-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-H-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-I-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-J-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-K-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-L-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-M-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-N-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-O-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-P-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-Q-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-R-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-S-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-T-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-U-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-V-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-W-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-X-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-Y-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-Z-zstd.parquet']
>>> meta.metadata
<pyarrow._parquet.FileMetaData object at 0x7f9e4316e220>
created_by: parquet-cpp version 1.5.1-SNAPSHOT
num_columns: 72
num_rows: 1000000
num_row_groups: 1015
format_version: 1.0
serialized_size: 10218214
>>> array = ak._v2.from_parquet("s3://pivarski-princeton/millionsongs", columns="analysis.sections.*", row_groups={41, 40, 39})
>>> array
<Array [{analysis: {sections: [...]}}, {...}, ..., {...}] type='2100 * {ana...'>

Now I'll be merging this because I need to be able to read Parquet files for a performance study.