Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mpi job into arena launcher #1307

Merged
merged 19 commits into from
May 15, 2019
6 changes: 4 additions & 2 deletions components/arena/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ COPY --from=build /usr/local/bin/kubectl /usr/local/bin/kubectl

COPY --from=build /go/src/github.com/kubeflow/arena/charts /charts

COPY arena_launcher.py /root
ENV PYTHONPATH "${PYTHONPATH}:/root"

ADD . /root

WORKDIR /root

ENTRYPOINT ["python","arean_launcher.py"]
ENTRYPOINT ["python","arena_launcher.py"]
230 changes: 11 additions & 219 deletions components/arena/docker/arena_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,216 +25,8 @@
import yaml
from subprocess import Popen,PIPE
from shlex import split

def setup_custom_logging():
    """Configure the root logger: INFO level, timestamped messages."""
    log_settings = {
        'format': '%(asctime)s %(levelname)-8s %(message)s',
        'level': logging.INFO,
        'datefmt': '%Y-%m-%d %H:%M:%S',
    }
    logging.basicConfig(**log_settings)

def _submit_job(command):
logging.info("command: {0}".format(command))
try:
output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
result = output.decode()
except subprocess.CalledProcessError as exc:
print("Status : FAIL", exc.returncode, exc.output)
sys.exit(-1)
logging.info('Submit Job: %s.' % result)

def _is_active_status(status):
logging.info("status: {0}".format(status))
return status == 'PENDING' or status == 'RUNNING'

def _is_pending_status(status):
logging.info("status: {0}".format(status))
return status == 'PENDING'

def _wait_job_done(name, job_type, timeout):
    """Poll the job every 3 seconds until it is no longer active.

    "Active" is whatever ``_is_active_status`` accepts (PENDING or RUNNING).

    Args:
        name: job name handed to ``_get_job_status``.
        job_type: job type handed to ``_get_job_status``.
        timeout: amount added to ``datetime.datetime.now()`` to obtain the
            deadline (a ``datetime.timedelta`` at the call sites in main).

    Raises:
        Exception: the job is still active once the deadline has passed.
    """
    deadline = datetime.datetime.now() + timeout
    logging.info("expect done time: {0}".format(deadline))
    current = _get_job_status(name, job_type)
    while True:
        if not _is_active_status(current):
            break
        if datetime.datetime.now() > deadline:
            message = "Timeout waiting for job {0} with job type {1} completing.".format(name ,job_type)
            logging.error(message)
            raise Exception(message)
        time.sleep(3)
        current = _get_job_status(name, job_type)
        logging.info("job {0} with type {1} status is {2}".format(name, job_type, current))

def _wait_job_running(name, job_type, timeout):
    """Poll the job every 3 seconds until it leaves the PENDING state.

    Args:
        name: job name handed to ``_get_job_status``.
        job_type: job type handed to ``_get_job_status``.
        timeout: amount added to ``datetime.datetime.now()`` to obtain the
            deadline (a ``datetime.timedelta`` at the call sites in main).

    Raises:
        Exception: the job is still pending once the deadline has passed.
    """
    deadline = datetime.datetime.now() + timeout
    logging.info("expect running time: {0}".format(deadline))
    current = _get_job_status(name, job_type)
    while True:
        if not _is_pending_status(current):
            break
        if datetime.datetime.now() > deadline:
            message = "Timeout waiting for job {0} with job type {1} running.".format(name ,job_type)
            logging.error(message)
            raise Exception(message)
        time.sleep(3)
        current = _get_job_status(name, job_type)
        logging.info("job {0} with type {1} status is {2}".format(name, job_type, current))

def _job_logging(name, job_type):
    """Stream ``arena logs -f <name>`` into the logger until the command exits.

    Args:
        name: job name appended to ``arena logs -f``.
        job_type: not used by this function; kept so callers need no change.

    Returns:
        The exit code of the ``arena logs`` process.
    """
    logging_cmd = "arena logs -f %s" % (name)
    # stderr is folded into stdout on purpose: a dedicated stderr pipe that
    # nobody reads can fill up and block the child process indefinitely.
    process = Popen(split(logging_cmd), stdout=PIPE,
                    stderr=subprocess.STDOUT, encoding='utf8')
    # The context manager closes the pipe and reaps the child even if
    # logging raises part-way through.
    with process:
        for line in process.stdout:
            logging.info(line.strip())
    return process.wait()

def _collect_metrics(name, job_type, metric_name):
metrics_cmd = "arena logs --tail=50 %s | grep -e '%s=' -e '%s:' | tail -1" % (name, metric_name, metric_name)
metric = 0
logging.info("search metric_name %s" % (metric_name))
try:
import re
output = subprocess.check_output(metrics_cmd, stderr=subprocess.STDOUT, shell=True)
result = output.decode().strip()
split_unit=''
if metric_name+"=" in result:
split_unit="="
elif metric_name+":" in result:
split_unit=":"
else:
return 0
array = result.split("%s%s" % (metric_name, split_unit))
if len(array) > 0:
logging.info(array)
result = re.findall(r'\d+\.*\d*',array[-1])
metric = float(array[-1])
except Exception as e:
logging.warning("Failed to get job status due to" + e)
return 0

return metric

def _get_job_status(name, job_type):
get_cmd = "arena get %s --type %s | grep -i STATUS:|awk -F: '{print $NF}'" % (name, job_type)
status = ""
try:
output=subprocess.check_output(get_cmd, stderr=subprocess.STDOUT, shell=True)
status = output.decode()
status = status.strip()
except subprocess.CalledProcessError as e:
logging.warning("Failed to get job status due to" + e)

return status

def _get_tensorboard_url(name, job_type):
get_cmd = "arena get %s --type %s | tail -1" % (name, job_type)
url = "N/A"
try:
output = subprocess.check_output(get_cmd, stderr=subprocess.STDOUT, shell=True)
url = output.decode()
except subprocess.CalledProcessError as e:
logging.warning("Failed to get job status due to" + e)

return url

# Generate common options
def generate_options(args):
    """Build the arena command-line options shared by every job type.

    Args:
        args: parsed arguments; this function reads ``gpus``, ``cpu``,
            ``memory``, ``tensorboard``, ``tensorboard_image``, ``log_dir``,
            ``data``, ``env``, ``workflow_name``, ``step_name`` and
            ``sync_source``.

    Returns:
        A list of option strings to append to an ``arena submit`` command.

    Raises:
        ValueError: ``sync_source`` is set but does not end with ".git".
    """
    options = []

    if args.gpus > 0:
        options.extend(['--gpus', str(args.gpus)])

    if args.cpu > 0:
        options.extend(['--cpu', str(args.cpu)])

    if args.memory > 0:
        options.extend(['--memory', str(args.memory)])

    # The image is only passed along when it differs from this fixed value.
    tensorboard_image = args.tensorboard_image
    if tensorboard_image != "tensorflow/tensorflow:1.12.0":
        options.extend(['--tensorboardImage', tensorboard_image])

    if str2bool(args.tensorboard):
        options.append("--tensorboard")

    # Guard against an unset log dir: os.path.isdir(None) raises TypeError.
    log_dir = args.log_dir
    if log_dir and os.path.isdir(log_dir):
        options.extend(['--logdir', log_dir])
    else:
        logging.info("skip log dir :{0}".format(log_dir))

    options.extend("--data={0}".format(d) for d in args.data or [])
    options.extend("--env={0}".format(e) for e in args.env or [])

    if args.workflow_name:
        options.append("--env=WORKFLOW_NAME={0}".format(args.workflow_name))

    if args.step_name:
        options.append("--env=STEP_NAME={0}".format(args.step_name))

    sync_source = args.sync_source
    if sync_source:
        if not sync_source.endswith(".git"):
            raise ValueError("sync_source must be an http git url")
        options.extend(['--sync-mode', 'git'])
        options.extend(['--sync-source', sync_source])

    return options



# Generate standalone job
def generate_job_command(args):
    """Assemble the ``arena submit tfjob`` command for a standalone job.

    Returns:
        A ``(command, job_type)`` tuple: the command as a list of strings
        and the literal job type "tfjob".
    """
    name_option = '--name={0}'.format(args.name)
    image_option = '--image={0}'.format(args.image)

    command = ['arena', 'submit', 'tfjob', name_option, image_option]
    command += generate_options(args)

    return command, "tfjob"

# Generate mpi job
def generate_mpjob_command(args):
    """Assemble the ``arena submit mpijob`` command.

    Args:
        args: parsed arguments; reads ``name``, ``workers``, ``image`` and,
            when present, ``rdma``, plus everything ``generate_options``
            reads.

    Returns:
        A ``(command, job_type)`` tuple: the command as a list of strings
        and the literal job type "mpijob".
    """
    name = args.name
    workers = args.workers
    image = args.image
    # rdma used to be referenced without ever being defined (NameError).
    # Read it from args; str() lets both the boolean True and the string
    # "true" (any case) enable the flag, and a missing attribute disables it.
    rdma = str(getattr(args, 'rdma', 'false')).lower() == "true"

    commandArray = [
        'arena', 'submit', 'mpijob',
        '--name={0}'.format(name),
        '--workers={0}'.format(workers),
        '--image={0}'.format(image),
    ]

    if rdma:
        commandArray.append("--rdma")

    commandArray.extend(generate_options(args))

    return commandArray, "mpijob"

def str2bool(v):
    """Return True when ``v`` is, ignoring case, "yes", "true", "t" or "1"."""
    lowered = v.lower()
    for accepted in ("yes", "true", "t", "1"):
        if lowered == accepted:
            return True
    return False

from utils import *
from job_generator import *

def main(argv=None):
setup_custom_logging()
Expand All @@ -260,8 +52,8 @@ def main(argv=None):

parser.add_argument('--image', type=str)
parser.add_argument('--gpus', type=int, default=0)
parser.add_argument('--cpu', type=int, default=0)
parser.add_argument('--memory', type=int, default=0)
parser.add_argument('--cpu', type=str, default='0')
parser.add_argument('--memory', type=str, default='0')
parser.add_argument('--workers', type=int, default=2)

parser.add_argument('--env', action='append', type=str, default=[])
Expand Down Expand Up @@ -314,7 +106,7 @@ def main(argv=None):

logging.info('Start training {0}.'.format(command))

_submit_job(command)
submit_job(command)

#with open('/mlpipeline-ui-metadata.json', 'w') as f:
# json.dump(metadata, f)
Expand All @@ -323,16 +115,16 @@ def main(argv=None):
succ = True

# wait for job done
# _wait_job_done(fullname, job_type, datetime.timedelta(minutes=timeout_hours))
# wait_job_done(fullname, job_type, datetime.timedelta(minutes=timeout_hours))
pending_timeout_minutes = args.pending_timeout_minutes
_wait_job_running(fullname, job_type, datetime.timedelta(minutes=pending_timeout_minutes))
wait_job_running(fullname, job_type, datetime.timedelta(minutes=pending_timeout_minutes))

rc = _job_logging(fullname, job_type)
rc = job_logging(fullname, job_type)
logging.info("rc: {0}".format(rc))

_wait_job_done(fullname, job_type, datetime.timedelta(hours=timeout_hours))
wait_job_done(fullname, job_type, datetime.timedelta(hours=timeout_hours))

status = _get_job_status(fullname, job_type)
status = get_job_status(fullname, job_type)

if status == "SUCCEEDED":
logging.info("Training Job {0} success.".format(fullname))
Expand All @@ -348,7 +140,7 @@ def main(argv=None):
if len(mArray) > 1:
metric_unit = mArray[1]
logging.info("determine metric name {0} with metric unit {1}".format(metric_name, metric_unit))
value = _collect_metrics(fullname, job_type, metric_name)
value = collect_metrics(fullname, job_type, metric_name)
if value > 0:
import re
p = re.compile('^[a-z]([-a-z0-9]{0,62}[a-z0-9])?')
Expand Down
109 changes: 109 additions & 0 deletions components/arena/docker/job_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import argparse
import datetime
import json
import os
import sys
import logging
import requests
import subprocess
import six
import time
import yaml
from subprocess import Popen,PIPE
from shlex import split

from utils import *

# Generate common options
def generate_options(args):
    """Build the arena command-line options shared by every job type.

    Args:
        args: parsed arguments; this function reads ``gpus``, ``cpu``,
            ``memory``, ``tensorboard``, ``tensorboard_image``, ``log_dir``,
            ``data``, ``env``, ``workflow_name``, ``step_name`` and
            ``sync_source``.

    Returns:
        A list of option strings to append to an ``arena submit`` command.

    Raises:
        ValueError: ``sync_source`` is set but does not end with ".git".
    """
    options = []

    if args.gpus > 0:
        options.extend(['--gpus', str(args.gpus)])

    # cpu and memory are compared as strings; the string '0' means
    # "do not pass this option".
    cpu = args.cpu
    if cpu != '0':
        options.extend(['--cpu', str(cpu)])

    memory = args.memory
    if memory != '0':
        options.extend(['--memory', str(memory)])

    # The image is only passed along when it differs from this fixed value.
    tensorboard_image = args.tensorboard_image
    if tensorboard_image != "tensorflow/tensorflow:1.12.0":
        options.extend(['--tensorboardImage', tensorboard_image])

    if str2bool(args.tensorboard):
        options.append("--tensorboard")

    # Guard against an unset log dir: os.path.isdir(None) raises TypeError.
    log_dir = args.log_dir
    if log_dir and os.path.isdir(log_dir):
        options.extend(['--logdir', log_dir])
    else:
        logging.info("skip log dir :{0}".format(log_dir))

    options.extend("--data={0}".format(d) for d in args.data or [])
    options.extend("--env={0}".format(e) for e in args.env or [])

    if args.workflow_name:
        options.append("--env=WORKFLOW_NAME={0}".format(args.workflow_name))

    if args.step_name:
        options.append("--env=STEP_NAME={0}".format(args.step_name))

    sync_source = args.sync_source
    if sync_source:
        if not sync_source.endswith(".git"):
            raise ValueError("sync_source must be an http git url")
        options.extend(['--sync-mode', 'git'])
        options.extend(['--sync-source', sync_source])

    return options

# Generate standalone job
def generate_job_command(args):
    """Assemble the ``arena submit tfjob`` command for a standalone job.

    Returns:
        A ``(command, job_type)`` tuple: the command as a list of strings
        and the literal job type "tfjob".
    """
    job_name, job_image = args.name, args.image

    command = ['arena', 'submit', 'tfjob']
    command.append('--name={0}'.format(job_name))
    command.append('--image={0}'.format(job_image))
    command.extend(generate_options(args))

    return command, "tfjob"

# Generate mpi job
def generate_mpjob_command(args):
    """Assemble the ``arena submit mpijob`` command.

    The ``--rdma`` flag is added only when ``args.rdma`` equals "true",
    ignoring case.

    Returns:
        A ``(command, job_type)`` tuple: the command as a list of strings
        and the literal job type "mpijob".
    """
    job_name, worker_count = args.name, args.workers
    job_image, rdma_setting = args.image, args.rdma

    command = ['arena', 'submit', 'mpijob']
    command.append('--name={0}'.format(job_name))
    command.append('--workers={0}'.format(worker_count))
    command.append('--image={0}'.format(job_image))

    use_rdma = rdma_setting.lower() == "true"
    if use_rdma:
        command.append("--rdma")

    command += generate_options(args)

    return command, "mpijob"
Loading