diff --git a/prow_config.yaml b/prow_config.yaml
index 13665e3c4a3..b5a92a1a912 100644
--- a/prow_config.yaml
+++ b/prow_config.yaml
@@ -4,3 +4,10 @@ workflows:
   - app_dir: kubeflow/testing/workflows
     component: workflows
     name: unittests
+
+  - py_func: kubeflow.testing.ci.kf_unittests.create_workflow
+    name: pyfunctest
+    # can optionally take keyword arguments
+    # kw_args:
+    #   a: 1
+    #   b: 2
diff --git a/py/kubeflow/testing/ci/__init__.py b/py/kubeflow/testing/ci/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/py/kubeflow/testing/ci/kf_unittests.py b/py/kubeflow/testing/ci/kf_unittests.py
new file mode 100644
index 00000000000..74efb2f4065
--- /dev/null
+++ b/py/kubeflow/testing/ci/kf_unittests.py
@@ -0,0 +1,15 @@
+"""Test Argo workflow"""
+
+import requests
+import yaml
+
+def create_workflow():
+  """Loads the Argo hello-world example YAML and returns it as a dictionary."""
+  # TODO: define workflow to run unittests
+  argo_hello_world = requests.get("https://raw.githubusercontent.com/argoproj/"
+                                  "argo/master/examples/hello-world.yaml")
+  yaml_result = yaml.safe_load(argo_hello_world.text)
+  return yaml_result
+
+if __name__ == "__main__":
+  create_workflow()
diff --git a/py/kubeflow/testing/run_e2e_workflow.py b/py/kubeflow/testing/run_e2e_workflow.py
index 999f3ac31f1..7916001d51f 100644
--- a/py/kubeflow/testing/run_e2e_workflow.py
+++ b/py/kubeflow/testing/run_e2e_workflow.py
@@ -3,7 +3,7 @@
 This script submits Argo workflows to run the E2E tests and waits for them
 to finish. It is intended to be invoked by prow jobs.
 
-It requires the workflow to be expressed as a ksonnet app.
+It requires the workflow to be expressed as a ksonnet app or a Python function.
 
 The script can take a config file via --config_file.
 The --config_file is expected to be a YAML file as follows:
@@ -17,9 +17,10 @@
       include_dirs:
         - tensorflow/*
 
-  - name: lint
-    app_dir: kubeflow/kubeflow/testing/workflows
-    component: workflows
+  - name: workflow-test
+    py_func: my_test_package.my_test_module.my_test_workflow
+    kw_args:
+      arg1: argument
 
 app_dir is expected to be in the form of
 {REPO_OWNER}/{REPO_NAME}/path/to/ksonnet/app
@@ -33,6 +34,10 @@
 include_dirs (optional) is an array of strings that specify which directories, if
 modified, should run this workflow.
 
+py_func is the fully qualified name of a Python function that returns an Argo workflow as a dictionary
+
+kw_args is a dictionary of keyword arguments passed to that Python function
+
 The script expects that the directories
 {repos_dir}/{app_dir} exists. Where repos_dir is provided
 as a command line argument.
@@ -41,17 +46,20 @@
 import argparse
 import datetime
 import fnmatch
+import importlib
 import logging
 import os
 import tempfile
-from kubeflow.testing import argo_client
-from kubeflow.testing import ks_util
-from kubeflow.testing import prow_artifacts
-from kubeflow.testing import util
 import uuid
 import subprocess
 import sys
 import yaml
+from kubernetes import config
+from kubernetes import client as k8s_client
+from kubeflow.testing import argo_client
+from kubeflow.testing import ks_util
+from kubeflow.testing import prow_artifacts
+from kubeflow.testing import util
 
 # The namespace to launch the Argo workflow in.
 def get_namespace(args):
@@ -62,7 +70,14 @@ def get_namespace(args):
     return "kubeflow-releasing"
   return "kubeflow-test-infra"
 
-class WorkflowComponent(object):
+# Imports the module containing py_func and invokes the function with kwargs.
+def py_func_import(py_func, kwargs):
+  module_path, func_name = py_func.rsplit('.', 1)
+  module = importlib.import_module(module_path)
+  func = getattr(module, func_name)
+  return func(**kwargs)
+
+class WorkflowKSComponent(object):
   """Datastructure to represent a ksonnet component to submit a workflow."""
 
   def __init__(self, name, app_dir, component, job_types, include_dirs, params):
@@ -73,6 +88,14 @@ def __init__(self, name, app_dir, component, job_types, include_dirs, params):
     self.include_dirs = include_dirs
     self.params = params
 
+class WorkflowPyComponent(object):
+  """Datastructure to represent a Python function to submit a workflow."""
+
+  def __init__(self, name, py_func, kw_args):
+    self.name = name
+    self.py_func = py_func
+    self.args = kw_args
+
 def _get_src_dir():
   return os.path.abspath(os.path.join(__file__, "..",))
 
@@ -89,9 +112,13 @@ def parse_config_file(config_file, root_dir):
 
   components = []
   for i in results["workflows"]:
-    components.append(WorkflowComponent(
-      i["name"], os.path.join(root_dir, i["app_dir"]), i["component"], i.get("job_types", []),
-      i.get("include_dirs", []), i.get("params", {})))
+    if i.get("app_dir"):
+      components.append(WorkflowKSComponent(
+        i["name"], os.path.join(root_dir, i["app_dir"]), i["component"],
+        i.get("job_types", []), i.get("include_dirs", []), i.get("params", {})))
+    if i.get("py_func"):
+      components.append(WorkflowPyComponent(
+        i["name"], i["py_func"], i.get("kw_args", {})))
   return components
 
 def generate_env_from_head(args):
@@ -175,43 +202,12 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-bran
   workflow_names = []
   ui_urls = {}
 
-  for w in workflows:
+  for w in workflows: # pylint: disable=too-many-nested-blocks
     # Create the name for the workflow
     # We truncate sha numbers to prevent the workflow name from being too large.
     # Workflow name should not be more than 63 characters because its used
     # as a label on the pods.
     workflow_name = os.getenv("JOB_NAME") + "-" + w.name
-    ks_cmd = ks_util.get_ksonnet_cmd(w.app_dir)
-
-    # Print ksonnet version
-    util.run([ks_cmd, "version"])
-
-    # Skip this workflow if it is scoped to a different job type.
-    if w.job_types and not job_type in w.job_types:
-      logging.info("Skipping workflow %s because job type %s is not one of "
-                   "%s.", w.name, job_type, w.job_types)
-      continue
-
-    # If we are scoping this workflow to specific directories, check if any files
-    # modified match the specified regex patterns.
-    dir_modified = False
-    if w.include_dirs:
-      for f in changed_files:
-        for d in w.include_dirs:
-          if fnmatch.fnmatch(f, d):
-            dir_modified = True
-            logging.info("Triggering workflow %s because %s in dir %s is modified.",
-                         w.name, f, d)
-            break
-        if dir_modified:
-          break
-
-    # Only consider modified files when the job is pre or post submit, and if
-    # the include_dirs stanza is defined.
-    if job_type != "periodic" and w.include_dirs and not dir_modified:
-      logging.info("Skipping workflow %s because no code modified in %s.",
-                   w.name, w.include_dirs)
-      continue
 
     if job_type == "presubmit":
       workflow_name += "-{0}".format(os.getenv("PULL_NUMBER"))
@@ -228,56 +224,111 @@ def run(args, file_handler): # pylint: disable=too-many-statements,too-many-bran
     # are submitting jobs manually for testing/debugging. Since the prow should
     # vend unique build numbers for each job.
workflow_name += "-{0}".format(salt) - workflow_names.append(workflow_name) - # Create a new environment for this run - env = workflow_name - util.run([ks_cmd, "env", "add", env, "--namespace=" + get_namespace(args)], - cwd=w.app_dir) + # check if ks workflow and run + if hasattr(w, "app_dir"): + ks_cmd = ks_util.get_ksonnet_cmd(w.app_dir) + + # Print ksonnet version + util.run([ks_cmd, "version"]) - util.run([ks_cmd, "param", "set", "--env=" + env, w.component, - "name", workflow_name], - cwd=w.app_dir) + # Skip this workflow if it is scoped to a different job type. + if w.job_types and not job_type in w.job_types: + logging.info("Skipping workflow %s because job type %s is not one of " + "%s.", w.name, job_type, w.job_types) + continue - # Set the prow environment variables. - prow_env = [] + # If we are scoping this workflow to specific directories, check if any files + # modified match the specified regex patterns. + dir_modified = False + if w.include_dirs: + for f in changed_files: + for d in w.include_dirs: + if fnmatch.fnmatch(f, d): + dir_modified = True + logging.info("Triggering workflow %s because %s in dir %s is modified.", + w.name, f, d) + break + if dir_modified: + break - names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER", - "PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER", - "REPO_NAME"] - names.sort() - for v in names: - if not os.getenv(v): + # Only consider modified files when the job is pre or post submit, and if + # the include_dirs stanza is defined. + if job_type != "periodic" and w.include_dirs and not dir_modified: + logging.info("Skipping workflow %s because no code modified in %s.", + w.name, w.include_dirs) continue - prow_env.append("{0}={1}".format(v, os.getenv(v))) - - util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "prow_env", - ",".join(prow_env)], cwd=w.app_dir) - util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "namespace", - get_namespace(args)], cwd=w.app_dir) - util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "bucket", - args.bucket], cwd=w.app_dir) - if args.release: - util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "versionTag", - os.getenv("VERSION_TAG")], cwd=w.app_dir) - - # Set any extra params. We do this in alphabetical order to make it easier to verify in - # the unittest. - param_names = w.params.keys() - param_names.sort() - for k in param_names: - util.run([ks_cmd, "param", "set", "--env=" + env, w.component, k, - "{0}".format(w.params[k])], cwd=w.app_dir) - - # For debugging print out the manifest - util.run([ks_cmd, "show", env, "-c", w.component], cwd=w.app_dir) - util.run([ks_cmd, "apply", env, "-c", w.component], cwd=w.app_dir) - - ui_url = ("http://testing-argo.kubeflow.org/workflows/kubeflow-test-infra/{0}" + + # Create a new environment for this run + env = workflow_name + + util.run([ks_cmd, "env", "add", env, "--namespace=" + get_namespace(args)], + cwd=w.app_dir) + + util.run([ks_cmd, "param", "set", "--env=" + env, w.component, + "name", workflow_name], + cwd=w.app_dir) + + # Set the prow environment variables. 
+      prow_env = []
+
+      names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER",
+               "PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER",
+               "REPO_NAME"]
+      names.sort()
+      for v in names:
+        if not os.getenv(v):
+          continue
+        prow_env.append("{0}={1}".format(v, os.getenv(v)))
+
+      util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "prow_env",
+                ",".join(prow_env)], cwd=w.app_dir)
+      util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "namespace",
+                get_namespace(args)], cwd=w.app_dir)
+      util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "bucket",
+                args.bucket], cwd=w.app_dir)
+      if args.release:
+        util.run([ks_cmd, "param", "set", "--env=" + env, w.component, "versionTag",
+                  os.getenv("VERSION_TAG")], cwd=w.app_dir)
+
+      # Set any extra params. We do this in alphabetical order to make it easier to verify in
+      # the unittest.
+      param_names = w.params.keys()
+      param_names.sort()
+      for k in param_names:
+        util.run([ks_cmd, "param", "set", "--env=" + env, w.component, k,
+                  "{0}".format(w.params[k])], cwd=w.app_dir)
+
+      # For debugging print out the manifest
+      util.run([ks_cmd, "show", env, "-c", w.component], cwd=w.app_dir)
+      util.run([ks_cmd, "apply", env, "-c", w.component], cwd=w.app_dir)
+
+      ui_url = ("http://testing-argo.kubeflow.org/workflows/kubeflow-test-infra/{0}"
+                "?tab=workflow".format(workflow_name))
+      ui_urls[workflow_name] = ui_url
+      logging.info("URL for workflow: %s", ui_url)
+    else:
+      wf_result = py_func_import(w.py_func, w.args)
+      group, version = wf_result['apiVersion'].split('/')
+      config.load_kube_config()
+      k8s_co = k8s_client.CustomObjectsApi()
+      if "metadata" in wf_result:
+        if "generateName" in wf_result["metadata"]:
+          wf_result["metadata"].pop("generateName")
+        wf_result["metadata"]["name"] = workflow_name
+      py_func_result = k8s_co.create_namespaced_custom_object(
+        group=group,
+        version=version,
+        namespace=get_namespace(args),
+        plural='workflows',
+        body=wf_result)
+      logging.info("py_func_result: %s", py_func_result)
+
+      ui_url = ("http://testing-argo.kubeflow.org/workflows/kubeflow-test-infra/{0}"
               "?tab=workflow".format(workflow_name))
-    ui_urls[workflow_name] = ui_url
-    logging.info("URL for workflow: %s", ui_url)
+      ui_urls[workflow_name] = ui_url
+      logging.info("URL for workflow: %s", ui_url)
 
   # We delay creating started.json until we know the Argo workflow URLs
   create_started_file(args.bucket, ui_urls)
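
The py_func contract introduced above is simply "return an Argo Workflow as a
plain Python dictionary". A minimal sketch of what the TODO in kf_unittests.py
could grow into, building the dictionary locally instead of fetching the
hello-world example; the image, command, and template names below are
placeholders, not values from this change:

def create_workflow(namespace="kubeflow-test-infra"):
  """Returns an Argo Workflow spec as a dictionary."""
  return {
      "apiVersion": "argoproj.io/v1alpha1",
      "kind": "Workflow",
      "metadata": {
          # run_e2e_workflow.py strips generateName and sets metadata.name
          # before submitting, so this is only a standalone default.
          "generateName": "unittests-",
          "namespace": namespace,
      },
      "spec": {
          "entrypoint": "unittests",
          "templates": [{
              "name": "unittests",
              "container": {
                  "image": "python:3",
                  "command": ["python", "-m", "pytest"],
              },
          }],
      },
  }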
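
The kw_args stanza parses straight from YAML into a Python dict, which
py_func_import then splats into the target function as keyword arguments,
i.e. func(**kwargs). A small sketch of that flow, reusing the pyfunctest
entry from prow_config.yaml and printing instead of actually importing:

import yaml

config_text = """
workflows:
  - name: pyfunctest
    py_func: kubeflow.testing.ci.kf_unittests.create_workflow
    kw_args:
      a: 1
      b: 2
"""

entry = yaml.safe_load(config_text)["workflows"][0]
kw_args = entry.get("kw_args", {})  # -> {'a': 1, 'b': 2}
# py_func_import would resolve entry["py_func"] and call func(a=1, b=2).
print(entry["py_func"], kw_args)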
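
Finally, the else branch submits the generated dictionary through the
Kubernetes CustomObjectsApi: the apiVersion string splits into the CRD group
and version, and the body is created under the "workflows" plural. A
condensed, self-contained sketch of that path, assuming a local kubeconfig;
submit_argo_workflow is an illustrative helper, not a function in this patch:

from kubernetes import config
from kubernetes import client as k8s_client

def submit_argo_workflow(wf, name, namespace):
  # "argoproj.io/v1alpha1" -> group "argoproj.io", version "v1alpha1".
  group, version = wf["apiVersion"].split("/")
  metadata = wf.setdefault("metadata", {})
  # Drop generateName so Argo uses the fixed, prow-derived name, which the
  # caller can later poll by name.
  metadata.pop("generateName", None)
  metadata["name"] = name
  config.load_kube_config()
  return k8s_client.CustomObjectsApi().create_namespaced_custom_object(
      group=group, version=version, namespace=namespace,
      plural="workflows", body=wf)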