Skip to content

Commit

Permalink
Merge pull request #295 from openvinotoolkit/progress-bar-for-job-mon…
Browse files Browse the repository at this point in the history
…itoring

Change job monitor functions to use progress bars
  • Loading branch information
ljcornel authored Dec 14, 2023
2 parents 39c208d + 3967a71 commit f0a076b
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 34 deletions.
33 changes: 32 additions & 1 deletion geti_sdk/data_models/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# See the License for the specific language governing permissions
# and limitations under the License.
import logging
import re
from pprint import pformat
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

import attr

Expand Down Expand Up @@ -310,3 +311,33 @@ def is_finished(self) -> bool:
Return True if the job finished successfully, False otherwise
"""
return self.status.state == JobState.FINISHED

def _get_step_information(self) -> Tuple[int, int]:
"""
Return the current step and the total number of steps in the job
"""
status_message = self.status.message
step_pattern = re.compile(r"\(Step \d\/\d\)", re.IGNORECASE)
results = re.findall(step_pattern, status_message)
if len(results) != 1:
return 1, 1
result_string = results[0].strip("(").strip(")").split("Step")[-1]
result_string = result_string.strip()
result_nums = result_string.split("/")
current = int(result_nums[0])
total = int(result_nums[1])
return current, total

@property
def total_steps(self) -> int:
"""
Return the total number of steps in the job
"""
return self._get_step_information()[1]

@property
def current_step(self) -> int:
"""
Return the current step for the job
"""
return self._get_step_information()[0]
19 changes: 18 additions & 1 deletion geti_sdk/rest_clients/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
StatusRESTConverter,
)
from geti_sdk.utils import get_supported_algorithms
from geti_sdk.utils.job_helpers import get_job_with_timeout, monitor_jobs
from geti_sdk.utils.job_helpers import get_job_with_timeout, monitor_job, monitor_jobs


class TrainingClient:
Expand Down Expand Up @@ -257,6 +257,23 @@ def monitor_jobs(
session=self.session, jobs=jobs, timeout=timeout, interval=interval
)

def monitor_job(self, job: Job, timeout: int = 10000, interval: int = 15) -> Job:
"""
Monitor and print the progress of a `job`. Program execution is
halted until the job has either finished, failed or was cancelled.
Progress will be reported in 15s intervals
:param job: job to monitor
:param timeout: Timeout (in seconds) after which to stop the monitoring
:param interval: Time interval (in seconds) at which the TrainingClient polls
the server to update the status of the jobs. Defaults to 15 seconds
:return: job with it's status updated
"""
return monitor_job(
session=self.session, job=job, timeout=timeout, interval=interval
)

def get_jobs_for_task(self, task: Task, running_only: bool = True) -> List[Job]:
"""
Return a list of current jobs for the task, if any
Expand Down
267 changes: 237 additions & 30 deletions geti_sdk/utils/job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,35 @@
# and limitations under the License.
import logging
import time
import warnings
from typing import List, Optional

from tqdm import TqdmWarning
from tqdm.auto import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

from geti_sdk.data_models.enums.job_state import JobState
from geti_sdk.data_models.job import Job
from geti_sdk.http_session import GetiRequestException, GetiSession
from geti_sdk.rest_converters.job_rest_converter import JobRESTConverter


def restrict(value: float, min: float = 0, max: float = 1) -> float:
"""
Restrict the input `value` to a certain range such that `min` < `value` < `max`
:param value: Variable to restrict
:param min: Minimum allowed value. Defaults to zero
:param max: Maximally allowed value. Defaults to one
:return: The value constrained to the specified range
"""
if value < min:
return min
if value > max:
return max
return value


def get_job_by_id(
job_id: str, session: GetiSession, workspace_id: str
) -> Optional[Job]:
Expand Down Expand Up @@ -113,38 +134,224 @@ def monitor_jobs(
JobState.FAILED,
JobState.ERROR,
]
logging.info("---------------- Monitoring progress -------------------")
jobs_to_monitor = [job for job in jobs if job.status.state not in completed_states]
try:
t_start = time.time()
t_elapsed = 0
while monitoring and t_elapsed < timeout:
msg = ""
logging.info(f"Monitoring progress for {len(jobs_to_monitor)} jobs...")
outer_bars = []
inner_bars = []
descriptions = []
progress_values = []
job_steps = []
total_job_steps = []
finished_jobs: List[Job] = []
with warnings.catch_warnings(), logging_redirect_tqdm(tqdm_class=tqdm):
warnings.filterwarnings("ignore", category=TqdmWarning)
for index, job in enumerate(jobs_to_monitor):
inner_description = job.status.message.split("(Step")[0].strip()
outer_description = f"Project `{job.metadata.project.name}` - {job.name}"
outer_bars.append(
tqdm(
total=job.total_steps,
desc=outer_description,
position=2 * index,
unit="step",
initial=job.current_step,
leave=True,
bar_format="{desc}: Step {n_fmt}/{total_fmt} |{bar}| [Total time elapsed: {elapsed}]",
miniters=0,
)
)
inner_bar_format_string = (
"{desc:>"
+ str(len(outer_description) + 2)
+ "}"
+ "{percentage:7.0f}% |{bar}| [{elapsed}<{remaining}, {rate_fmt}]"
)
inner_bar = tqdm(
total=100,
unit="%",
bar_format=inner_bar_format_string,
position=2 * index + 1,
leave=True,
)
inner_bar.set_description(inner_description)
inner_bars.append(inner_bar)
descriptions.append(inner_description)
progress_values.append(0)
job_steps.append(job.current_step)
total_job_steps.append(job.total_steps)
try:
t_start = time.time()
t_elapsed = 0
complete_count = 0
for job in jobs_to_monitor:
while monitoring and t_elapsed < timeout:
for index, job in enumerate(jobs_to_monitor):
if job in finished_jobs:
# Job has completed some time ago, skip further updates
continue

job.update(session)
if job.status.state in completed_states:
# Job has just completed, update progress bars to final state
complete_count += 1
finished_jobs.append(job)
inner_bars[index].set_description(
f"Job completed with state `{job.status.message}`",
refresh=True,
)
inner_bars[index].update(110)
outer_bars[index].update(
total_job_steps[index] - job_steps[index]
)
continue

no_step_message = job.status.message.split("(Step")[0].strip()
if no_step_message != descriptions[index]:
# Next phase of the job, reset progress bar
inner_bars[index].set_description(no_step_message, refresh=True)
inner_bars[index].reset(total=100)
descriptions[index] = no_step_message
progress_values[index] = 0
outer_bars[index].update(job.current_step - job_steps[index])
job_steps[index] = job.current_step

incremental_progress = job.status.progress - progress_values[index]
restrict(incremental_progress, min=0, max=100)
inner_bars[index].update(incremental_progress)
progress_values[index] = job.status.progress
outer_bars[index].update(0)

if complete_count == len(jobs_to_monitor):
break
time.sleep(interval)
t_elapsed = time.time() - t_start
except KeyboardInterrupt:
logging.info("Job monitoring interrupted, stopping...")
for ib, ob in zip(inner_bars, outer_bars):
ib.close()
ob.close()
return jobs
if t_elapsed < timeout:
logging.info("All jobs completed, monitoring stopped.")
else:
logging.info(
f"Monitoring stopped after {t_elapsed:.1f} seconds due to timeout."
)
for ib, ob in zip(inner_bars, outer_bars):
ib.close()
ob.close()
return jobs


def monitor_job(
session: GetiSession, job: Job, timeout: int = 10000, interval: int = 15
) -> List[Job]:
"""
Monitor and print the progress of a single `job`. Execution is
halted until the job has either finished, failed or was cancelled.
Progress will be reported in 15s intervals
:param session: GetiSession instance addressing the Intel® Geti™ platform
:param job: Job to monitor
:param timeout: Timeout (in seconds) after which to stop the monitoring
:param interval: Time interval (in seconds) at which the TrainingClient polls
the server to update the status of the job. Defaults to 15 seconds
:return: The finished (or failed) job with it's status updated
"""
monitoring = True
completed_states = [
JobState.FINISHED,
JobState.CANCELLED,
JobState.FAILED,
JobState.ERROR,
]
try:
logging.info(
f"Monitoring `{job.name}` job for project `{job.metadata.project.name}`: {job.metadata.task.name}"
)
except AttributeError:
logging.info(f"Monitoring `{job.name}` with id {job.id}")
job.update(session)
if job.status.state in completed_states:
logging.info(
f"Job `{job.name}` has already finished with status "
f"{str(job.status.state)}, monitoring stopped"
)
return job

t_start = time.time()
t_elapsed = 0

with logging_redirect_tqdm(tqdm_class=tqdm), warnings.catch_warnings():
warnings.filterwarnings("ignore", category=TqdmWarning)
try:
previous_progress = 0
previous_message = job.status.message.split("(Step")[0].strip()
current_step = job.current_step
outer_description = f"Project `{job.metadata.project.name}` - {job.name}"
total_steps = job.total_steps
outer_bar = tqdm(
total=total_steps,
desc=outer_description,
position=0,
unit="step",
initial=current_step,
leave=True,
bar_format="{desc}: Step {n_fmt}/{total_fmt} |{bar}| [Total time elapsed: {elapsed}]",
miniters=0,
)
inner_bar_format_string = (
"{desc:>"
+ str(len(outer_description) + 2)
+ "}"
+ "{percentage:7.0f}% |{bar}| [{elapsed}<{remaining}, {rate_fmt}]"
)
inner_bar = tqdm(
total=100,
unit="%",
bar_format=inner_bar_format_string,
position=1,
leave=False,
)
inner_bar.set_description(previous_message)
while monitoring and t_elapsed < timeout:
job.update(session)
msg += (
f"{job.name} -- "
f" Phase: {job.status.user_friendly_message} "
f" State: {job.status.state} "
f" Progress: {job.status.progress:.1f}%"
)
if job.status.state in completed_states:
complete_count += 1
if complete_count == len(jobs_to_monitor):
break
logging.info(msg)
time.sleep(interval)
t_elapsed = time.time() - t_start
except KeyboardInterrupt:
logging.info("Job monitoring interrupted, stopping...")
for job in jobs:
outer_bar.update(total_steps - current_step)
break
no_step_message = job.status.message.split("(Step")[0].strip()
if no_step_message != previous_message:
# Next phase of the job, reset progress bar
inner_bar.set_description(f"{no_step_message}", refresh=True)
inner_bar.reset(total=100)
previous_message = no_step_message
previous_progress = 0
outer_bar.update(job.current_step - current_step)
current_step = job.current_step

incremental_progress = job.status.progress - previous_progress
restrict(incremental_progress, min=0, max=100)
inner_bar.update(incremental_progress)
outer_bar.update(0)
previous_progress = job.status.progress
time.sleep(interval)
t_elapsed = time.time() - t_start
inner_bar.close()
outer_bar.close()
except KeyboardInterrupt:
logging.info("Job monitoring interrupted, stopping...")
job.update(session)
return jobs
if t_elapsed < timeout:
logging.info("All jobs completed, monitoring stopped.")
else:
logging.info(
f"Monitoring stopped after {t_elapsed:.1f} seconds due to timeout."
)
return jobs
inner_bar.close()
outer_bar.close()
return job
if t_elapsed < timeout:
logging.info(
f"Job `{job.name}` finished, monitoring stopped. Total time elapsed: "
f"{t_elapsed:.1f} seconds"
)
else:
logging.info(
f"Monitoring stopped after {t_elapsed:.1f} seconds due to timeout. Current "
f"job state: {job.status.state}"
)
return job
4 changes: 2 additions & 2 deletions notebooks/007_train_project.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@
"metadata": {},
"source": [
"### Monitoring the training process\n",
"Using the training_client and the training `job` we just started, we can monitor the training progress on the platform. The `training_client.monitor_jobs()` method can monitor the status of one or several jobs, and will update the job status every 15 seconds. Program execution is halted untill all jobs are completed (either successfully or cancelled/failed). Even if you only want to monitor a single job, be sure to pass it to the monitor_jobs method in a list as shown in the cell below.\n",
"Using the training_client and the training `job` we just started, we can monitor the training progress on the platform. The `training_client.monitor_job()` method can monitor the status of a job, and will update the job status every 15 seconds. Program execution is halted untill the job has completed (either successfully or cancelled/failed).\n",
"\n",
"> **NOTE**: Because training the task will take quite a bit of time, you may want to interrupt the monitoring at some point. This can be done by selecting the cell in which the monitoring is running and pressing the 'Interrupt the kernel' (solid square) button at the top of the page, or by navigating to the 'Kernel' menu in the top menu bar and selecting 'Interrupt the kernel' there. This will not cancel the job on the platform, it will just abort the job progress monitoring in the notebook."
]
Expand All @@ -216,7 +216,7 @@
"metadata": {},
"outputs": [],
"source": [
"training_client.monitor_jobs([job])"
"training_client.monitor_job(job);"
]
},
{
Expand Down

0 comments on commit f0a076b

Please sign in to comment.