Separate codegen from containerbuild 2 #1680

Merged · 16 commits · Jul 30, 2019
125 changes: 24 additions & 101 deletions sdk/python/kfp/compiler/_component_builder.py
@@ -12,17 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import tarfile
import uuid
import os
import inspect
import re
import sys
import tempfile
import logging
import shutil
from collections import OrderedDict
from pathlib import Path
from ..components._components import _create_task_factory_from_component_spec
from ._container_builder import ContainerBuilder

class VersionedDependency(object):
""" DependencyVersion specifies the versions """
@@ -94,6 +94,7 @@ def generate_pip_requirements(self, target_file):
the generated file follows the order of which the packages are added """
with open(target_file, 'w') as f:
for name, version in self.python_packages.items():
version = self.python_packages[name]
version_str = ''
if version.has_min_version():
version_str += ' >= ' + version.min_version + ','
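
For context, a hedged usage sketch of the requirements generation this hunk touches; the VersionedDependency and DependencyHelper call signatures are assumed from the surrounding code and the test imports further down, not verified against this exact revision:

# Hypothetical usage sketch; constructor arguments are assumptions, not verified API.
from kfp.compiler._component_builder import DependencyHelper, VersionedDependency

helper = DependencyHelper()
# min/max versions become ' >= x' / ' <= y' fragments on the requirement line.
helper.add_python_package(VersionedDependency(name='pandas', min_version='0.24.0', max_version='0.25.0'))
helper.add_python_package(VersionedDependency(name='kfp', min_version='0.1.24'))

# Writes one requirement per line, in insertion order, roughly:
#   pandas >= 0.24.0, <= 0.25.0
#   kfp >= 0.1.24
helper.generate_pip_requirements('requirements.txt')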
@@ -251,106 +252,33 @@ def _func_to_entrypoint(component_func, python_version='python3'):
complete_component_code = dedecorated_component_src + '\n' + wrapper_code + '\n' + codegen.end()
return complete_component_code

class ImageBuilder(object):
class ComponentBuilder(object):
""" Component Builder. """
def __init__(self, gcs_base, target_image):
def __init__(self, gcs_staging, target_image, namespace):
self._arc_docker_filename = 'dockerfile'
self._arc_python_filename = 'main.py'
self._arc_requirement_filename = 'requirements.txt'
self._tarball_filename = str(uuid.uuid4()) + '.tar.gz'
self._gcs_base = gcs_base
if not self._check_gcs_path(self._gcs_base):
raise Exception('ImageBuild __init__ failure.')
self._gcs_path = os.path.join(self._gcs_base, self._tarball_filename)
self._container_builder = ContainerBuilder(gcs_staging, namespace)
self._target_image = target_image

def _wrap_files_in_tarball(self, tarball_path, files={}):
""" _wrap_files_in_tarball creates a tarball for all the input files
with the filename configured as the key of files """
if not tarball_path.endswith('.tar.gz'):
raise ValueError('the tarball path should end with .tar.gz')
with tarfile.open(tarball_path, 'w:gz') as tarball:
for key, value in files.items():
tarball.add(value, arcname=key)

def _prepare_buildfiles(self, local_tarball_path, docker_filename, python_filename=None, requirement_filename=None):
def _prepare_files(self, local_dir, docker_filename, python_filename=None, requirement_filename=None):
Contributor:
This function is only called once and most of the parameters are not used in that call. Maybe we should inline it and save 15 lines.

Contributor (author):
I'm declaring the function to make it easier to prepare build files in the future.
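
For illustration only, a minimal sketch of the inlined alternative the reviewer suggests, covering just the dockerfile-copy path that the single call site exercises (hypothetical, not part of this PR):

# Hypothetical inlined variant; mirrors names used elsewhere in this diff.
def build_image_from_dockerfile(self, docker_filename, timeout):
    """ build_image_from_dockerfile builds an image based on the dockerfile """
    with tempfile.TemporaryDirectory() as local_build_dir:
        logging.info('Generate build files.')
        # Copy the user's dockerfile into the build context under the canonical name.
        shutil.copyfile(docker_filename, os.path.join(local_build_dir, self._arc_docker_filename))
        self._container_builder.build(local_build_dir, self._arc_docker_filename, self._target_image, timeout)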

""" _prepare_buildfiles generates the tarball with all the build files
Args:
local_tarball_path (str): generated tarball file
local_dir (dir): a directory that stores all the build files
docker_filename (str): docker filename
python_filename (str): python filename
requirement_filename (str): requirement filename
"""
file_lists = {self._arc_docker_filename:docker_filename}
dst_docker_filepath = os.path.join(local_dir, self._arc_docker_filename)
shutil.copyfile(docker_filename, dst_docker_filepath)
if python_filename is not None:
file_lists[self._arc_python_filename] = python_filename
dst_python_filepath = os.path.join(local_dir, self._arc_python_filename)
shutil.copyfile(python_filename, dst_python_filepath)
if requirement_filename is not None:
file_lists[self._arc_requirement_filename] = requirement_filename
self._wrap_files_in_tarball(local_tarball_path, file_lists)

def _check_gcs_path(self, gcs_path):
""" _check_gcs_path check both the path validity and write permissions """
logging.info('Checking path: {}...'.format(gcs_path))
if not gcs_path.startswith('gs://'):
logging.error('Error: {} should be a GCS path.'.format(gcs_path))
return False
return True

def _generate_kaniko_spec(self, namespace, arc_dockerfile_name, gcs_path, target_image):
"""_generate_kaniko_yaml generates kaniko job yaml based on a template yaml """
content = {
'apiVersion': 'v1',
'metadata': {
'generateName': 'kaniko-',
'namespace': namespace,
},
'kind': 'Pod',
'spec': {
'restartPolicy': 'Never',
'containers': [{
'name': 'kaniko',
'args': ['--cache=true',
'--dockerfile=' + arc_dockerfile_name,
'--context=' + gcs_path,
'--destination=' + target_image],
'image': 'gcr.io/kaniko-project/executor@sha256:78d44ec4e9cb5545d7f85c1924695c89503ded86a59f92c7ae658afa3cff5400',
'env': [{
'name': 'GOOGLE_APPLICATION_CREDENTIALS',
'value': '/secret/gcp-credentials/user-gcp-sa.json'
}],
'volumeMounts': [{
'mountPath': '/secret/gcp-credentials',
'name': 'gcp-credentials',
}],
}],
'volumes': [{
'name': 'gcp-credentials',
'secret': {
'secretName': 'user-gcp-sa',
},
}],
'serviceAccountName': 'default'}
}
return content

def _build_image(self, local_tarball_path, namespace, timeout):
from ._gcs_helper import GCSHelper
GCSHelper.upload_gcs_file(local_tarball_path, self._gcs_path)
kaniko_spec = self._generate_kaniko_spec(namespace=namespace,
arc_dockerfile_name=self._arc_docker_filename,
gcs_path=self._gcs_path,
target_image=self._target_image)
# Run kaniko job
logging.info('Start a kaniko job for build.')
from ._k8s_helper import K8sHelper
k8s_helper = K8sHelper()
k8s_helper.run_job(kaniko_spec, timeout)
logging.info('Kaniko job complete.')

# Clean up
GCSHelper.remove_gcs_blob(self._gcs_path)

def build_image_from_func(self, component_func, namespace, base_image, timeout, dependency, python_version='python3'):
dst_requirement_filepath = os.path.join(local_dir, self._arc_requirement_filename)
shutil.copyfile(requirement_filename, dst_requirement_filepath)

def build_image_from_func(self, component_func, base_image, timeout, dependency, python_version='python3'):
""" build_image builds an image for the given python function
args:
python_version (str): choose python2 or python3, default is python3
@@ -374,18 +302,13 @@ def build_image_from_func(self, component_func, namespace, base_image, timeout,

# Prepare build files
logging.info('Generate build files.')
local_tarball_path = os.path.join(local_build_dir, 'docker.tmp.tar.gz')
self._prepare_buildfiles(local_tarball_path, local_docker_filepath, local_python_filepath, local_requirement_filepath)
self._build_image(local_tarball_path, namespace, timeout)
self._container_builder.build(local_build_dir, self._arc_docker_filename, self._target_image, timeout)

def build_image_from_dockerfile(self, docker_filename, timeout, namespace):
def build_image_from_dockerfile(self, docker_filename, timeout):
""" build_image_from_dockerfile builds an image based on the dockerfile """
with tempfile.TemporaryDirectory() as local_build_dir:
# Prepare build files
logging.info('Generate build files.')
local_tarball_path = os.path.join(local_build_dir, 'docker.tmp.tar.gz')
self._prepare_buildfiles(local_tarball_path, docker_filename=docker_filename)
self._build_image(local_tarball_path, namespace, timeout)
self._prepare_files(local_build_dir, docker_filename)
self._container_builder.build(local_build_dir, self._arc_docker_filename, self._target_image, timeout)

def _configure_logger(logger):
""" _configure_logger configures the logger such that the info level logs
@@ -487,8 +410,8 @@ def build_python_component(component_func, target_image, base_image=None, depend
base_image +
' and push the image to ' +
target_image)
builder = ImageBuilder(gcs_base=staging_gcs_path, target_image=target_image)
builder.build_image_from_func(component_func, namespace=namespace,
builder = ComponentBuilder(gcs_staging=staging_gcs_path, target_image=target_image, namespace=namespace)
builder.build_image_from_func(component_func,
base_image=base_image, timeout=timeout,
python_version=python_version, dependency=dependency)
logging.info('Build component complete.')
@@ -506,6 +429,6 @@ def build_docker_image(staging_gcs_path, target_image, dockerfile_path, timeout=
namespace (str): the namespace within which to run the kubernetes kaniko job, default is "kubeflow"
"""
_configure_logger(logging.getLogger())
builder = ImageBuilder(gcs_base=staging_gcs_path, target_image=target_image)
builder.build_image_from_dockerfile(docker_filename=dockerfile_path, timeout=timeout, namespace=namespace)
builder = ComponentBuilder(gcs_staging=staging_gcs_path, target_image=target_image, namespace=namespace)
builder.build_image_from_dockerfile(docker_filename=dockerfile_path, timeout=timeout)
logging.info('Build image complete.')
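
Taken together, the public entry points keep their signatures; only the kaniko namespace now flows into the builder constructor instead of the build call. A hedged usage sketch based on the signatures visible in this diff, with placeholder bucket and image names, and assuming both helpers remain importable from kfp.compiler as in earlier releases:

# Placeholder project, bucket, and image names; signatures taken from the diff above.
import kfp.compiler as compiler

# Build an image directly from a Dockerfile.
compiler.build_docker_image(
    staging_gcs_path='gs://my-bucket/build-staging',
    target_image='gcr.io/my-project/my-image:latest',
    dockerfile_path='./Dockerfile',
    timeout=600,
    namespace='kubeflow')

# Wrap a Python function as a component and build its image.
def add(a: float, b: float) -> float:
    return a + b

add_op = compiler.build_python_component(
    component_func=add,
    target_image='gcr.io/my-project/add-component:latest',
    base_image='python:3.6',
    staging_gcs_path='gs://my-bucket/build-staging',
    timeout=600,
    namespace='kubeflow')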
113 changes: 113 additions & 0 deletions sdk/python/kfp/compiler/_container_builder.py
@@ -0,0 +1,113 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import tarfile
import tempfile
import os
import uuid

class ContainerBuilder(object):
"""
ContainerBuilder helps build a container image
"""
def __init__(self, gcs_staging, namespace):
"""
Args:
gcs_staging (str): GCS blob that can store temporary build files
"""
if not gcs_staging.startswith('gs://'):
raise ValueError('Error: {} should be a GCS path.'.format(gcs_staging))
self._gcs_staging = gcs_staging
self._namespace = namespace

def _generate_kaniko_spec(self, context, docker_filename, target_image):
"""_generate_kaniko_yaml generates kaniko job yaml based on a template yaml """
content = {
'apiVersion': 'v1',
'metadata': {
'generateName': 'kaniko-',
'namespace': self._namespace,
},
'kind': 'Pod',
'spec': {
'restartPolicy': 'Never',
'containers': [{
'name': 'kaniko',
'args': ['--cache=true',
'--dockerfile=' + docker_filename,
'--context=' + context,
'--destination=' + target_image],
'image': 'gcr.io/kaniko-project/executor@sha256:78d44ec4e9cb5545d7f85c1924695c89503ded86a59f92c7ae658afa3cff5400',
'env': [{
'name': 'GOOGLE_APPLICATION_CREDENTIALS',
'value': '/secret/gcp-credentials/user-gcp-sa.json'
}],
'volumeMounts': [{
'mountPath': '/secret/gcp-credentials',
'name': 'gcp-credentials',
}],
}],
'volumes': [{
'name': 'gcp-credentials',
'secret': {
'secretName': 'user-gcp-sa',
},
}],
'serviceAccountName': 'default'}
}
return content

def _wrap_dir_in_tarball(self, tarball_path, dir_name):
""" _wrap_files_in_tarball creates a tarball for all the files in the directory"""
old_wd = os.getcwd()
os.chdir(dir_name)
if not tarball_path.endswith('.tar.gz'):
raise ValueError('the tarball path should end with .tar.gz')
with tarfile.open(tarball_path, 'w:gz') as tarball:
for f in os.listdir(dir_name):
tarball.add(f)
os.chdir(old_wd)


def build(self, local_dir, docker_filename, target_image, timeout):
"""
Args:
local_dir (str): local directory that stores all the necessary build files
docker_filename (str): the dockerfile name that is in the local_dir
target_image (str): the target image tag to push the final image.
timeout (int): time out in seconds
"""
# Prepare build context
with tempfile.TemporaryDirectory() as local_build_dir:
from ._gcs_helper import GCSHelper
logging.info('Generate build files.')
local_tarball_path = os.path.join(local_build_dir, 'docker.tmp.tar.gz')
self._wrap_dir_in_tarball(local_tarball_path, local_dir)
# Upload to the context
context = os.path.join(self._gcs_staging, str(uuid.uuid4()) + '.tar.gz')
GCSHelper.upload_gcs_file(local_tarball_path, context)

# Run kaniko job
kaniko_spec = self._generate_kaniko_spec(context=context,
docker_filename=docker_filename,
target_image=target_image)
logging.info('Start a kaniko job for build.')
from ._k8s_helper import K8sHelper
k8s_helper = K8sHelper()
k8s_helper.run_job(kaniko_spec, timeout)
logging.info('Kaniko job complete.')

# Clean up
GCSHelper.remove_gcs_blob(context)
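
The new builder can also be exercised on its own. A minimal usage sketch, assuming a local directory that already contains a file named 'dockerfile' and using placeholder GCS and GCR names:

# Placeholder bucket, namespace, and image names; build() signature taken from the class above.
from kfp.compiler._container_builder import ContainerBuilder

builder = ContainerBuilder(gcs_staging='gs://my-bucket/build-staging', namespace='kubeflow')
builder.build(local_dir='./build-context',    # must contain the dockerfile named below
              docker_filename='dockerfile',   # name of the dockerfile inside local_dir
              target_image='gcr.io/my-project/my-image:latest',
              timeout=600)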
48 changes: 0 additions & 48 deletions sdk/python/tests/compiler/component_builder_test.py
@@ -14,7 +14,6 @@

from kfp.compiler._component_builder import _generate_dockerfile, _dependency_to_requirements, _func_to_entrypoint
from kfp.compiler._component_builder import CodeGenerator
from kfp.compiler._component_builder import ImageBuilder
from kfp.compiler._component_builder import VersionedDependency
from kfp.compiler._component_builder import DependencyHelper

@@ -26,8 +25,6 @@
import inspect
from collections import OrderedDict

GCS_BASE = 'gs://kfp-testing/'

class TestVersionedDependency(unittest.TestCase):

def test_version(self):
@@ -332,48 +329,3 @@ def test_codegen(self):
codegen.writeline('print("hello")')
generated_codes = codegen.end()
self.assertEqual(generated_codes, inspect.getsource(hello))

class TestImageBuild(unittest.TestCase):

def test_wrap_files_in_tarball(self):
""" Test wrap files in a tarball """

# prepare
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
temp_file_one = os.path.join(test_data_dir, 'test_data_one.tmp')
temp_file_two = os.path.join(test_data_dir, 'test_data_two.tmp')
temp_tarball = os.path.join(test_data_dir, 'test_data.tmp.tar.gz')
with open(temp_file_one, 'w') as f:
f.write('temporary file one content')
with open(temp_file_two, 'w') as f:
f.write('temporary file two content')

# check
builder = ImageBuilder(gcs_base=GCS_BASE, target_image='')
builder._wrap_files_in_tarball(temp_tarball, {'dockerfile':temp_file_one, 'main.py':temp_file_two})
self.assertTrue(os.path.exists(temp_tarball))
with tarfile.open(temp_tarball) as temp_tarball_handle:
temp_files = temp_tarball_handle.getmembers()
self.assertTrue(len(temp_files) == 2)
for temp_file in temp_files:
self.assertTrue(temp_file.name in ['dockerfile', 'main.py'])

# clean up
os.remove(temp_file_one)
os.remove(temp_file_two)
os.remove(temp_tarball)

def test_generate_kaniko_yaml(self):
""" Test generating the kaniko job yaml """

# prepare
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')

# check
builder = ImageBuilder(gcs_base=GCS_BASE, target_image='')
generated_yaml = builder._generate_kaniko_spec(namespace='default', arc_dockerfile_name='dockerfile',
gcs_path='gs://mlpipeline/kaniko_build.tar.gz', target_image='gcr.io/mlpipeline/kaniko_image:latest')
with open(os.path.join(test_data_dir, 'kaniko.basic.yaml'), 'r') as f:
golden = yaml.safe_load(f)

self.assertEqual(golden, generated_yaml)
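
The removed tarball test has no direct counterpart in this file; a hedged sketch of what an equivalent test for the new ContainerBuilder._wrap_dir_in_tarball might look like (hypothetical, not part of this PR):

# Hypothetical test, not part of this PR; GCS path and namespace are placeholders.
import os
import tarfile
import tempfile
import unittest

from kfp.compiler._container_builder import ContainerBuilder

class TestContainerBuilderTarball(unittest.TestCase):

    def test_wrap_dir_in_tarball(self):
        builder = ContainerBuilder(gcs_staging='gs://kfp-testing/', namespace='default')
        with tempfile.TemporaryDirectory() as build_dir, tempfile.TemporaryDirectory() as out_dir:
            # Prepare two files in the directory to be archived.
            for name in ('dockerfile', 'main.py'):
                with open(os.path.join(build_dir, name), 'w') as f:
                    f.write('temporary content')
            tarball_path = os.path.join(out_dir, 'context.tar.gz')
            builder._wrap_dir_in_tarball(tarball_path, build_dir)
            self.assertTrue(os.path.exists(tarball_path))
            with tarfile.open(tarball_path) as tarball:
                self.assertCountEqual([m.name for m in tarball.getmembers()],
                                      ['dockerfile', 'main.py'])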