From aeb7661f2726c2fda62992f7b94453b04d53208f Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Wed, 5 Jul 2023 21:24:58 -0400 Subject: [PATCH 1/8] Create empty ListBuffer when buffer is none --- .../apache_beam/runners/portability/fn_api_runner/fn_runner.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 8d957068d08b0..4d5ff6b998627 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -852,6 +852,9 @@ def _execute_bundle(self, (consuming_stage_name, consuming_transform, buffer_id)) # We enqueue all of the pending output buffers to be scheduled at the # MAX_TIMESTAMP for the downstream stage. + # buffer is None. Empty PCollection. Bug + if not buffer: + buffer = ListBuffer(None) runner_execution_context.queues.watermark_pending_inputs.enque( ((consuming_stage_name, timestamp.MAX_TIMESTAMP), DataInput({consuming_transform: buffer}, {}))) # type: ignore From 22c4cc216e7ddfb69a1497ca2b6b06b0943ce57a Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Mon, 10 Jul 2023 11:09:56 -0400 Subject: [PATCH 2/8] Replace empty buffer with a List/GroupBuffer --- .../portability/fn_api_runner/fn_runner.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 4d5ff6b998627..92991e9481510 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -825,10 +825,11 @@ def _execute_bundle(self, buffers_to_clean = set() known_consumers = set() - for _, buffer_id in bundle_context_manager.stage_data_outputs.items(): - for (consuming_stage_name, 
consuming_transform) in \ - runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id, - []): + for transform_id, buffer_id in ( + bundle_context_manager.stage_data_outputs.items()): + for (consuming_stage_name, consuming_transform + ) in runner_execution_context.buffer_id_to_consumer_pairs.get( + buffer_id, []): buffer = runner_execution_context.pcoll_buffers.get(buffer_id, None) if (buffer_id in runner_execution_context.pcoll_buffers and @@ -840,6 +841,12 @@ def _execute_bundle(self, # so we create a copy of the buffer for every new stage. runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy() buffer = runner_execution_context.pcoll_buffers[buffer_id] + # When the buffer is not in the pcoll_buffers, it means that + # it could be an empty PCollection. In this case, get the buffer using + # the buffer id and transform id + if buffer is None: + buffer = bundle_context_manager.get_buffer(buffer_id, transform_id) + assert buffer is not None # If the buffer has already been added to be consumed by # (stage, transform), then we don't need to add it again.
This case From cac431679197ec9f0ec69a6ac29bdcd58f2ee5d9 Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Mon, 10 Jul 2023 11:14:17 -0400 Subject: [PATCH 3/8] Apply suggestions from code review --- .../apache_beam/runners/portability/fn_api_runner/fn_runner.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 92991e9481510..893cffba8cce8 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -859,9 +859,6 @@ def _execute_bundle(self, (consuming_stage_name, consuming_transform, buffer_id)) # We enqueue all of the pending output buffers to be scheduled at the # MAX_TIMESTAMP for the downstream stage. - # buffer is None. Empty PCollection. Bug - if not buffer: - buffer = ListBuffer(None) runner_execution_context.queues.watermark_pending_inputs.enque( ((consuming_stage_name, timestamp.MAX_TIMESTAMP), DataInput({consuming_transform: buffer}, {}))) # type: ignore From ff3019b691253270676c07ccaee5d7c187eff215 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Mon, 10 Jul 2023 11:29:11 -0400 Subject: [PATCH 4/8] Fix mypy error --- .../apache_beam/runners/portability/fn_api_runner/fn_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 893cffba8cce8..c61f6951b4045 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -861,7 +861,7 @@ def _execute_bundle(self, # MAX_TIMESTAMP for the downstream stage. 
runner_execution_context.queues.watermark_pending_inputs.enque( ((consuming_stage_name, timestamp.MAX_TIMESTAMP), - DataInput({consuming_transform: buffer}, {}))) # type: ignore + DataInput({consuming_transform: buffer}, {}))) for bid in buffers_to_clean: if bid in runner_execution_context.pcoll_buffers: From 87634de3675aaed01b32fb55187998eda77c1dfb Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Mon, 10 Jul 2023 13:29:37 -0400 Subject: [PATCH 5/8] Update sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py --- .../apache_beam/runners/portability/fn_api_runner/fn_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index c61f6951b4045..be7f99dc61f4c 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -846,7 +846,6 @@ def _execute_bundle(self, # the buffer id and transform id if buffer is None: buffer = bundle_context_manager.get_buffer(buffer_id, transform_id) - assert buffer is not None # If the buffer has already been added to be consumed by # (stage, transform), then we don't need to add it again. 
This case From 60e7ed144be5d80974af76b9b0dcd1594e0f1365 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 11 Jul 2023 12:28:00 -0400 Subject: [PATCH 6/8] Add test --- .../runners/portability/fn_api_runner/fn_runner_test.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index ed09bb8f2236d..b55c7162aea74 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -1831,6 +1831,15 @@ def create_pipeline(self, is_drain=False): p._options.view_as(DebugOptions).experiments.remove('beam_fn_api') return p + def test_group_by_key_with_empty_pcoll_elements(self): + with self.create_pipeline() as p: + res = ( + p + | beam.Create([('test_key', 'test_value')]) + | beam.Filter(lambda x: False) + | beam.GroupByKey()) + assert_that(res, equal_to([])) + def test_metrics(self): raise unittest.SkipTest("This test is for a single worker only.") From aba5673215bbe4ced05033b5e7ba19595ca45bcd Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 11 Jul 2023 12:32:39 -0400 Subject: [PATCH 7/8] Add note to CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index dac9654f23311..8bef46f745258 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -77,7 +77,7 @@ ## Bugfixes -* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when `direct_num_workers!=1`. 
([#27373]https://github.com/apache/beam/pull/27373) ## Known Issues From c77587f18eddd0f8c95ee2160f80ce005fd2c0c4 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 11 Jul 2023 12:34:34 -0400 Subject: [PATCH 8/8] Fix link in CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 8bef46f745258..4296436544a8a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -77,7 +77,7 @@ ## Bugfixes -* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when `direct_num_workers!=1`. ([#27373]https://github.com/apache/beam/pull/27373) +* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when pipeline option `direct_num_workers!=1`. ([#27373](https://github.com/apache/beam/pull/27373)) ## Known Issues