Skip to content

Commit

Permalink
Merge pull request #362 from openvinotoolkit/update-job-datamodel
Browse files Browse the repository at this point in the history
Update job datamodel
  • Loading branch information
ljcornel authored Mar 26, 2024
2 parents e8b3ab0 + f4ea653 commit df4313d
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 33 deletions.
50 changes: 37 additions & 13 deletions geti_sdk/data_models/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
str_to_optional_enum_converter,
)
from geti_sdk.http_session import GetiRequestException, GetiSession
from geti_sdk.platform_versions import GETI_18_VERSION, GetiVersion
from geti_sdk.platform_versions import GETI_18_VERSION, GETI_116_VERSION, GetiVersion


@attr.define(slots=False)
Expand Down Expand Up @@ -189,20 +189,20 @@ class Job:
Representation of a job running on the GETi cluster.
:var name: Name of the job
:var description: Description of the job
:var description: Description of the job [deprecated in Geti v1.16]
:var id: Unique database ID of the job
:var project_id: Unique database ID of the project from which the job originates
:var status: JobStatus object holding the current status of the job
:var status: JobStatus object holding the current status of the job [deprecated in Geti v1.16]
:var type: Type of the job
:var metadata: JobMetadata object holding metadata for the job
"""

name: str
description: str
id: str
status: JobStatus
type: str = attr.field(converter=str_to_enum_converter(JobType))
metadata: JobMetadata
description: Optional[str] = None # deprecated in Geti v1.16
status: Optional[JobStatus] = None # deprecated in Geti v1.16
project_id: Optional[str] = None
creation_time: Optional[str] = attr.field(converter=str_to_datetime, default=None)
start_time: Optional[str] = attr.field(
Expand Down Expand Up @@ -285,10 +285,19 @@ def update(self, session: GetiSession) -> "Job":
else:
raise job_error

updated_status = JobStatus.from_dict(response["status"])
self.status = updated_status
self.state = updated_status.state
self.steps = response.get("steps", None)
if session.version < GETI_116_VERSION:
updated_status = JobStatus.from_dict(response["status"])
self.status = updated_status
self.state = updated_status.state
else:
self.state = JobState(response["state"])
self.status = JobStatus(
progress=self.current_step / self.total_steps,
message=self.current_step_message,
state=self.state,
)

if self._geti_version is None:
self.geti_version = session.version
return self
Expand All @@ -305,13 +314,13 @@ def cancel(self, session: GetiSession) -> "Job":
session.get_rest_response(
url=self.relative_url, method="DELETE", allow_text_response=True
)
self.status.state = JobState.CANCELLED
self.state = JobState.CANCELLED
except GetiRequestException as error:
if error.status_code == 404:
logging.info(
f"Job '{self.name}' is not active anymore, unable to delete."
)
self.status.state = JobState.INACTIVE
self.state = JobState.INACTIVE
else:
raise error
return self
Expand All @@ -338,14 +347,14 @@ def is_finished(self) -> bool:
"""
Return True if the job finished successfully, False otherwise
"""
return self.status.state == JobState.FINISHED
return self.state == JobState.FINISHED

@property
def is_running(self) -> bool:
"""
Return True if the job is currently running, False otherwise
"""
return self.status.state == JobState.RUNNING
return self.state == JobState.RUNNING

def _get_step_information(self) -> Tuple[int, int]:
"""
Expand Down Expand Up @@ -402,12 +411,27 @@ def current_step_message(self) -> str:
else:
current_step_index = self.current_step - 1
if current_step_index < 0 or current_step_index >= len(self.steps):
if self.status.state != JobState.SCHEDULED:
if self.state != JobState.SCHEDULED:
return ""
else:
return "Awaiting job execution"
return self.steps[current_step_index].get("step_name", "")

@property
def current_step_progress(self) -> float:
"""
Return the progress of the current step for the job
:return: float indicating the progress of the current step in the job
"""
if self.geti_version <= GETI_18_VERSION:
return self.status.progress
else:
current_step_index = self.current_step - 1
if current_step_index < 0 or current_step_index >= len(self.steps):
return 0.0
return self.steps[current_step_index].get("progress", 0.0)

@property
def geti_version(self) -> GetiVersion:
"""
Expand Down
2 changes: 1 addition & 1 deletion geti_sdk/data_models/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class StatusSummary:
"""

progress: float
time_remaining: float
time_remaining: Optional[float] = None # Deprecated in Geti v1.16
message: Optional[str] = None

def __attrs_post_init__(self):
Expand Down
2 changes: 1 addition & 1 deletion geti_sdk/demos/demo_projects/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def ensure_project_is_trained(geti: Geti, project: Project) -> bool:
)
# If there are no jobs running for the project, we launch them
jobs = training_client.get_jobs(project_only=True)
running_jobs = [job for job in jobs if job.status.state == JobState.RUNNING]
running_jobs = [job for job in jobs if job.state == JobState.RUNNING]
tasks = project.get_trainable_tasks()

new_jobs = []
Expand Down
2 changes: 1 addition & 1 deletion geti_sdk/rest_clients/model_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ def get_model_for_job(self, job: Job, check_status: bool = True) -> Model:
f"Cannot get model for job `{job.description}`. This job does not "
f"belong to the project managed by this ModelClient instance."
)
if job.status.state != JobState.FINISHED:
if job.state != JobState.FINISHED:
raise ValueError(
f"Job `{job.description}` is not finished yet, unable to retrieve "
f"model for the job. Please wait until job is finished"
Expand Down
2 changes: 1 addition & 1 deletion geti_sdk/rest_clients/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def get_jobs_for_task(self, task: Task, running_only: bool = True) -> List[Job]:
if job.metadata.task is not None:
if job.metadata.task.task_id == task.id:
if running_only:
if job.status.state == JobState.RUNNING:
if job.state == JobState.RUNNING:
task_jobs.append(job)
else:
task_jobs.append(job)
Expand Down
22 changes: 12 additions & 10 deletions geti_sdk/utils/job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def monitor_jobs(
JobState.FAILED,
JobState.ERROR,
]
jobs_to_monitor = [job for job in jobs if job.status.state not in completed_states]
jobs_to_monitor = [job for job in jobs if job.state not in completed_states]
logging.info(f"Monitoring progress for {len(jobs_to_monitor)} jobs...")
outer_bars = []
inner_bars = []
Expand Down Expand Up @@ -217,7 +217,7 @@ def monitor_jobs(
)
jobs_with_error.append(job)
complete_count += 1
if job.status.state in completed_states:
if job.state in completed_states:
# Job has just completed, update progress bars to final state
complete_count += 1
finished_jobs.append(job)
Expand All @@ -242,10 +242,12 @@ def monitor_jobs(
outer_bars[index].update(job.current_step - job_steps[index])
job_steps[index] = job.current_step

incremental_progress = job.status.progress - progress_values[index]
incremental_progress = (
job.current_step_progress - progress_values[index]
)
restrict(incremental_progress, min=0, max=100)
inner_bars[index].update(incremental_progress)
progress_values[index] = job.status.progress
progress_values[index] = job.current_step_progress
outer_bars[index].update(0)

if complete_count == len(jobs_to_monitor):
Expand Down Expand Up @@ -308,10 +310,10 @@ def monitor_job(
f"found on the Intel Geti instance. Monitoring is skipped "
f"for this job."
)
if job.status.state in completed_states:
if job.state in completed_states:
logging.info(
f"Job `{job.name}` has already finished with status "
f"{str(job.status.state)}, monitoring stopped"
f"{str(job.state)}, monitoring stopped"
)
return job

Expand Down Expand Up @@ -352,7 +354,7 @@ def monitor_job(
inner_bar.set_description(previous_message)
while monitoring and t_elapsed < timeout:
job.update(session)
if job.status.state in completed_states:
if job.state in completed_states:
outer_bar.update(total_steps - current_step)
inner_bar.update(100 - previous_progress)
monitoring = False
Expand All @@ -367,11 +369,11 @@ def monitor_job(
outer_bar.update(job.current_step - current_step)
current_step = job.current_step

incremental_progress = job.status.progress - previous_progress
incremental_progress = job.current_step_progress - previous_progress
restrict(incremental_progress, min=0, max=100)
inner_bar.update(incremental_progress)
outer_bar.update(0)
previous_progress = job.status.progress
previous_progress = job.current_step_progress
time.sleep(interval)
t_elapsed = time.time() - t_start
inner_bar.close()
Expand All @@ -390,6 +392,6 @@ def monitor_job(
else:
logging.info(
f"Monitoring stopped after {t_elapsed:.1f} seconds due to timeout. Current "
f"job state: {job.status.state}"
f"job state: {job.state}"
)
return job
2 changes: 1 addition & 1 deletion tests/nightly/test_nightly_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_monitor_jobs(self, fxt_project_service_no_vcr: ProjectService):

jobs = training_client.monitor_jobs(jobs=jobs, timeout=10000)
for job in jobs:
assert job.status.state == JobState.FINISHED
assert job.state == JobState.FINISHED

def test_upload_and_predict_image(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_project_setup_and_get_model_by_job(

# Test that getting model for the train job works
if fxt_test_mode == SdkTestMode.OFFLINE:
job.status.state = JobState.FINISHED
job.state = JobState.FINISHED
model = fxt_project_service.model_client.get_model_for_job(
job=job, check_status=False
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,15 @@ def test_train_task_and_get_jobs(

# Update job status
job.update(fxt_project_service.session)
job_state = job.status.state
if job_state in JobState.active_states():
if job.state in JobState.active_states():
# Cancel the job
logging.info(f"Job '{job.name}' is still active, cancelling...")
job.cancel(fxt_project_service.session)
else:
logging.info(
f"Job '{job.name}' has already excited with status {job_state}."
f"Job '{job.name}' has already excited with status {job.state}."
)
logging.info(job_state)
logging.info(job.state)

@pytest.mark.vcr()
def test_get_status(
Expand Down

0 comments on commit df4313d

Please sign in to comment.