Support Python functions in workflows (kubeflow#431)
* Support Python functions in workflows

* Use imp library and include args

* Import ci directory

* Import kf_unittests

* Add kwargs

* Fix pylint issues

* Remove unused import

* Add more logging

* Include Kam fix
scottilee authored and k8s-ci-robot committed Aug 28, 2019
1 parent 54db950 commit 45511ce
Showing 4 changed files with 160 additions and 87 deletions.
7 changes: 7 additions & 0 deletions prow_config.yaml
@@ -4,3 +4,10 @@ workflows:
  - app_dir: kubeflow/testing/workflows
    component: workflows
    name: unittests

  - py_func: kubeflow.testing.ci.kf_unittests.create_workflow
    name: pyfunctest
    # can optionally take keyword arguments
    # kw_args:
    #   a: 1
    #   b: 2
Empty file.
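
For reference, a py_func target is any importable function that returns an Argo workflow as a dictionary; if the kw_args above were uncommented, the function would need matching keyword parameters. A minimal, hypothetical sketch of such a target (the body and defaults are illustrative only):

def create_workflow(a=None, b=None):
  # Return an Argo Workflow manifest as a dict; the spec is elided here.
  return {
      "apiVersion": "argoproj.io/v1alpha1",
      "kind": "Workflow",
      "metadata": {"generateName": "pyfunctest-"},
      "spec": {},
  }
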
15 changes: 15 additions & 0 deletions py/kubeflow/testing/ci/kf_unittests.py
@@ -0,0 +1,15 @@
"""Test Argo workflow"""

import requests
import yaml

def create_workflow():
  """Loads the Argo hello-world example YAML and returns it as a dictionary."""
  # TODO: define workflow to run unittests
  argo_hello_world = requests.get("https://raw.githubusercontent.com/argoproj/"
                                  "argo/master/examples/hello-world.yaml")
  yaml_result = yaml.safe_load(argo_hello_world.text)
  return yaml_result

if __name__ == "__main__":
  create_workflow()
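
As a quick sanity check, the returned dictionary can be inspected directly. Assuming the fetch of the upstream hello-world example succeeds, roughly the following holds (a sketch, not part of the commit):

from kubeflow.testing.ci.kf_unittests import create_workflow

wf = create_workflow()
assert wf["kind"] == "Workflow"
assert wf["apiVersion"].startswith("argoproj.io/")
assert "generateName" in wf["metadata"]  # hello-world uses generateName
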
225 changes: 138 additions & 87 deletions py/kubeflow/testing/run_e2e_workflow.py
@@ -3,7 +3,7 @@
This script submits Argo workflows to run the E2E tests and waits for
them to finish. It is intended to be invoked by prow jobs.
It requires the workflow to be expressed as a ksonnet app.
It requires the workflow to be expressed as a ksonnet app or a Python function.
The script can take a config file via --config_file.
The --config_file is expected to be a YAML file as follows:
@@ -17,9 +17,10 @@
    include_dirs:
      tensorflow/*
  - name: lint
    app_dir: kubeflow/kubeflow/testing/workflows
    component: workflows
  - name: workflow-test
    py_func: my_test_package.my_test_module.my_test_workflow
    kw_args:
      arg1: argument
app_dir is expected to be in the form of
{REPO_OWNER}/{REPO_NAME}/path/to/ksonnet/app
@@ -33,6 +34,10 @@
include_dirs (optional) is an array of strings that specify which directories, if modified,
should run this workflow.
py_func is the fully qualified name of a Python function that returns an
Argo workflow as a dictionary.
kw_args is a dictionary of keyword arguments passed to that function.
The script expects that the directory
{repos_dir}/{app_dir} exists, where repos_dir is provided
as a command line argument.
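
Concretely, app_dir resolves beneath the --repos_dir root before the ksonnet commands run; a small sketch with hypothetical paths:

import os

repos_dir = "/src"                       # hypothetical --repos_dir value
app_dir = "kubeflow/testing/workflows"   # {REPO_OWNER}/{REPO_NAME}/path form
print(os.path.join(repos_dir, app_dir))  # /src/kubeflow/testing/workflows
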
@@ -41,17 +46,20 @@
import argparse
import datetime
import fnmatch
import importlib
import logging
import os
import tempfile
from kubeflow.testing import argo_client
from kubeflow.testing import ks_util
from kubeflow.testing import prow_artifacts
from kubeflow.testing import util
import uuid
import subprocess
import sys
import yaml
from kubernetes import config
from kubernetes import client as k8s_client
from kubeflow.testing import argo_client
from kubeflow.testing import ks_util
from kubeflow.testing import prow_artifacts
from kubeflow.testing import util

# The namespace to launch the Argo workflow in.
def get_namespace(args):
@@ -62,7 +70,14 @@ def get_namespace(args):
return "kubeflow-releasing"
return "kubeflow-test-infra"

class WorkflowComponent(object):
# Resolves the fully qualified name in py_func, imports the containing
# module, and invokes the named function with the given keyword arguments.
def py_func_import(py_func, kwargs):
  module_path, func_name = py_func.rsplit('.', 1)
  mod = importlib.import_module(module_path)
  func = getattr(mod, func_name)
  return func(**kwargs)
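
Given the pyfunctest entry in prow_config.yaml above, the dispatch reduces to a call like the following sketch (kw_args defaults to an empty dict in parse_config_file below):

wf = py_func_import("kubeflow.testing.ci.kf_unittests.create_workflow", {})
# Equivalent to:
#   from kubeflow.testing.ci import kf_unittests
#   wf = kf_unittests.create_workflow()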

class WorkflowKSComponent(object):
  """Datastructure to represent a ksonnet component to submit a workflow."""

  def __init__(self, name, app_dir, component, job_types, include_dirs, params):
@@ -73,6 +88,14 @@ def __init__(self, name, app_dir, component, job_types, include_dirs, params):
    self.include_dirs = include_dirs
    self.params = params

class WorkflowPyComponent(object):
  """Datastructure to represent a Python function to submit a workflow."""

  def __init__(self, name, py_func, kw_args):
    self.name = name
    self.py_func = py_func
    self.args = kw_args

def _get_src_dir():
  return os.path.abspath(os.path.join(__file__, "..",))

@@ -89,9 +112,13 @@ def parse_config_file(config_file, root_dir):

  components = []
  for i in results["workflows"]:
    components.append(WorkflowComponent(
        i["name"], os.path.join(root_dir, i["app_dir"]), i["component"], i.get("job_types", []),
        i.get("include_dirs", []), i.get("params", {})))
    if i.get("app_dir"):
      components.append(WorkflowKSComponent(
          i["name"], os.path.join(root_dir, i["app_dir"]), i["component"],
          i.get("job_types", []), i.get("include_dirs", []), i.get("params", {})))
    if i.get("py_func"):
      components.append(WorkflowPyComponent(
          i["name"], i["py_func"], i.get("kw_args", {})))
  return components
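
A config entry thus selects its component type by which key it carries; hypothetical entries illustrating the dispatch:

entries = [
    {"name": "unittests", "app_dir": "kubeflow/testing/workflows",
     "component": "workflows"},  # app_dir present -> WorkflowKSComponent
    {"name": "pyfunctest",
     "py_func": "kubeflow.testing.ci.kf_unittests.create_workflow",
     "kw_args": {"a": 1}},       # py_func present -> WorkflowPyComponent
]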

def generate_env_from_head(args):
@@ -175,43 +202,12 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-branches
workflow_names = []
ui_urls = {}

for w in workflows:
for w in workflows: # pylint: disable=too-many-nested-blocks
# Create the name for the workflow
# We truncate sha numbers to prevent the workflow name from being too large.
# Workflow name should not be more than 63 characters because it's used
# as a label on the pods.
workflow_name = os.getenv("JOB_NAME") + "-" + w.name
ks_cmd = ks_util.get_ksonnet_cmd(w.app_dir)

# Print ksonnet version
util.run([ks_cmd, "version"])

# Skip this workflow if it is scoped to a different job type.
if w.job_types and not job_type in w.job_types:
logging.info("Skipping workflow %s because job type %s is not one of "
"%s.", w.name, job_type, w.job_types)
continue

# If we are scoping this workflow to specific directories, check if any files
# modified match the specified regex patterns.
dir_modified = False
if w.include_dirs:
for f in changed_files:
for d in w.include_dirs:
if fnmatch.fnmatch(f, d):
dir_modified = True
logging.info("Triggering workflow %s because %s in dir %s is modified.",
w.name, f, d)
break
if dir_modified:
break

# Only consider modified files when the job is pre or post submit, and if
# the include_dirs stanza is defined.
if job_type != "periodic" and w.include_dirs and not dir_modified:
logging.info("Skipping workflow %s because no code modified in %s.",
w.name, w.include_dirs)
continue

if job_type == "presubmit":
workflow_name += "-{0}".format(os.getenv("PULL_NUMBER"))
@@ -228,56 +224,111 @@
# are submitting jobs manually for testing/debugging. Since the prow should
# vend unique build numbers for each job.
workflow_name += "-{0}".format(salt)

workflow_names.append(workflow_name)
# Create a new environment for this run
env = workflow_name

util.run([ks_cmd, "env", "add", env, "--namespace=" + get_namespace(args)],
cwd=w.app_dir)
# If this is a ksonnet workflow, configure the ks app and apply it.
if hasattr(w, "app_dir"):
ks_cmd = ks_util.get_ksonnet_cmd(w.app_dir)

# Print ksonnet version
util.run([ks_cmd, "version"])

util.run([ks_cmd, "param", "set", "--env=" + env, w.component,
"name", workflow_name],
cwd=w.app_dir)
# Skip this workflow if it is scoped to a different job type.
if w.job_types and not job_type in w.job_types:
logging.info("Skipping workflow %s because job type %s is not one of "
"%s.", w.name, job_type, w.job_types)
continue

# Set the prow environment variables.
prow_env = []
# If we are scoping this workflow to specific directories, check if any files
# modified match the specified regex patterns.
dir_modified = False
if w.include_dirs:
for f in changed_files:
for d in w.include_dirs:
if fnmatch.fnmatch(f, d):
dir_modified = True
logging.info("Triggering workflow %s because %s in dir %s is modified.",
w.name, f, d)
break
if dir_modified:
break

names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER",
"PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER",
"REPO_NAME"]
names.sort()
for v in names:
if not os.getenv(v):
# Only consider modified files when the job is pre or post submit, and if
# the include_dirs stanza is defined.
if job_type != "periodic" and w.include_dirs and not dir_modified:
logging.info("Skipping workflow %s because no code modified in %s.",
w.name, w.include_dirs)
continue
prow_env.append("{0}={1}".format(v, os.getenv(v)))

util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "prow_env",
",".join(prow_env)], cwd=w.app_dir)
util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "namespace",
get_namespace(args)], cwd=w.app_dir)
util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "bucket",
args.bucket], cwd=w.app_dir)
if args.release:
util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "versionTag",
os.getenv("VERSION_TAG")], cwd=w.app_dir)

# Set any extra params. We do this in alphabetical order to make it easier to verify in
# the unittest.
param_names = w.params.keys()
param_names.sort()
for k in param_names:
util.run([ks_cmd, "param", "set", "--env=" + env, w.component, k,
"{0}".format(w.params[k])], cwd=w.app_dir)

# For debugging print out the manifest
util.run([ks_cmd, "show", env, "-c", w.component], cwd=w.app_dir)
util.run([ks_cmd, "apply", env, "-c", w.component], cwd=w.app_dir)

ui_url = ("http://testing-argo.kubeflow.org/workflows/kubeflow-test-infra/{0}"

# Create a new environment for this run
env = workflow_name

util.run([ks_cmd, "env", "add", env, "--namespace=" + get_namespace(args)],
cwd=w.app_dir)

util.run([ks_cmd, "param", "set", "--env=" + env, w.component,
"name", workflow_name],
cwd=w.app_dir)

# Set the prow environment variables.
prow_env = []

names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER",
"PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER",
"REPO_NAME"]
names.sort()
for v in names:
if not os.getenv(v):
continue
prow_env.append("{0}={1}".format(v, os.getenv(v)))

util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "prow_env",
",".join(prow_env)], cwd=w.app_dir)
util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "namespace",
get_namespace(args)], cwd=w.app_dir)
util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "bucket",
args.bucket], cwd=w.app_dir)
if args.release:
util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "versionTag",
os.getenv("VERSION_TAG")], cwd=w.app_dir)

# Set any extra params. We do this in alphabetical order to make it easier to verify in
# the unittest.
param_names = w.params.keys()
param_names.sort()
for k in param_names:
util.run([ks_cmd, "param", "set", "--env=" + env, w.component, k,
"{0}".format(w.params[k])], cwd=w.app_dir)

# For debugging print out the manifest
util.run([ks_cmd, "show", env, "-c", w.component], cwd=w.app_dir)
util.run([ks_cmd, "apply", env, "-c", w.component], cwd=w.app_dir)

ui_url = ("http://testing-argo.kubeflow.org/workflows/kubeflow-test-infra/{0}"
"?tab=workflow".format(workflow_name))
ui_urls[workflow_name] = ui_url
logging.info("URL for workflow: %s", ui_url)
else:
wf_result = py_func_import(w.py_func, w.args)
group, version = wf_result['apiVersion'].split('/')
config.load_kube_config()
k8s_co = k8s_client.CustomObjectsApi()
if "metadata" in wf_result:
if "generateName" in wf_result["metadata"]:
wf_result["metadata"].pop("generateName")
wf_result["metadata"]["name"] = workflow_name
py_func_result = k8s_co.create_namespaced_custom_object(
group=group,
version=version,
namespace=get_namespace(args),
plural='workflows',
body=wf_result)
logging.info("py_func_result: %s", py_func_result)

ui_url = ("http://testing-argo.kubeflow.org/workflows/kubeflow-test-infra/{0}"
"?tab=workflow".format(workflow_name))
ui_urls[workflow_name] = ui_url
logging.info("URL for workflow: %s", ui_url)
ui_urls[workflow_name] = ui_url
logging.info("URL for workflow: %s", ui_url)

# We delay creating started.json until we know the Argo workflow URLs
create_started_file(args.bucket, ui_urls)
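
For the Argo hello-world manifest loaded by kf_unittests, apiVersion is argoproj.io/v1alpha1, so the split in the else branch yields group argoproj.io and version v1alpha1, and the workflow is created through the Kubernetes CustomObjectsApi under the workflows plural. A self-contained sketch of that submission path, assuming a reachable cluster and kubeconfig (the fixed name is hypothetical):

from kubeflow.testing.ci.kf_unittests import create_workflow
from kubernetes import client as k8s_client
from kubernetes import config

wf_result = create_workflow()
# As in run(), swap generateName for a deterministic name before submission.
wf_result["metadata"].pop("generateName", None)
wf_result["metadata"]["name"] = "pyfunctest-example"  # hypothetical

config.load_kube_config()
api = k8s_client.CustomObjectsApi()
result = api.create_namespaced_custom_object(
    group="argoproj.io",              # from apiVersion "argoproj.io/v1alpha1"
    version="v1alpha1",
    namespace="kubeflow-test-infra",  # what get_namespace(args) returns by default
    plural="workflows",
    body=wf_result)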
