Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quickly fail if an existing job has the same ID #40

Merged
merged 6 commits into from
Dec 6, 2023
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions buildstockbatch/gcp/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,7 @@ def show_jobs(self):
Show the existing GCP Batch and Cloud Run jobs that match the provided project, if they exist.
"""
# GCP Batch job that runs the simulations
client = batch_v1.BatchServiceClient()
try:
job = client.get_job(batch_v1.GetJobRequest(name=self.gcp_batch_job_name))
if job := self.get_existing_batch_job():
logger.info("Batch job")
logger.info(f" Name: {job.name}")
logger.info(f" UID: {job.uid}")
Expand All @@ -469,7 +467,7 @@ def show_jobs(self):
task_counts[status] += count
logger.info(f" Task statuses: {dict(task_counts)}")
logger.debug(f"Full job info:\n{job}")
except exceptions.NotFound:
else:
logger.info(f"No existing Batch jobs match: {self.gcp_batch_job_name}")
logger.info(f"See all Batch jobs at https://console.cloud.google.com/batch/jobs?project={self.gcp_project}")

Expand All @@ -489,6 +487,49 @@ def show_jobs(self):
logger.info(f"No existing Cloud Run jobs match {self.postprocessing_job_name}")
logger.info(f"See all Cloud Run jobs at https://console.cloud.google.com/run/jobs?project={self.gcp_project}")

def get_existing_batch_job(self):
    """Fetch the GCP Batch job whose name matches this project, if it exists.

    :returns: The ``batch_v1`` Job for ``self.gcp_batch_job_name``, or None
        when no such job is found.
    """
    batch_client = batch_v1.BatchServiceClient()
    request = batch_v1.GetJobRequest(name=self.gcp_batch_job_name)
    try:
        return batch_client.get_job(request)
    except exceptions.NotFound:
        # No Batch job with this name exists yet.
        return None

def get_existing_postprocessing_job(self):
    """Fetch the post-processing Cloud Run job for this project, if it exists.

    :returns: The Cloud Run Job for ``self.postprocessing_job_name``, or None
        when no such job is found.
    """
    jobs_client = run_v2.JobsClient()
    try:
        job = jobs_client.get_job(name=self.postprocessing_job_name)
        return job
    except exceptions.NotFound:
        # Return None (not False) for consistency with get_existing_batch_job();
        # callers only rely on the result's truthiness, so this is safe.
        return None

def check_for_existing_jobs(self, pp_only=False):
    """If there are existing jobs with the same ID as this job, logs them as errors and returns True.

    Otherwise, returns False.

    :param pp_only: If true, only check for the post-processing job.
    """
    existing_batch_job = None
    if not pp_only:
        existing_batch_job = self.get_existing_batch_job()
        if existing_batch_job:
            logger.error(
                f"A Batch job with this ID ({self.job_identifier}) already exists "
                f"(status: {existing_batch_job.status.state.name}). Choose a new job_identifier or run with "
                "--clean to delete the existing job."
            )

    existing_pp_job = self.get_existing_postprocessing_job()
    if existing_pp_job:
        # A completed execution has a completion_time; otherwise it is still running.
        status = "Completed" if existing_pp_job.latest_created_execution.completion_time else "Running"
        logger.error(
            f"A Cloud Run job with this ID ({self.postprocessing_job_id}) already exists "
            f"(status: {status}). Choose a new job_identifier or run with --clean "
            "to delete the existing job."
        )

    return bool(existing_batch_job or existing_pp_job)

def upload_batch_files_to_cloud(self, tmppath):
"""Implements :func:`DockerBase.upload_batch_files_to_cloud`"""
logger.info("Uploading Batch files to Cloud Storage")
Expand Down Expand Up @@ -963,11 +1004,10 @@ def start_combine_results_job_on_cloud(self, results_dir, do_timeseries=True):
# Monitor job/execution status, starting by polling the Job for an Execution
logger.info("Waiting for execution to begin...")
job_start_time = datetime.now()
jobs_client = run_v2.JobsClient()
job = jobs_client.get_job(name=self.postprocessing_job_name)
job = self.get_existing_postprocessing_job()
while not job.latest_created_execution:
time.sleep(1)
job = jobs_client.get_job(name=self.postprocessing_job_name)
job = self.get_existing_postprocessing_job()
execution_start_time = datetime.now()
logger.info(
f"Execution has started (after {str(execution_start_time - job_start_time)} "
Expand Down Expand Up @@ -1030,15 +1070,13 @@ def start_combine_results_job_on_cloud(self, results_dir, do_timeseries=True):
tsv_logger.log_stats(logging.INFO)

def clean_postprocessing_job(self):
jobs_client = run_v2.JobsClient()
logger.info(
"Cleaning post-processing Cloud Run job with "
f"job_identifier='{self.job_identifier}'; "
f"job name={self.postprocessing_job_name}..."
)
try:
job = jobs_client.get_job(name=self.postprocessing_job_name)
except Exception:
job = self.get_existing_postprocessing_job()
if not job:
logger.warning(
"Post-processing Cloud Run job not found for "
f"job_identifier='{self.job_identifier}' "
Expand Down Expand Up @@ -1069,6 +1107,7 @@ def clean_postprocessing_job(self):
return

# ... The job succeeded or its execution was deleted successfully; it can be deleted
jobs_client = run_v2.JobsClient()
try:
jobs_client.delete_job(name=self.postprocessing_job_name)
except Exception:
Expand Down Expand Up @@ -1186,14 +1225,17 @@ def main():
batch.show_jobs()
return
elif args.postprocessonly:
if batch.check_for_existing_jobs(pp_only=True):
return
batch.build_image()
batch.push_image()
batch.process_results()
elif args.crawl:
batch.process_results(skip_combine=True, use_dask_cluster=False)
else:
# TODO: check whether this job ID already exists. If so, don't try to start a new job, and maybe reattach
# to the existing one if it's still running.
if batch.check_for_existing_jobs():
return

batch.build_image()
batch.push_image()
batch.run_batch()
Expand Down