Skip to content

Commit

Permalink
Allow users to specify file extensions in KubeflowDagRunner. This req…
Browse files Browse the repository at this point in the history
…uires KFP 0.1.31.2

PiperOrigin-RevId: 274031839
  • Loading branch information
tfx-copybara authored and tensorflow-extended-team committed Oct 10, 2019
1 parent fd57edb commit d02e2c9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 22 deletions.
2 changes: 2 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
* Improved naming of standard component inputs and outputs.
* Improved visualization functionality in the experimental TFX notebook
interface.
* Allowed users to specify output file format when compiling TFX pipelines
using KubeflowDagRunner.

## Bug fixes and other changes
* Bumped test dependency to kfp (Kubeflow Pipelines SDK) to
Expand Down
35 changes: 13 additions & 22 deletions tfx/orchestration/kubeflow/kubeflow_dag_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@
from __future__ import division
from __future__ import print_function

import contextlib
import io
import os
import tarfile

from kfp import compiler
from kfp import dsl
from kfp import gcp
from typing import Callable, List, Optional, Text
import yaml

from tfx import version
from tfx.orchestration import pipeline as tfx_pipeline
Expand Down Expand Up @@ -162,17 +158,24 @@ class KubeflowDagRunner(tfx_runner.TfxRunner):

def __init__(self,
output_dir: Optional[Text] = None,
output_filename: Optional[Text] = None,
config: Optional[KubeflowDagRunnerConfig] = None):
"""Initializes KubeflowDagRunner for compiling a Kubeflow Pipeline.
Args:
output_dir: An optional output directory into which to output the pipeline
definition files. Defaults to the current working directory.
output_filename: An optional output file name for the pipeline definition
file. Defaults to pipeline_name.tar.gz when compiling a TFX pipeline.
Currently supports .tar.gz, .tgz, .zip, .yaml, .yml formats. See
https://github.com/kubeflow/pipelines/blob/181de66cf9fa87bcd0fe9291926790c400140783/sdk/python/kfp/compiler/compiler.py#L851
for format restriction.
config: An optional KubeflowDagRunnerConfig object to specify runtime
configuration when running the pipeline under Kubeflow.
"""
super(KubeflowDagRunner, self).__init__()
self._output_dir = output_dir or os.getcwd()
self._output_filename = output_filename
self._config = config or KubeflowDagRunnerConfig()
self._compiler = compiler.Compiler()
self._params = [] # List of dsl.PipelineParam used in this pipeline.
Expand Down Expand Up @@ -235,24 +238,12 @@ def _construct_pipeline():
"""
self._construct_pipeline_graph(pipeline, pipeline_root)

workflow = self._compiler.create_workflow(
file_name = self._output_filename or pipeline.pipeline_args[
'pipeline_name'] + '.tar.gz'
# Create workflow spec and write out to package.
self._compiler._create_and_write_workflow( # pylint: disable=protected-access
pipeline_func=_construct_pipeline,
pipeline_name=pipeline.pipeline_args['pipeline_name'],
pipeline_description=pipeline.pipeline_args.get('description', ''),
params_list=self._params)

# default_flow_style is set to false to ensure the generated yaml spec in
# compliance with Argo. Otherwise it might output nested collection using
# {}.
yaml_text = yaml.dump(workflow, default_flow_style=False)

pipeline_name = pipeline.pipeline_args['pipeline_name']
# TODO(b/134680219): Allow users to specify the extension. Specifying
# .yaml will compile the pipeline directly into a YAML file. Kubeflow
# backend recognizes .tar.gz, .zip, and .yaml today.
pipeline_file = os.path.join(self._output_dir, pipeline_name + '.tar.gz')
with tarfile.open(pipeline_file, 'w:gz') as tar:
with contextlib.closing(io.BytesIO(yaml_text.encode())) as yaml_file:
tarinfo = tarfile.TarInfo('pipeline.yaml')
tarinfo.size = len(yaml_file.getvalue())
tar.addfile(tarinfo, fileobj=yaml_file)
params_list=self._params,
package_path=os.path.join(self._output_dir, file_name))

0 comments on commit d02e2c9

Please sign in to comment.