diff --git a/components/google-cloud/google_cloud_pipeline_components/container/experimental/dataflow/dataflow_python_job_remote_runner.py b/components/google-cloud/google_cloud_pipeline_components/container/experimental/dataflow/dataflow_python_job_remote_runner.py index 958c09d32ee..f85480657dc 100644 --- a/components/google-cloud/google_cloud_pipeline_components/container/experimental/dataflow/dataflow_python_job_remote_runner.py +++ b/components/google-cloud/google_cloud_pipeline_components/container/experimental/dataflow/dataflow_python_job_remote_runner.py @@ -77,7 +77,6 @@ def create_python_job(python_module_path: str, with open(gcp_resources, 'w') as f: f.write(json_format.MessageToJson(job_resources)) break - sub_process.wait_and_check() if not job_id: raise RuntimeError( 'No dataflow job was found when running the python file.') @@ -178,12 +177,3 @@ def read_lines(self): for line in iter(self.process.stdout.readline, b''): logging.info('subprocess: %s', line) yield line - - def wait_and_check(self): - for _ in self.read_lines(): - pass - self.process.stdout.close() - return_code = self.process.wait() - logging.info('Subprocess exit with code %s.', return_code) - if return_code: - raise subprocess.CalledProcessError(return_code, self._cmd) diff --git a/components/google-cloud/tests/container/experimental/dataflow/test_dataflow_python_job_remote_runner.py b/components/google-cloud/tests/container/experimental/dataflow/test_dataflow_python_job_remote_runner.py index 2ea97ff5384..5ed5da851d7 100644 --- a/components/google-cloud/tests/container/experimental/dataflow/test_dataflow_python_job_remote_runner.py +++ b/components/google-cloud/tests/container/experimental/dataflow/test_dataflow_python_job_remote_runner.py @@ -148,7 +148,6 @@ def test_create_python_job_raises_error_on_no_job_id( gcp_resources=self._gcp_resources, location=self._location, temp_location=self._gcs_temp_path) - mock_process_client.wait_and_check.assert_called_once_with() @mock.patch.object( dataflow_python_job_remote_runner, 'stage_file', autospec=True) @@ -177,7 +176,6 @@ def test_create_python_job_parses_with_emtpy_args_list_parses_correctly( temp_location=self._gcs_temp_path) mock_prepare_cmd.assert_called_once_with(self._project, self._location, mock.ANY, [], self._gcs_temp_path) - mock_process_client.wait_and_check.assert_called_once_with() @mock.patch.object( dataflow_python_job_remote_runner, 'stage_file', autospec=True) @@ -208,7 +206,6 @@ def test_create_python_job_parses_with_json_array_args_list_parses_correctly( mock_prepare_cmd.assert_called_once_with(self._project, self._location, mock.ANY, self._args, self._gcs_temp_path) - mock_process_client.wait_and_check.assert_called_once_with() @mock.patch.object( dataflow_python_job_remote_runner, 'stage_file', autospec=True)