Skip to content

Commit

Permalink
Add volume, volumemount and env to container op (#300)
Browse files Browse the repository at this point in the history
* [WIP] change deployment platform to gcp

* debug

* revert test

* add volume

* update test

* to list

* fix

* to list

* to list

* to list

* to list

* stage

* update

* update

* Undid style changes

* address comments

* update comments
  • Loading branch information
IronPan authored and k8s-ci-robot committed Nov 17, 2018
1 parent ce67707 commit a47eb10
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 50 deletions.
77 changes: 56 additions & 21 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,10 @@ def _op_to_template(self, op):
if op.cpu_request:
template['container']['resources']['requests']['cpu'] = op.cpu_request

if op.gcp_secret:
template['container']['env'] = [
{
'name': 'GOOGLE_APPLICATION_CREDENTIALS',
'value': ('/secret/gcp-credentials/%s.json' % op.gcp_secret),
},
]
template['container']['volumeMounts'] = [
{
'name': op.name + '-gcp-credentials',
'mountPath': '/secret/gcp-credentials',
},
]
if op.env_variables:
template['container']['env'] = list(map(self._convert_k8s_obj_to_dic, op.env_variables))
if op.volume_mounts:
template['container']['volumeMounts'] = list(map(self._convert_k8s_obj_to_dic, op.volume_mounts))
return template

def _get_groups_for_ops(self, root_group):
Expand Down Expand Up @@ -441,14 +432,9 @@ def _create_volumes(self, pipeline):
"""Create volumes required for the templates"""
volumes = []
for op in pipeline.ops.values():
if op.gcp_secret:
volume = {
'name': op.name + '-gcp-credentials',
'secret': {
'secretName': op.gcp_secret,
}
}
volumes.append(volume)
if op.volumes:
for v in op.volumes:
volumes.append(self._convert_k8s_obj_to_dic(v))
volumes.sort(key=lambda x: x['name'])
return volumes

Expand Down Expand Up @@ -548,6 +534,55 @@ def _compile(self, pipeline_func):
workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
return workflow

def _convert_k8s_obj_to_dic(self, obj):
"""
Builds a JSON K8s object.
If obj is None, return None.
If obj is str, int, long, float, bool, return directly.
If obj is datetime.datetime, datetime.date
convert to string in iso8601 format.
If obj is list, sanitize each element in the list.
If obj is dict, return the dict.
If obj is swagger model, return the properties dict.
Args:
obj: The data to serialize.
Returns: The serialized form of data.
"""

from six import text_type, integer_types
PRIMITIVE_TYPES = (float, bool, bytes, text_type) + integer_types
from datetime import date, datetime
if obj is None:
return None
elif isinstance(obj, PRIMITIVE_TYPES):
return obj
elif isinstance(obj, list):
return [self._convert_k8s_obj_to_dic(sub_obj)
for sub_obj in obj]
elif isinstance(obj, tuple):
return tuple(self._convert_k8s_obj_to_dic(sub_obj)
for sub_obj in obj)
elif isinstance(obj, (datetime, date)):
return obj.isoformat()

if isinstance(obj, dict):
obj_dict = obj
else:
# Convert model obj to dict except
# attributes `swagger_types`, `attribute_map`
# and attributes which value is not None.
# Convert attribute name to json key in
# model definition for request.
from six import iteritems
obj_dict = {obj.attribute_map[attr]: getattr(obj, attr)
for attr, _ in iteritems(obj.swagger_types)
if getattr(obj, attr) is not None}

return {key: self._convert_k8s_obj_to_dic(val)
for key, val in iteritems(obj_dict)}

def compile(self, pipeline_func, package_path):
"""Compile the given pipeline function into workflow yaml.
Expand Down
38 changes: 36 additions & 2 deletions sdk/python/kfp/dsl/_container_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
file_outputs: Maps output labels to local file paths. At pipeline run time,
the value of a PipelineParam is saved to its corresponding local file. It's
one way for outside world to receive outputs of the container.
gcp_secret: Specifying what secret to mount to the container for accessing
GCP APIs.
is_exit_handler: Whether it is used as an exit handler.
"""

Expand All @@ -60,6 +58,9 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
self.memory_request = None
self.cpu_limit = None
self.cpu_request = None
self.volumes = None
self.volume_mounts = None
self.env_variables = None

matches = []
if arguments:
Expand Down Expand Up @@ -154,5 +155,38 @@ def set_cpu_limit(self, cpu):
self._validate_cpu_string(cpu)
self.cpu_limit = cpu

def set_volumes(self, volumes):
    """Declare the Kubernetes volumes this container op depends on.

    Args:
      volumes: a list of Kubernetes volume objects (V1Volume).
        For the detailed spec, see the volume model definition:
        https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume.py
    """
    # Stored as-is; the compiler serializes these into the workflow spec.
    self.volumes = volumes

def set_volume_mounts(self, volume_mounts):
    """Declare how volumes are mounted into this container.

    Args:
      volume_mounts: a list of Kubernetes volume mount objects (V1VolumeMount).
        For the detailed spec, see the volume mount model definition:
        https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_volume_mount.py
    """
    # Stored as-is; the compiler serializes these into the container template.
    self.volume_mounts = volume_mounts

def set_env_variables(self, env_variables):
    """Declare the environment variables exposed inside the container.

    Args:
      env_variables: a list of Kubernetes environment variable objects (V1EnvVar).
        For the detailed spec, see the environment variable model definition:
        https://github.com/kubernetes-client/python/blob/master/kubernetes/client/models/v1_env_var.py
    """
    # Stored as-is; the compiler serializes these into the container template.
    self.env_variables = env_variables

def __repr__(self):
return str({self.__class__.__name__: self.__dict__})
20 changes: 13 additions & 7 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,25 @@
import unittest
import yaml


class TestCompiler(unittest.TestCase):

def test_operator_to_template(self):
"""Test converting operator to template"""

from kubernetes import client as k8s_client

with dsl.Pipeline('somename') as p:
msg1 = dsl.PipelineParam('msg1')
msg2 = dsl.PipelineParam('msg2', value='value2')
op = dsl.ContainerOp(name='echo', image='image', command=['sh', '-c'],
arguments=['echo %s %s | tee /tmp/message.txt' % (msg1, msg2)],
file_outputs={'merged': '/tmp/message.txt'}, gcp_secret='user-gcp-sa')
file_outputs={'merged': '/tmp/message.txt'})
op.set_volume_mounts([k8s_client.V1VolumeMount(
mount_path='/secret/gcp-credentials',
name='gcp-credentials')])
op.set_env_variables([k8s_client.V1EnvVar(
name='GOOGLE_APPLICATION_CREDENTIALS',
value='/secret/gcp-credentials/user-gcp-sa.json')])
golden_output = {
'container': {
'image': 'image',
Expand All @@ -52,7 +59,7 @@ def test_operator_to_template(self):
'volumeMounts':[
{
'mountPath': '/secret/gcp-credentials',
'name': 'echo-gcp-credentials'
'name': 'gcp-credentials',
}
]
},
Expand Down Expand Up @@ -238,7 +245,6 @@ def test_py_compile_default_value(self):
"""Test a pipeline with a parameter with default value."""
self._test_py_compile('default_value')

def test_py_secret(self):
"""Test a pipeline with a GCP secret."""
self._test_py_compile('secret')

def test_py_volume(self):
"""Test a pipeline with a volume and volume mount."""
self._test_py_compile('volume')
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,30 @@


import kfp.dsl as dsl
from kubernetes import client as k8s_client


@dsl.pipeline(
name='GCP Secret',
description='A pipeline with gcp secret.'
name='Volume',
description='A pipeline with volume.'
)
def gcp_secret_pipeline():
def volume_pipeline():
op1 = dsl.ContainerOp(
name='download',
image='google/cloud-sdk',
command=['sh', '-c'],
arguments=['ls | tee /tmp/results.txt'],
gcp_secret='user-gcp-sa',
file_outputs={'downloaded': '/tmp/results.txt'})
op1.set_volumes([k8s_client.V1Volume(name='gcp-credentials',
secret=k8s_client.V1SecretVolumeSource(
secret_name='user-gcp-sa'))])
op1.set_volume_mounts([k8s_client.V1VolumeMount(
mount_path='/secret/gcp-credentials', name='gcp-credentials')])
op1.set_env_variables([k8s_client.V1EnvVar(
name='GOOGLE_APPLICATION_CREDENTIALS',
value='/secret/gcp-credentials/user-gcp-sa.json')])
op2 = dsl.ContainerOp(
name='echo',
image='library/bash',
command=['sh', '-c'],
arguments=['echo %s' % op1.output],
gcp_secret='admin-gcp-sa')
arguments=['echo %s' % op1.output])
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: gcp-secret-
generateName: volume-
spec:
arguments:
parameters: []
entrypoint: gcp-secret
entrypoint: volume
serviceAccountName: pipeline-runner
templates:
- container:
Expand All @@ -33,7 +33,7 @@ spec:
image: google/cloud-sdk
volumeMounts:
- mountPath: /secret/gcp-credentials
name: download-gcp-credentials
name: gcp-credentials
name: download
outputs:
artifacts:
Expand Down Expand Up @@ -73,13 +73,7 @@ spec:
command:
- sh
- -c
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /secret/gcp-credentials/admin-gcp-sa.json
image: library/bash
volumeMounts:
- mountPath: /secret/gcp-credentials
name: echo-gcp-credentials
inputs:
parameters:
- name: download-downloaded
Expand Down Expand Up @@ -124,11 +118,8 @@ spec:
- download
name: echo
template: echo
name: gcp-secret
name: volume
volumes:
- name: download-gcp-credentials
- name: gcp-credentials
secret:
secretName: user-gcp-sa
- name: echo-gcp-credentials
secret:
secretName: admin-gcp-sa

0 comments on commit a47eb10

Please sign in to comment.