First version of ak._v2.from_parquet #1338

Merged: 12 commits, Mar 15, 2022

Conversation

jpivarski
Member

No description provided.

@codecov

codecov bot commented Mar 3, 2022

Codecov Report

Merging #1338 (97ab44c) into main (b2fd2be) will decrease coverage by 0.82%.
The diff coverage is 47.31%.

Impacted Files Coverage Δ
src/awkward/_v2/_connect/cling.py 0.00% <0.00%> (ø)
src/awkward/_v2/_lookup.py 97.50% <0.00%> (ø)
src/awkward/_v2/_prettyprint.py 66.09% <0.00%> (+2.29%) ⬆️
src/awkward/_v2/_typetracer.py 69.14% <0.00%> (ø)
src/awkward/_v2/identifier.py 55.69% <0.00%> (ø)
src/awkward/_v2/index.py 83.59% <0.00%> (ø)
src/awkward/_v2/operations/convert/ak_from_jax.py 75.00% <0.00%> (ø)
src/awkward/_v2/operations/convert/ak_to_jax.py 75.00% <0.00%> (ø)
src/awkward/_v2/operations/convert/ak_to_pandas.py 75.00% <0.00%> (ø)
src/awkward/_v2/operations/io/ak_from_parquet.py 75.00% <0.00%> (ø)
... and 143 more

@jpivarski
Member Author

@martindurant, I've hit a dead-end with the pyarrow Dataset API: it can't read nested columns.

Here's an example of something that works:

import fsspec
import pyarrow.parquet

filename = "s3://pivarski-princeton/millionsongs/millionsongs-A-zstd.parquet"
fs, _, _ = fsspec.get_fs_token_paths(filename, mode="rb")
f = fs.open(filename, "rb")

q = pyarrow.parquet.ParquetFile(f)
t = q.read(["hdf5_filename", "analysis.bars.list.item.confidence"])

Now t is

pyarrow.Table
hdf5_filename: string not null
analysis: struct<bars: list<item: struct<confidence: double not null>> not null> not null
  child 0, bars: list<item: struct<confidence: double not null>> not null
      child 0, item: struct<confidence: double not null>
          child 0, confidence: double not null

in which hdf5_filename was a non-nested column and confidence was drawn from variable-length lists of the bars field of the analysis column. This allowed us to avoid reading all of the other analysis and bars substructure.

Suppose I want to do that with the Dataset API.

import pyarrow.dataset
fs, _, paths = fsspec.get_fs_token_paths(filename, mode="rb")
q = pyarrow.dataset.dataset(paths[0], format="parquet", filesystem=fs)

Now q is a FileSystemDataset (Dataset docs) that can give me the schema (and it's an Arrow schema, not Parquet, so I can run ak._v2._connect.pyarrow.form_handle_arrow on it), but it doesn't have a function like read where I can give it Parquet column names.

It has a scanner() (Scanner docs), which takes a list of columns, but these are not Parquet column names.

I have no problem asking for a non-nested column:

q.scanner(columns=["hdf5_filename"]).to_table()

but if I want a nested column, I seem to be out of luck:

q.scanner(columns=["analysis.bars.list.item.confidence"])

fails with

pyarrow.lib.ArrowInvalid: No match for FieldRef.Name(analysis.bars.list.item.confidence) in
... (whole schema)

It has wrapped the whole string "analysis.bars.list.item.confidence" in FieldRef.Name, and that's wrong: only "analysis" is the field name in the Arrow schema. (The long string is a Parquet column name.)

q.scanner(columns=["analysis.bars.item.confidence"])  # and
q.scanner(columns=["analysis.bars.confidence"])

don't work, either.

This works:

q.scanner(columns=["analysis"])

but that's the whole "analysis" subtree, which is a lot of columns. I can't seem to find any way to ask for just the one nested column I want through this API.

I could go back to the pyarrow.parquet.ParquetFile class, since it does take a file-like object as a source. That's why it accepted the fs.open(filename, "rb"), even though fs was on S3. But if it's going through a file-like interface, then every seek + read is a remote round-trip, and I assume that the Dataset API is doing something better, like batching requests.

Do you have any ideas? I'm stuck.

Last-ditch attempt: maybe I can construct a pyarrow.compute function? There's a struct_field function (docs), but it doesn't seem to exist in 6.0.1. If that's a pyarrow 7.0.0 thing and that's the only way to get this feature, then so be it. But it would be a big pain to construct those compute expressions! And I just wrote a whole nested-column-selecting thing for Forms based on the dot syntax (with or without the spurious "list.item." infixes).

@martindurant
Contributor

There's quite a lot here, I will try to answer what I can.

Firstly, yes, arrow does a little magic in the dataset API, but not as much as you might think. In particular, arrow only implements FS interfaces for local, hdfs and s3. For everything else, you use fsspec and the same file-like API with seeks.

fsspec, however, does provide caching of bytes. The normal mode of operation for a file is read-ahead (other strategies are available), so you drastically reduce the number of reads, depending on the configured block size. This works well when you are reading all the columns, i.e., scanning through the file linearly, but it's still a serial process, and it performs really badly when you want a few disparate columns among many.
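
For concreteness, a minimal sketch of configuring that read-ahead caching through fsspec (the block size here is an arbitrary illustrative value, and cache_type is passed through to the underlying filesystem implementation, e.g. s3fs):

import fsspec

filename = "s3://pivarski-princeton/millionsongs/millionsongs-A-zstd.parquet"
fs, _, _ = fsspec.get_fs_token_paths(filename, mode="rb")

# larger read-ahead blocks mean fewer remote requests during a linear scan,
# but each block is still fetched serially
f = fs.open(filename, "rb", block_size=8 * 2**20, cache_type="readahead")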

... but we were ahead of you: you should see fsspec.parquet. It takes a Parquet file's metadata and a selection of columns, and prospectively fetches all of the needed pieces concurrently (merging neighbouring ranges) before constructing a file-like object that will not need to fetch any more bytes from the remote store. In some cases, this can provide a 5x speedup over plain pyarrow/fastparquet. It does not allow for dotted columns, but the code is simple and we can easily change it.
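
A minimal sketch of that usage, assuming fsspec.parquet.open_parquet_file and a plain (non-dotted) column selection, since dotted nested columns are not handled yet:

import fsspec.parquet
import pyarrow.parquet

filename = "s3://pivarski-princeton/millionsongs/millionsongs-A-zstd.parquet"

# prefetch just the byte ranges needed for the requested columns, concurrently,
# then hand pyarrow a file-like object that never has to go back to S3
with fsspec.parquet.open_parquet_file(
    filename, columns=["hdf5_filename"], engine="pyarrow"
) as f:
    table = pyarrow.parquet.ParquetFile(f).read(["hdf5_filename"])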

@jpivarski
Member Author

I guess, then, I should design around fsspec.parquet! The PyarrowEngine uses pyarrow.parquet.ParquetFile, which has all the interface I need, so it should recognize my columns specification:

https://github.com/fsspec/filesystem_spec/blob/732caa0591937152ae6793220df1a2cab6d25297/fsspec/parquet.py#L504

I have fsspec version 2022.02.0. Is anything that I need too new to develop with that?

Would it be reasonable or unreasonable for me to provide an interface that necessarily fetches the Form (converted from the Parquet schema) as a separate function, and therefore a separate round-trip? If the schema is coming from a _metadata file, I think it might be a separate round-trip anyway. The reason I'd rather provide two functions is that a user is going to need a way to find out what columns exist in order to know which ones to ask for. Doing that while maintaining an open connection means introducing an OpenParquetFile object that users would have to know isn't the same thing as an Array object. Providing separate

ak.form_from_parquet(...)
ak.from_parquet(...)

functions simplifies that at the expense of always needing two round-trips and maybe losing some of the caching you're putting into fsspec.parquet. (dask-awkward will be a user of these functions, just as single-threaded Awkward users will be.)

@martindurant
Contributor

Yes, the separate call to get the metadata and therefore the form is probably unavoidable, and the dask variant would use this standalone anyway (because the data loading is deferred to tasks in the graph). You would want to keep hold of the file footer bytes or metadata object to get the byte ranges for the specifically selected columns. Again, this needs some development in fsspec, but something similar is already possible in the dask parquet engines.
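
For illustration, a rough sketch of pulling column-chunk byte ranges out of a pyarrow FileMetaData object; this is the flavor of what fsspec.parquet does internally, not its actual code:

def column_byte_ranges(metadata, selected_columns):
    # metadata: pyarrow._parquet.FileMetaData
    # selected_columns: set of Parquet column paths, e.g. "analysis.bars.list.item.confidence"
    ranges = []
    for rg in range(metadata.num_row_groups):
        row_group = metadata.row_group(rg)
        for col in range(row_group.num_columns):
            chunk = row_group.column(col)
            if chunk.path_in_schema in selected_columns:
                start = chunk.dictionary_page_offset or chunk.data_page_offset
                ranges.append((start, start + chunk.total_compressed_size))
    return ranges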

@jpivarski jpivarski marked this pull request as ready for review March 11, 2022 21:32
@jpivarski
Member Author

@martindurant I think this is a pretty good interface/implementation. Let me know what you think!

First, a function named "form_from_parquet" is likely to be misspelled, and we'd probably want more than just the Form, anyway, so ak._v2.metadata_from_parquet returns the Awkward Form and the Parquet metadata.

>>> import awkward as ak
>>> form, metadata = ak._v2.metadata_from_parquet("s3://pivarski-princeton/millionsongs")
/home/jpivarski/mambaforge/lib/python3.9/site-packages/fsspec/parquet.py:225: UserWarning: Not enough data was used to sample the parquet footer. Try setting footer_sample_size >= 10218222.
  warnings.warn(

(I like that warning; it's almost good enough to copy-paste into the command.)

>>> form, metadata = ak._v2.metadata_from_parquet(
...     "s3://pivarski-princeton/millionsongs",
...     footer_sample_size=10218222,
... )

The metadata is just the standard metadata object, and this Form has a lot of columns.

>>> metadata
<pyarrow._parquet.FileMetaData object at 0x7fc196651c70>
  created_by: parquet-cpp version 1.5.1-SNAPSHOT
  num_columns: 72
  num_rows: 1000000
  num_row_groups: 1015
  format_version: 1.0
  serialized_size: 10218214

>>> for x in form.columns(): print(x)
... 
hdf5_filename
analysis.bars.confidence
analysis.bars.start
analysis.beats.confidence
analysis.beats.start
analysis.sections.confidence
analysis.sections.start
analysis.segments.confidence
analysis.segments.start
analysis.segments.loudness_start
analysis.segments.loudness_max
analysis.segments.loudness_max_time
analysis.segments.pitches
analysis.segments.timbre
analysis.tatums.confidence
analysis.tatums.start
analysis.analysis_sample_rate
analysis.audio_md5
analysis.danceability
analysis.duration
analysis.energy
analysis.key
analysis.key_confidence
analysis.loudness
analysis.mode
analysis.mode_confidence
analysis.end_of_fade_in
analysis.start_of_fade_out
analysis.tempo
analysis.time_signature
analysis.time_signature_confidence
analysis.track_id
analysis.idx_bars_confidence
analysis.idx_bars_start
analysis.idx_beats_confidence
analysis.idx_beats_start
analysis.idx_sections_confidence
analysis.idx_sections_start
analysis.idx_segments_confidence
analysis.idx_segments_start
analysis.idx_segments_loudness_start
analysis.idx_segments_loudness_max
analysis.idx_segments_loudness_max_time
analysis.idx_segments_pitches
analysis.idx_segments_timbre
analysis.idx_tatums_confidence
analysis.idx_tatums_start
metadata.song_id
metadata.title
metadata.release
metadata.hotttnesss
metadata.artist_id
metadata.artist_name
metadata.artist_mbid
metadata.artist_playmeid
metadata.artist_familiarity
metadata.artist_hotttnesss
metadata.artist_location
metadata.artist_latitude
metadata.artist_longitude
metadata.artist_terms.term
metadata.artist_terms.freq
metadata.artist_terms.weight
metadata.artist_idx_artist_terms
metadata.artist_idx_similar_artists
metadata.artist_7digitalid
metadata.track_7digitalid
metadata.release_7digitalid
musicbrainz.artist_mbtags
musicbrainz.artist_mbtags_count
musicbrainz.idx_artist_mbtags
musicbrainz.year

We can now make subforms by passing a selector (glob patterns, brace expansion, and a list is taken to mean "or"):

>>> subform = form.select_columns("analysis.sections.*")

>>> subform.columns()
['analysis.sections.confidence', 'analysis.sections.start']

>>> print(subform)
{
    "class": "RecordArray",
    "contents": {
        "analysis": {
            "class": "BitMaskedArray",
            "mask": "u8",
            "valid_when": true,
            "lsb_order": true,
            "content": {
                "class": "RecordArray",
                "contents": {
                    "sections": {
                        "class": "ListOffsetArray",
                        "offsets": "i32",
                        "content": {
                            "class": "RecordArray",
                            "contents": {
                                "confidence": "float64",
                                "start": "float64"
                            }
                        }
                    }
                }
            }
        }
    }
}

This selector is used by ak._v2.from_parquet.

>>> array = ak._v2.from_parquet(
...     "s3://pivarski-princeton/millionsongs",
...     footer_sample_size=10218222,
...     columns="analysis.sections.*",
... )

About 2 minutes later, we get an array. (It's a big file, and we're slicing across all million songs.)

>>> array
<Array [{analysis: {sections: [...]}}, {...}, ..., {...}] type='1000000 * {...'>

>>> print(array.type)
1000000 * {analysis: {sections: var * {confidence: float64, start: float64}}}

>>> array.show()
[{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
...,
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}},
{analysis: {sections: [{confidence: 1, start: 0}, ..., {...}]}}]

>>> array.analysis.sections.show()
[[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.268, start: 203}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 0.768, start: 96}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 0.62, start: 198}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 0.52, start: 188}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 1, start: 140}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.426, start: 238}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.366, start: 194}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.161, start: 377}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.457, start: 143}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.363, start: 169}],
...,
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.767, start: 184}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.022, start: 438}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.805, start: 300}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.425, start: 315}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 0.87, start: 220}],
[{confidence: 1, start: 0}, {...}, ..., {...}, {confidence: 1, start: 247}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.707, start: 114}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.938, start: 197}],
[{confidence: 1, start: 0}, {...}, ..., {confidence: 0.678, start: 470}]]

At no point did we need to specify that nested lists are represented by "list.item" in this Parquet file. That's an option (list_indicator), and hopefully, we can get its value from the metadata somehow. The brute force approach would be to try "list.item" and "element" and any other standards, seeing if applying them to the Form results in matches in the Parquet metadata's column list. Or just by looking up a known list type and seeing what's there.
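
A hedged sketch of that brute-force guess, assuming a hypothetical expand_columns(form, list_indicator) helper that renders the Form's column names with the given infix inserted at each list level (not part of this PR):

def guess_list_indicator(parquet_metadata, form):
    # parquet_metadata.schema.names are the dotted Parquet column paths;
    # expand_columns is a hypothetical helper, not defined here
    parquet_columns = set(parquet_metadata.schema.names)
    for candidate in ("list.item", "list.element", "element", "item"):
        if set(expand_columns(form, candidate)) <= parquet_columns:
            return candidate
    raise ValueError("could not infer the list indicator from the metadata")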

@jpivarski
Member Author

Actually, that metadata function should also return the list of matched paths.

And maybe the selector for row groups should take a single list of global integers, instead of a list of lists of row groups in each file.
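
A minimal sketch of what that global-to-per-file translation could look like (hypothetical helper; assumes the number of row groups in each file is known from the metadata):

def split_row_groups(global_indices, row_groups_per_file):
    # map global row-group indices onto per-file lists of local indices,
    # e.g. split_row_groups({39, 40, 41}, [40, 40]) -> [[39], [0, 1]]
    out = [[] for _ in row_groups_per_file]
    start = 0
    for file_index, n in enumerate(row_groups_per_file):
        for i in sorted(global_indices):
            if start <= i < start + n:
                out[file_index].append(i - start)
        start += n
    return out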

@martindurant
Contributor

I'll have a look early next week.

At no point did we need to specify that nested lists are represented by "list.item" in this Parquet file. That's an option (list_indicator), and hopefully, we can get its value from the metadata somehow. The brute force approach would be to try "list.item" and "element" and any other standards

Since this extra nesting level is ALWAYS a "repeated" type and the only member of an "optional list" type, we probably don't need to guess. I seem to recall that we've come across one other besides item/element.

@jpivarski
Member Author

Just a couple of interface improvements before merging:

  • ak._v2.metadata_from_parquet returns a namedtuple with form, fs, paths, and Parquet metadata, leaving open the possibility that we'll want to add more items.
  • ak._v2.from_parquet takes row_groups as a simple list (or set, because order is ignored) of global values.
  • list_indicator is still hard-coded as "list.item". That's a FIXME for after this PR.

Examples:

>>> import awkward as ak
>>> meta = ak._v2.metadata_from_parquet("s3://pivarski-princeton/millionsongs")
/home/jpivarski/mambaforge/lib/python3.9/site-packages/fsspec/parquet.py:225: UserWarning: Not enough data was used to sample the parquet footer. Try setting footer_sample_size >= 10218222.
  warnings.warn(
>>> meta.paths
['pivarski-princeton/millionsongs/millionsongs-A-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-B-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-C-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-D-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-E-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-F-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-G-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-H-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-I-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-J-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-K-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-L-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-M-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-N-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-O-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-P-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-Q-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-R-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-S-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-T-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-U-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-V-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-W-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-X-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-Y-zstd.parquet', 'pivarski-princeton/millionsongs/millionsongs-Z-zstd.parquet']
>>> meta.metadata
<pyarrow._parquet.FileMetaData object at 0x7f9e4316e220>
  created_by: parquet-cpp version 1.5.1-SNAPSHOT
  num_columns: 72
  num_rows: 1000000
  num_row_groups: 1015
  format_version: 1.0
  serialized_size: 10218214
>>> array = ak._v2.from_parquet("s3://pivarski-princeton/millionsongs", columns="analysis.sections.*", row_groups={41, 40, 39})
>>> array
<Array [{analysis: {sections: [...]}}, {...}, ..., {...}] type='2100 * {ana...'>

Now I'll be merging this because I need to be able to read Parquet files for a performance study.
