Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BatchConverter implementations for pandas types, use Batched DoFns in DataFrame convert utilities #22575

Merged
merged 10 commits into from
Aug 31, 2022
4 changes: 1 addition & 3 deletions sdks/python/apache_beam/dataframe/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,7 @@ def process(self, element: pd.Series) -> Iterable[pd.Series]:
yield element

def infer_output_type(self, input_element_type):
# Raise a TypeError if proxy has an unknown type
output_type = dtype_to_fieldtype(self._proxy.dtype)
return output_type
return dtype_to_fieldtype(self._proxy.dtype)


# TODO: Or should this be called from_dataframe?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ def _get_series(self, batch: pd.DataFrame):
raise NotImplementedError

def explode_batch(self, batch: pd.DataFrame):
# TODO: Only do null checks for nullable types
# TODO(https://github.com/apache/beam/issues/22948): Only do null checks for
# nullable types
def make_null_checking_generator(series):
nulls = pd.isnull(series)
return (None if isnull else value for isnull, value in zip(nulls, series))
Expand Down Expand Up @@ -216,7 +217,6 @@ def _get_series(self, batch: pd.DataFrame):
return [batch[column] for column in batch.columns]

def produce_batch(self, elements):
# Note from_records has an index= parameter
batch = pd.DataFrame.from_records(elements, columns=self._columns)

for column, typehint in self._element_type._fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def test_typehint_validates(self):
typehints.validate_composite_type_param(self.batch_typehint, '')
typehints.validate_composite_type_param(self.element_typehint, '')

def test_type_check(self):
def test_type_check_batch(self):
typehints.check_constraint(self.normalized_batch_typehint, self.batch)

def test_type_check_element(self):
Expand All @@ -159,26 +159,46 @@ def test_explode_rebatch(self):
typehints.check_constraint(self.normalized_batch_typehint, rebatched)
self.equality_check(self.batch, rebatched)

def _split_batch_into_n_partitions(self, N):
elements = list(self.converter.explode_batch(self.batch))

# Split elements into N contiguous partitions
element_batches = [
elements[len(elements) * i // N:len(elements) * (i + 1) // N]
for i in range(N)
]

lengths = [len(element_batch) for element_batch in element_batches]
batches = [self.converter.produce_batch(element_batch)
for element_batch in element_batches]

return batches, lengths

@parameterized.expand([
(2, ),
(3, ),
(10, ),
])
def test_combine_batches(self, N):
elements = list(self.converter.explode_batch(self.batch))

# Split elements into N contiguous partitions, create a batch out of each
batches = [
self.converter.produce_batch(
elements[len(elements) * i // N:len(elements) * (i + 1) // N])
for i in range(N)
]
batches, _ = self._split_batch_into_n_partitions(N)

# Combine the batches, output should be equivalent to the original batch
combined = self.converter.combine_batches(batches)

self.equality_check(self.batch, combined)

@parameterized.expand([
(2, ),
(3, ),
(10, ),
])
def test_get_lenth(self, N):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def test_get_lenth(self, N):
def test_get_length(self, N):

batches, lengths = self._split_batch_into_n_partitions(N)

for batch, expected_length in zip(batches, lengths):
self.assertEqual(self.converter.get_length(batch), expected_length)


def test_equals(self):
self.assertTrue(self.converter == self.create_batch_converter())
self.assertTrue(self.create_batch_converter() == self.converter)
Expand Down