Add BatchConverter implementations for pandas types, use Batched DoFns in DataFrame convert utilities #22575
Conversation
Run Python 3.8 PostCommit
Run Python 3.7 PostCommit
Codecov Report
@@ Coverage Diff @@
## master #22575 +/- ##
==========================================
- Coverage 74.19% 74.17% -0.02%
==========================================
Files 709 712 +3
Lines 93499 93802 +303
==========================================
+ Hits 69367 69582 +215
- Misses 22855 22943 +88
Partials 1277 1277
Force-pushed from 5c15cd2 to 09238a5
Force-pushed from 49e915a to 1e2c800
Force-pushed from 1e2c800 to 865b23b
CC: @robertwb
Run Python 3.8 PostCommit
Assigning reviewers. If you would like to opt out of this review, comment R: @y1chi for label python.
The PR bot will only process comments in the main thread (not review comments).
Reminder, please take a look at this pr: @y1chi
@y1chi do you have time to review this?
R: @yeandy
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Overall, LGTM. Left a few questions/comments
else:
  raise TypeError(f"Encountered unexpected type, left is a {type(left)!r}")
Should we also be checking against the type of right?
assert_series_equal or assert_frame_equal will raise if right isn't the appropriate type.
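For illustration, a quick sketch of that behavior using pandas' own testing helper (not Beam code): assert_series_equal rejects a right-hand side that isn't a Series, so an explicit check on right would be redundant.

```python
import pandas as pd
from pandas.testing import assert_series_equal


def right_type_is_checked() -> bool:
    # Passing a DataFrame as `right` makes pandas raise AssertionError,
    # which is the behavior the reply above relies on.
    try:
        assert_series_equal(pd.Series([1, 2]), pd.DataFrame({"a": [1, 2]}))
    except AssertionError:
        return True
    return False
```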
typehints.validate_composite_type_param(self.batch_typehint, '')
typehints.validate_composite_type_param(self.element_typehint, '')

def test_type_check(self):
Suggested change:
- def test_type_check(self):
+ def test_type_check_batch(self):
Done!
raise NotImplementedError

def explode_batch(self, batch: pd.DataFrame):
  # TODO: Only do null checks for nullable types
Is there an issue for this?
There is now :) (#22948)
def produce_batch(self, elements):
  # Note from_records has an index= parameter
  batch = pd.DataFrame.from_records(elements, columns=self._columns)
Why don't we use the index= parameter here? Is it so it's easier to set the data type in the next 2 lines?
Yeah that's right, I think the above comment was just a note to self as I was iterating on this. I dropped the comment.
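As a hedged illustration of the pattern being discussed (the element data and column names here are invented, not the PR's code): build the frame from records with explicit column names, then coerce dtypes in a follow-up step rather than via from_records itself.

```python
import numpy as np
import pandas as pd

# Illustrative only: construct from records first...
elements = [(1, "x"), (2, "y")]
columns = ["num", "label"]
batch = pd.DataFrame.from_records(elements, columns=columns)

# ...then set the column dtype afterwards, which is why index= isn't needed.
batch["num"] = batch["num"].astype(np.int32)
```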
def estimate_byte_size(self, batch: pd.DataFrame):
  return batch.memory_usage().sum()

def get_length(self, batch: pd.DataFrame):
  return len(batch)
Can we add tests for these? And also for SeriesBatchConverter?
Done, thank you! I also filed #22950 - we should have a standard test suite to test all the BatchConverter implementations.
Thanks!
def explode_batch(self, batch: pd.Series):
  raise NotImplementedError(
      "explode_batch should be generated in SeriesBatchConverter.__init__")
Why should this be generated in __init__?
We branch on is_nullable one time in __init__ and assign explode_batch with a null-checking or non-null-checking alternative.
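The pattern described above can be sketched like this (a standalone toy class, not the actual SeriesBatchConverter):

```python
import pandas as pd


class NullableAwareConverter:
    """Toy sketch: choose the explode strategy once, in __init__,
    instead of re-checking nullability for every batch."""

    def __init__(self, nullable: bool):
        if nullable:
            def explode_batch(series: pd.Series):
                # Map pandas missing values (NaN/NaT/pd.NA) back to None.
                for isnull, value in zip(pd.isnull(series), series):
                    yield None if isnull else value
        else:
            def explode_batch(series: pd.Series):
                yield from series
        self.explode_batch = explode_batch
```

Binding the function once in __init__ keeps the per-element loop free of branching.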
all_series = self._get_series(batch)
iterators = [make_null_checking_generator(series) for series in all_series]

for values in zip(*iterators):
Could we zip the self._columns along with the iterators? Might make it harder to read though.
I'm not quite sure what this would look like, could you clarify?
Related: It would be good to add a microbenchmark for produce_batch and explode_batch so we can easily evaluate alternative implementations. But I'd prefer to leave that as future work. For now this just preserves the implementation from apache_beam.dataframe.schemas.
I was originally thinking of

for values, columns in zip(*iterators, self._columns):
    ...

But I had to take a look again to wrap my head around it. Looks like you're zipping to create the rows first, and then in the second zip, you line them up with the column names. The length of an individual iterator in iterators isn't necessarily the same as the length of self._columns. Plus, we'd probably get a "too many values to unpack" error if we had values, columns.
return SeriesBatchConverter.from_typehints(
    element_type=element_type, batch_type=batch_type)

return None
What happens if we return None? Do we have checks in other places to detect a None BatchConverter?
Yep, this is handled when we try constructing all the registered implementations:

beam/sdks/python/apache_beam/typehints/batch.py, lines 77 to 90 at 7cc48e9:

def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
  element_type = typehints.normalize(element_type)
  batch_type = typehints.normalize(batch_type)
  for constructor in BATCH_CONVERTER_REGISTRY:
    result = constructor(element_type, batch_type)
    if result is not None:
      return result
  # TODO(https://github.com/apache/beam/issues/21654): Aggregate error
  # information from the failed BatchConverter matches instead of this
  # generic error.
  raise TypeError(
      f"Unable to find BatchConverter for element_type {element_type!r} and "
      f"batch_type {batch_type!r}")
Note this is very naive right now. In the future this should include helpful debug information to handle cases where one or more implementations almost matches. Tracked in #21654
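The lookup behavior can also be demonstrated with a standalone re-implementation (the registry and converter names here are invented for illustration; this is not Beam's API):

```python
REGISTRY = []


def register(constructor):
    # Each registered constructor returns a converter or None.
    REGISTRY.append(constructor)
    return constructor


@register
def list_converter(element_type, batch_type):
    # Only matches when the batch type is a plain list.
    return ("list_of", element_type) if batch_type is list else None


def from_typehints(element_type, batch_type):
    # Try every registered constructor; the first non-None result wins.
    for constructor in REGISTRY:
        result = constructor(element_type, batch_type)
        if result is not None:
            return result
    raise TypeError(
        f"Unable to find BatchConverter for element_type {element_type!r} "
        f"and batch_type {batch_type!r}")
```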
Got it, thanks!
yield element

def infer_output_type(self, input_element_type):
  # Raise a TypeError if proxy has an unknown type
I may have missed this, but where does the error get raised?
Oops, this comment references behavior that was removed in 2b0597e. Now we will just shunt to Any in this case. I removed the comment. Thanks for raising this!
self.assertTrue(self.converter == self.create_batch_converter())
self.assertTrue(self.create_batch_converter() == self.converter)
Can you explain the purpose of checking the equality both ways?
This is just being overly cautious - in theory the instances on either side could be a different type and could have a different __eq__ implementation.
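A toy example of why both directions can matter (contrived classes, purely illustrative):

```python
class AlwaysEqual:
    # Claims equality with everything.
    def __eq__(self, other):
        return True


class NeverEqual:
    # Claims equality with nothing.
    def __eq__(self, other):
        return False


# Python asks the left operand first, so the result depends on order:
forward = AlwaysEqual() == NeverEqual()   # uses AlwaysEqual.__eq__
backward = NeverEqual() == AlwaysEqual()  # uses NeverEqual.__eq__
```

Checking equality in both directions catches this kind of asymmetry.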
👍
if is_nullable(element_type):

  def unbatch(series):
    for isnull, value in zip(pd.isnull(series), series):
      yield None if isnull else value
else:

  def unbatch(series):
    yield from series
Nit. I actually don't mind the extra lines, especially since we're defining functions here, so it's easier to read. I'll leave it up to you.
Suggested change (blank lines removed):

if is_nullable(element_type):
  def unbatch(series):
    for isnull, value in zip(pd.isnull(series), series):
      yield None if isnull else value
else:
  def unbatch(series):
    yield from series
(3, ),
(10, ),
])
def test_get_lenth(self, N):
Suggested change:
- def test_get_lenth(self, N):
+ def test_get_length(self, N):
def estimate_byte_size(self, batch: pd.DataFrame):
  return batch.memory_usage().sum()

def get_length(self, batch: pd.DataFrame):
  return len(batch)
Thanks!
Run Python 3.8 PostCommit
Run Python Examples_Direct
Run Python Examples_Dataflow
retest this please
retest this please
Run Python Examples_Direct
Run Python Examples_Dataflow
Run Python 3.8 PostCommit
PythonDocs PreCommit has passed (https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Commit/9575/), merging
Fixes #22678

This PR moves the pandas - Beam type mapping from apache_beam.dataframe.schemas to apache_beam.typehints.pandas_type_compatibility, and modifies apache_beam.dataframe.convert to leverage that mapping by defining batch-producing and -consuming DoFns.

The new module now provides BatchConverter implementations that can be re-used in other DoFns that wish to process structured data using the pandas API. It also makes one slight modification in the type mapping: we now have a special field option, beam:dataframe:index:v1. This option is used to indicate that a Beam schema field should map to an index in the pandas DataFrame type system. If a Beam schema has no fields identified as an index, then we assume the user does not care about the index, and a "meaningless" one will be generated when mapping to DataFrames. Similarly, when mapping a DataFrame back to the Beam type system, the index will be dropped.

Note apache_beam.dataframe.schemas still exists, for two purposes:
- The BatchRowsAsDataFrame and UnbatchPandas transforms. These transforms are no longer used in apache_beam.dataframe.convert though.
- Utility functions (generate_proxy, element_type_from_dataframe). These functions are still used in apache_beam.dataframe.convert.

All of the logic in apache_beam.dataframe.schemas defers to apache_beam.typehints.pandas_type_compatibility as much as possible.

The following PRs were separated from this one to ease review:
- dataframe.schemas_test #22630
- apache_beam.dataframe.schemas #22680
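The index handling described in the PR description can be illustrated with plain pandas (the field names below are invented; beam:dataframe:index:v1 itself is only a schema annotation, not a pandas concept):

```python
import pandas as pd

# A field tagged as an index becomes the DataFrame index on the way in...
rows = [{"key": "a", "value": 1}, {"key": "b", "value": 2}]
df = pd.DataFrame.from_records(rows).set_index("key")

# ...and is restored as an ordinary field on the way back out.
restored = df.reset_index().to_dict("records")
```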
#22680GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.