Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support rerun failed or canceled jobs in train_benchmark.py #1259

Merged
merged 6 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ workflows:
tools/.* lint_only false
configs/.* lint_only false
.circleci/.* lint_only false
.dev_scripts/.* lint_only true
base-revision: 1.x
# this is the path of the configuration we should trigger once
# path filtering and pipeline parameter value updates are
Expand Down
29 changes: 28 additions & 1 deletion .dev_scripts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,34 @@ python .dev_scripts/train_benchmark.py mm_lol \

Specifically, you need to enable `--skip`, and specify the list of models to skip by `--skip-list`

## Automatically check links
## 7. Train failed or canceled jobs

If you want to rerun the failed or canceled jobs of the last run, you can combine the `--rerun` flag with the `--rerun-fail` and `--rerun-cancel` flags.

For example, the log file of the last run is `train-20221009-211904.log`, and now you want to rerun the failed jobs. You can use the following command:

```bash
python .dev_scripts/train_benchmark.py mm_lol \
--job-name RERUN \
--rerun train-20221009-211904.log \
--rerun-fail \
--run
```

We can combine `--rerun-fail` and `--rerun-cancel` with the `--models` flag to rerun a **subset** of the failed or canceled models.

```bash
# only rerun 'sagan' models in all failed tasks
python .dev_scripts/train_benchmark.py mm_lol \
--job-name RERUN \
--rerun train-20221009-211904.log \
--rerun-fail \
--models sagan \
--run
```

Specifically, `--rerun-fail` and `--rerun-cancel` can be used together to rerun both failed and canceled jobs.

## 8. Automatically check links

Use the following script to check whether the links in documentations are valid:

Expand Down
2 changes: 1 addition & 1 deletion .dev_scripts/job_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pygments.util import ClassNotFound
from simple_term_menu import TerminalMenu

CACHE_DIR = '~/.task_watcher'
# `osp.abspath` does not expand `~` (it would yield `<cwd>/~`), so the home
# directory has to be resolved explicitly with `osp.expanduser`.
CACHE_DIR = osp.join(osp.expanduser('~'), '.task_watcher')


def show_job_out(name, root, job_name_list):
Expand Down
29 changes: 23 additions & 6 deletions .dev_scripts/train_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from rich.syntax import Syntax
from rich.table import Table
from tqdm import tqdm
from utils import filter_jobs, parse_job_list_from_file

console = Console()
MMEDIT_ROOT = Path(__file__).absolute().parents[1]
Expand Down Expand Up @@ -91,8 +92,13 @@ def parse_args():
parser.add_argument('--skip', type=str, default=None)
parser.add_argument('--skip-list', default=None)
parser.add_argument('--rerun', type=str, default=None)
parser.add_argument(
'--rerun-fail', action='store_true', help='only rerun failed tasks')
parser.add_argument(
'--rerun-cancel', action='store_true', help='only rerun cancel tasks')
parser.add_argument('--rerun-list', default=None)
parser.add_argument('--gpus-per-job', type=int, default=None)
parser.add_argument('--cpus-per-job', type=int, default=16)
parser.add_argument(
'--amp', action='store_true', help='Whether to use amp.')
parser.add_argument(
Expand Down Expand Up @@ -145,11 +151,22 @@ def parse_args():
args.skip_list = skip_list
print('skip_list: ', args.skip_list)
elif args.rerun is not None:
with open(args.rerun, 'r') as fp:
rerun_list = fp.readlines()
rerun_list = [j.split('\n')[0] for j in rerun_list]
args.rerun_list = rerun_list
print('rerun_list: ', args.rerun_list)
job_id_list_full, job_name_list_full = parse_job_list_from_file(
args.rerun)
filter_target = []

if args.rerun_fail:
filter_target += ['FAILED']
if args.rerun_cancel:
filter_target += ['CANCELLED']

_, job_name_list = filter_jobs(
job_id_list_full,
job_name_list_full,
filter_target,
show_table=True,
table_name='Rerun List')
args.rerun_list = job_name_list

return args

Expand Down Expand Up @@ -222,7 +239,7 @@ def create_train_job_batch(commands, model_info, args, port, script_name):
job_script += (f'#SBATCH --gres=gpu:{n_gpus}\n'
f'#SBATCH --ntasks-per-node={min(n_gpus, 8)}\n'
f'#SBATCH --ntasks={n_gpus}\n'
f'#SBATCH --cpus-per-task=5\n\n')
f'#SBATCH --cpus-per-task={args.cpus_per_job}\n\n')
else:
job_script += '\n\n' + 'export CUDA_VISIBLE_DEVICES=-1\n'

Expand Down
118 changes: 118 additions & 0 deletions .dev_scripts/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import os
import os.path as osp
from typing import Tuple

from rich import print as pprint
from rich.table import Table


def parse_job_list(job_list) -> Tuple[list, list]:
    """Parse job ids and job names from a list of strings.

    All elements in `job_list` must be formatted as `JOBID @ JOBNAME`.

    Args:
        job_list (list[str]): Job list.

    Returns:
        Tuple[list, list]: Job ID list and Job name list.

    Raises:
        AssertionError: If an element does not contain the ` @ ` separator.
    """
    job_id_list, job_name_list = [], []
    for job_info in job_list:
        # Split on the first separator only, so that a job name which itself
        # contains ' @ ' is kept intact instead of breaking the unpacking.
        job_id, separator, job_name = job_info.partition(' @ ')
        if not separator:
            # Raised explicitly (rather than via `assert`) so the check is
            # still performed when Python runs with `-O`.
            raise AssertionError(
                'Each line of job list must be formatted like '
                '\'JOBID @ JOBNAME\'.')
        job_id_list.append(job_id)
        job_name_list.append(job_name)
    return job_id_list, job_name_list


def parse_job_list_from_file(job_list_file: str) -> Tuple[list, list]:
    """Parse job list from file and return a tuple contains list of job id and
    job name.

    Each non-empty line of the file is stripped and handed to
    `parse_job_list`; blank lines are skipped.

    Args:
        job_list_file (str): The path to the file list.

    Returns:
        Tuple[list, list]: A tuple contains list of job id and job name.
        NOTE: if `job_list_file` does not exist, `False` is returned
        instead of a tuple, so callers that unpack the result should make
        sure the file exists first.
    """
    if not osp.exists(job_list_file):
        # Kept as-is for backward compatibility with callers that test the
        # result for falsiness.
        return False
    with open(job_list_file, 'r') as file:
        stripped_lines = (line.strip() for line in file)
        # Blank lines (e.g. a trailing empty line) carry no job and would
        # otherwise fail the 'JOBID @ JOBNAME' format check.
        job_list = [job for job in stripped_lines if job]
    return parse_job_list(job_list)


def get_info_from_id(job_id: str) -> dict:
    """Get the basic information of a job id with `swatch examine` command.

    Args:
        job_id (str): The ID of the job.

    Returns:
        dict: A dict contains information of the corresponding job id. The
        keys are 'JobID', 'JobName', 'Partition', 'NNodes', 'AllocCPUS' and
        'State'; all values are strings taken from the command output.

    Raises:
        RuntimeError: If the command output has fewer than three lines or
            the third line has fewer than six whitespace-separated fields.
    """
    # NOTE: `job_id` is interpolated into a shell command line, so it should
    # only come from a trusted job list.
    with os.popen(f'swatch examine {job_id}') as info_stream:
        info_str = [line.strip() for line in info_stream]

    fields = ('JobID', 'JobName', 'Partition', 'NNodes', 'AllocCPUS', 'State')
    # The third output line is parsed as the status row (presumably it
    # follows header lines -- TODO confirm against `swatch` output).
    status_info = info_str[2].split() if len(info_str) > 2 else []
    if len(status_info) < len(fields):
        # Fail with the job id and the raw output in the message instead of
        # an IndexError / unbound-variable error.
        raise RuntimeError(
            f'Cannot parse the output of `swatch examine {job_id}`: '
            f'{info_str}')
    # `zip` stops after the six known fields; extra columns are ignored.
    return dict(zip(fields, status_info))


def filter_jobs(job_id_list: list,
                job_name_list: list,
                select: list = ['FAILED'],
                show_table: bool = False,
                table_name: str = 'Filter Results') -> Tuple[list, list]:
    """Keep only the jobs whose state is listed in :attr:`select`.

    The state of each job is looked up with `get_info_from_id`.

    Args:
        job_id_list (list): The list of job ids.
        job_name_list (list): The list of job names.
        select (list, optional): The job states to keep. If empty, no
            filtering is done and the input lists are returned as they are.
            Defaults to ['FAILED'].
        show_table (bool, optional): Whether to print a table with the state
            of every job and whether it was selected. Defaults to False.
        table_name (str, optional): The title of the table. Defaults to
            'Filter Results'.

    Returns:
        Tuple[list, list]: The selected job ids and the selected job names.
    """
    # Nothing to filter on: hand the inputs back untouched.
    if not select:
        return job_id_list, job_name_list

    kept_ids, kept_names = [], []
    queried_infos = []
    for job_id, job_name in zip(job_id_list, job_name_list):
        job_info = get_info_from_id(job_id)
        queried_infos.append(job_info)
        if job_info['State'] not in select:
            continue
        kept_ids.append(job_id)
        kept_names.append(job_name)

    if not show_table:
        return kept_ids, kept_names

    # Summarise every queried job, selected or not, in a rich table.
    table = Table(title=table_name)
    for column in ('Name', 'ID', 'State', 'Is Selected'):
        table.add_column(column)
    for job_id, job_name, job_info in zip(job_id_list, job_name_list,
                                          queried_infos):
        state = job_info['State']
        if state in select:
            flag = '[green]True'
        else:
            flag = '[red]False'
        table.add_row(job_name, job_id, state, flag)
    pprint(table)
    return kept_ids, kept_names