Skip to content

Commit

Permalink
feat(components): not wait for dataflow job to finish in python compo…
Browse files Browse the repository at this point in the history
…nent

PiperOrigin-RevId: 406292369
  • Loading branch information
IronPan authored and Google Cloud Pipeline Components maintainers committed Oct 29, 2021
1 parent f1bb852 commit eeb0b9c
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ def create_python_job(python_module_path: str,
with open(gcp_resources, 'w') as f:
f.write(json_format.MessageToJson(job_resources))
break
sub_process.wait_and_check()
if not job_id:
raise RuntimeError(
'No dataflow job was found when running the python file.')
Expand Down Expand Up @@ -178,12 +177,3 @@ def read_lines(self):
for line in iter(self.process.stdout.readline, b''):
logging.info('subprocess: %s', line)
yield line

def wait_and_check(self):
for _ in self.read_lines():
pass
self.process.stdout.close()
return_code = self.process.wait()
logging.info('Subprocess exit with code %s.', return_code)
if return_code:
raise subprocess.CalledProcessError(return_code, self._cmd)
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ def test_create_python_job_raises_error_on_no_job_id(
gcp_resources=self._gcp_resources,
location=self._location,
temp_location=self._gcs_temp_path)
mock_process_client.wait_and_check.assert_called_once_with()

@mock.patch.object(
dataflow_python_job_remote_runner, 'stage_file', autospec=True)
Expand Down Expand Up @@ -177,7 +176,6 @@ def test_create_python_job_parses_with_emtpy_args_list_parses_correctly(
temp_location=self._gcs_temp_path)
mock_prepare_cmd.assert_called_once_with(self._project, self._location,
mock.ANY, [], self._gcs_temp_path)
mock_process_client.wait_and_check.assert_called_once_with()

@mock.patch.object(
dataflow_python_job_remote_runner, 'stage_file', autospec=True)
Expand Down Expand Up @@ -208,7 +206,6 @@ def test_create_python_job_parses_with_json_array_args_list_parses_correctly(
mock_prepare_cmd.assert_called_once_with(self._project, self._location,
mock.ANY, self._args,
self._gcs_temp_path)
mock_process_client.wait_and_check.assert_called_once_with()

@mock.patch.object(
dataflow_python_job_remote_runner, 'stage_file', autospec=True)
Expand Down

0 comments on commit eeb0b9c

Please sign in to comment.