diff --git a/sdk/python/kfp/compiler/_component_builder.py b/sdk/python/kfp/compiler/_component_builder.py
index 7bd9866d89d..50a7c67b262 100644
--- a/sdk/python/kfp/compiler/_component_builder.py
+++ b/sdk/python/kfp/compiler/_component_builder.py
@@ -12,17 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import tarfile
-import uuid
 import os
 import inspect
 import re
 import sys
 import tempfile
 import logging
+import shutil
 from collections import OrderedDict
 from pathlib import Path
 from ..components._components import _create_task_factory_from_component_spec
+from ._container_builder import ContainerBuilder
 
 class VersionedDependency(object):
   """ DependencyVersion specifies the versions """
@@ -251,106 +251,33 @@ def _func_to_entrypoint(component_func, python_version='python3'):
   complete_component_code = dedecorated_component_src + '\n' + wrapper_code + '\n' + codegen.end()
   return complete_component_code
 
-class ImageBuilder(object):
+class ComponentBuilder(object):
   """ Component Builder. """
-  def __init__(self, gcs_base, target_image):
+  def __init__(self, gcs_staging, target_image, namespace):
     self._arc_docker_filename = 'dockerfile'
     self._arc_python_filename = 'main.py'
     self._arc_requirement_filename = 'requirements.txt'
-    self._tarball_filename = str(uuid.uuid4()) + '.tar.gz'
-    self._gcs_base = gcs_base
-    if not self._check_gcs_path(self._gcs_base):
-      raise Exception('ImageBuild __init__ failure.')
-    self._gcs_path = os.path.join(self._gcs_base, self._tarball_filename)
+    self._container_builder = ContainerBuilder(gcs_staging, namespace)
     self._target_image = target_image
 
-  def _wrap_files_in_tarball(self, tarball_path, files={}):
-    """ _wrap_files_in_tarball creates a tarball for all the input files
-    with the filename configured as the key of files """
-    if not tarball_path.endswith('.tar.gz'):
-      raise ValueError('the tarball path should end with .tar.gz')
-    with tarfile.open(tarball_path, 'w:gz') as tarball:
-      for key, value in files.items():
-        tarball.add(value, arcname=key)
-
-  def _prepare_buildfiles(self, local_tarball_path, docker_filename, python_filename=None, requirement_filename=None):
-    """ _prepare_buildfiles generates the tarball with all the build files
+  def _prepare_files(self, local_dir, docker_filename, python_filename=None, requirement_filename=None):
+    """ _prepare_files copies the build files into a local directory
     Args:
-      local_tarball_path (str): generated tarball file
+      local_dir (str): a directory that stores all the build files
       docker_filename (str): docker filename
       python_filename (str): python filename
       requirement_filename (str): requirement filename
     """
-    file_lists = {self._arc_docker_filename:docker_filename}
+    dst_docker_filepath = os.path.join(local_dir, self._arc_docker_filename)
+    shutil.copyfile(docker_filename, dst_docker_filepath)
     if python_filename is not None:
-      file_lists[self._arc_python_filename] = python_filename
+      dst_python_filepath = os.path.join(local_dir, self._arc_python_filename)
+      shutil.copyfile(python_filename, dst_python_filepath)
     if requirement_filename is not None:
-      file_lists[self._arc_requirement_filename] = requirement_filename
-
-    self._wrap_files_in_tarball(local_tarball_path, file_lists)
-
-  def _check_gcs_path(self, gcs_path):
-    """ _check_gcs_path check both the path validity and write permissions """
-    logging.info('Checking path: {}...'.format(gcs_path))
-    if not gcs_path.startswith('gs://'):
-      logging.error('Error: {} should be a GCS path.'.format(gcs_path))
-      return False
-    return True
-
-  def _generate_kaniko_spec(self, namespace, arc_dockerfile_name, gcs_path, target_image):
-    """_generate_kaniko_yaml generates kaniko job yaml based on a template yaml """
-    content = {
-      'apiVersion': 'v1',
-      'metadata': {
-        'generateName': 'kaniko-',
-        'namespace': namespace,
-      },
-      'kind': 'Pod',
-      'spec': {
-        'restartPolicy': 'Never',
-        'containers': [{
-          'name': 'kaniko',
-          'args': ['--cache=true',
-                   '--dockerfile=' + arc_dockerfile_name,
-                   '--context=' + gcs_path,
-                   '--destination=' + target_image],
-          'image': 'gcr.io/kaniko-project/executor@sha256:78d44ec4e9cb5545d7f85c1924695c89503ded86a59f92c7ae658afa3cff5400',
-          'env': [{
-            'name': 'GOOGLE_APPLICATION_CREDENTIALS',
-            'value': '/secret/gcp-credentials/user-gcp-sa.json'
-          }],
-          'volumeMounts': [{
-            'mountPath': '/secret/gcp-credentials',
-            'name': 'gcp-credentials',
-          }],
-        }],
-        'volumes': [{
-          'name': 'gcp-credentials',
-          'secret': {
-            'secretName': 'user-gcp-sa',
-          },
-        }],
-        'serviceAccountName': 'default'}
-    }
-    return content
-
-  def _build_image(self, local_tarball_path, namespace, timeout):
-    from ._gcs_helper import GCSHelper
-    GCSHelper.upload_gcs_file(local_tarball_path, self._gcs_path)
-    kaniko_spec = self._generate_kaniko_spec(namespace=namespace,
-                                             arc_dockerfile_name=self._arc_docker_filename,
-                                             gcs_path=self._gcs_path,
-                                             target_image=self._target_image)
-    # Run kaniko job
-    logging.info('Start a kaniko job for build.')
-    from ._k8s_helper import K8sHelper
-    k8s_helper = K8sHelper()
-    k8s_helper.run_job(kaniko_spec, timeout)
-    logging.info('Kaniko job complete.')
-
-    # Clean up
-    GCSHelper.remove_gcs_blob(self._gcs_path)
-
-  def build_image_from_func(self, component_func, namespace, base_image, timeout, dependency, python_version='python3'):
+      dst_requirement_filepath = os.path.join(local_dir, self._arc_requirement_filename)
+      shutil.copyfile(requirement_filename, dst_requirement_filepath)
+
+  def build_image_from_func(self, component_func, base_image, timeout, dependency, python_version='python3'):
     """ build_image builds an image for the given python function
     args:
       python_version (str): choose python2 or python3, default is python3
@@ -374,18 +301,13 @@ def build_image_from_func(self, component_func, namespace, base_image, timeout,
 
       # Prepare build files
       logging.info('Generate build files.')
-      local_tarball_path = os.path.join(local_build_dir, 'docker.tmp.tar.gz')
-      self._prepare_buildfiles(local_tarball_path, local_docker_filepath, local_python_filepath, local_requirement_filepath)
-      self._build_image(local_tarball_path, namespace, timeout)
+      self._container_builder.build(local_build_dir, self._arc_docker_filename, self._target_image, timeout)
 
-  def build_image_from_dockerfile(self, docker_filename, timeout, namespace):
+  def build_image_from_dockerfile(self, docker_filename, timeout):
     """ build_image_from_dockerfile builds an image based on the dockerfile """
     with tempfile.TemporaryDirectory() as local_build_dir:
-      # Prepare build files
-      logging.info('Generate build files.')
-      local_tarball_path = os.path.join(local_build_dir, 'docker.tmp.tar.gz')
-      self._prepare_buildfiles(local_tarball_path, docker_filename=docker_filename)
-      self._build_image(local_tarball_path, namespace, timeout)
+      self._prepare_files(local_build_dir, docker_filename)
+      self._container_builder.build(local_build_dir, self._arc_docker_filename, self._target_image, timeout)
 
 def _configure_logger(logger):
   """ _configure_logger configures the logger such that the info level logs
@@ -487,8 +409,8 @@ def build_python_component(component_func, target_image, base_image=None, depend
                base_image + ' and push the image to ' +
                target_image)
 
-  builder = ImageBuilder(gcs_base=staging_gcs_path, target_image=target_image)
-  builder.build_image_from_func(component_func, namespace=namespace,
+  builder = ComponentBuilder(gcs_staging=staging_gcs_path, target_image=target_image, namespace=namespace)
+  builder.build_image_from_func(component_func,
                                 base_image=base_image, timeout=timeout, python_version=python_version,
                                 dependency=dependency)
   logging.info('Build component complete.')
@@ -506,6 +428,6 @@ def build_docker_image(staging_gcs_path, target_image, dockerfile_path, timeout=
     namespace (str): the namespace within which to run the kubernetes kaniko job, default is "kubeflow"
   """
   _configure_logger(logging.getLogger())
-  builder = ImageBuilder(gcs_base=staging_gcs_path, target_image=target_image)
-  builder.build_image_from_dockerfile(docker_filename=dockerfile_path, timeout=timeout, namespace=namespace)
+  builder = ComponentBuilder(gcs_staging=staging_gcs_path, target_image=target_image, namespace=namespace)
+  builder.build_image_from_dockerfile(docker_filename=dockerfile_path, timeout=timeout)
   logging.info('Build image complete.')
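
Note: a minimal usage sketch of the refactored entry point (assuming kfp.compiler
still re-exports build_docker_image as before; the bucket and image tag below are
hypothetical placeholders, not values from this change):

    import kfp.compiler as compiler

    # Stages the build context under the GCS path, then runs an in-cluster
    # kaniko job in the given namespace to build and push the image.
    compiler.build_docker_image(
        staging_gcs_path='gs://my-bucket/staging',      # hypothetical bucket
        target_image='gcr.io/my-project/my-image:dev',  # hypothetical tag
        dockerfile_path='Dockerfile',
        timeout=600,
        namespace='kubeflow')
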
diff --git a/sdk/python/kfp/compiler/_container_builder.py b/sdk/python/kfp/compiler/_container_builder.py
new file mode 100644
index 00000000000..b1233e9945c
--- /dev/null
+++ b/sdk/python/kfp/compiler/_container_builder.py
@@ -0,0 +1,110 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import tarfile
+import tempfile
+import os
+import uuid
+
+class ContainerBuilder(object):
+  """
+  ContainerBuilder helps build a container image
+  """
+  def __init__(self, gcs_staging, namespace):
+    """
+    Args:
+      gcs_staging (str): GCS blob that can store temporary build files
+      namespace (str): kubernetes namespace where the kaniko build job runs
+    """
+    if not gcs_staging.startswith('gs://'):
+      raise ValueError('Error: {} should be a GCS path.'.format(gcs_staging))
+    self._gcs_staging = gcs_staging
+    self._namespace = namespace
+
+  def _generate_kaniko_spec(self, context, docker_filename, target_image):
+    """ _generate_kaniko_spec generates the kaniko job spec """
+    content = {
+      'apiVersion': 'v1',
+      'metadata': {
+        'generateName': 'kaniko-',
+        'namespace': self._namespace,
+      },
+      'kind': 'Pod',
+      'spec': {
+        'restartPolicy': 'Never',
+        'containers': [{
+          'name': 'kaniko',
+          'args': ['--cache=true',
+                   '--dockerfile=' + docker_filename,
+                   '--context=' + context,
+                   '--destination=' + target_image],
+          'image': 'gcr.io/kaniko-project/executor@sha256:78d44ec4e9cb5545d7f85c1924695c89503ded86a59f92c7ae658afa3cff5400',
+          'env': [{
+            'name': 'GOOGLE_APPLICATION_CREDENTIALS',
+            'value': '/secret/gcp-credentials/user-gcp-sa.json'
+          }],
+          'volumeMounts': [{
+            'mountPath': '/secret/gcp-credentials',
+            'name': 'gcp-credentials',
+          }],
+        }],
+        'volumes': [{
+          'name': 'gcp-credentials',
+          'secret': {
+            'secretName': 'user-gcp-sa',
+          },
+        }],
+        'serviceAccountName': 'default'}
+    }
+    return content
+
+  def _wrap_dir_in_tarball(self, tarball_path, dir_name):
+    """ _wrap_dir_in_tarball creates a tarball for all the files in the directory """
+    if not tarball_path.endswith('.tar.gz'):
+      raise ValueError('the tarball path should end with .tar.gz')
+    with tarfile.open(tarball_path, 'w:gz') as tarball:
+      for f in os.listdir(dir_name):
+        tarball.add(os.path.join(dir_name, f), arcname=f)
+
+  def build(self, local_dir, docker_filename, target_image, timeout):
+    """
+    Args:
+      local_dir (str): local directory that stores all the necessary build files
+      docker_filename (str): the dockerfile name that is in the local_dir
+      target_image (str): the tag to push the built image to
+      timeout (int): timeout in seconds
+    """
+    # Prepare build context
+    with tempfile.TemporaryDirectory() as local_build_dir:
+      from ._gcs_helper import GCSHelper
+      logging.info('Generate build files.')
+      local_tarball_path = os.path.join(local_build_dir, 'docker.tmp.tar.gz')
+      self._wrap_dir_in_tarball(local_tarball_path, local_dir)
+      # Upload the tarball as the build context
+      context = os.path.join(self._gcs_staging, str(uuid.uuid4()) + '.tar.gz')
+      GCSHelper.upload_gcs_file(local_tarball_path, context)
+
+      # Run kaniko job
+      kaniko_spec = self._generate_kaniko_spec(context=context,
+                                               docker_filename=docker_filename,
+                                               target_image=target_image)
+      logging.info('Start a kaniko job for build.')
+      from ._k8s_helper import K8sHelper
+      k8s_helper = K8sHelper()
+      k8s_helper.run_job(kaniko_spec, timeout)
+      logging.info('Kaniko job complete.')
+
+      # Clean up
+      GCSHelper.remove_gcs_blob(context)
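
Note: the new ContainerBuilder can also be driven directly. A sketch, assuming
./build_ctx is a local directory that contains a file literally named
'dockerfile' (all paths and tags below are hypothetical placeholders):

    from kfp.compiler._container_builder import ContainerBuilder

    builder = ContainerBuilder(gcs_staging='gs://my-bucket/staging', namespace='kubeflow')
    # Tars build_ctx, uploads the tarball as the kaniko context, runs the
    # in-cluster build job, and pushes the result to target_image.
    builder.build(local_dir='build_ctx', docker_filename='dockerfile',
                  target_image='gcr.io/my-project/my-image:dev', timeout=600)
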
diff --git a/sdk/python/tests/compiler/component_builder_test.py b/sdk/python/tests/compiler/component_builder_test.py
index c5b31348c6a..21fb8ba866b 100644
--- a/sdk/python/tests/compiler/component_builder_test.py
+++ b/sdk/python/tests/compiler/component_builder_test.py
@@ -14,7 +14,6 @@
 
 from kfp.compiler._component_builder import _generate_dockerfile, _dependency_to_requirements, _func_to_entrypoint
 from kfp.compiler._component_builder import CodeGenerator
-from kfp.compiler._component_builder import ImageBuilder
 from kfp.compiler._component_builder import VersionedDependency
 from kfp.compiler._component_builder import DependencyHelper
 
@@ -26,8 +25,6 @@
 import inspect
 from collections import OrderedDict
 
-GCS_BASE = 'gs://kfp-testing/'
-
 class TestVersionedDependency(unittest.TestCase):
 
   def test_version(self):
@@ -332,48 +329,3 @@ def test_codegen(self):
     codegen.writeline('print("hello")')
     generated_codes = codegen.end()
     self.assertEqual(generated_codes, inspect.getsource(hello))
-
-class TestImageBuild(unittest.TestCase):
-
-  def test_wrap_files_in_tarball(self):
-    """ Test wrap files in a tarball """
-
-    # prepare
-    test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
-    temp_file_one = os.path.join(test_data_dir, 'test_data_one.tmp')
-    temp_file_two = os.path.join(test_data_dir, 'test_data_two.tmp')
-    temp_tarball = os.path.join(test_data_dir, 'test_data.tmp.tar.gz')
-    with open(temp_file_one, 'w') as f:
-      f.write('temporary file one content')
-    with open(temp_file_two, 'w') as f:
-      f.write('temporary file two content')
-
-    # check
-    builder = ImageBuilder(gcs_base=GCS_BASE, target_image='')
-    builder._wrap_files_in_tarball(temp_tarball, {'dockerfile':temp_file_one, 'main.py':temp_file_two})
-    self.assertTrue(os.path.exists(temp_tarball))
-    with tarfile.open(temp_tarball) as temp_tarball_handle:
-      temp_files = temp_tarball_handle.getmembers()
-      self.assertTrue(len(temp_files) == 2)
-      for temp_file in temp_files:
-        self.assertTrue(temp_file.name in ['dockerfile', 'main.py'])
-
-    # clean up
-    os.remove(temp_file_one)
-    os.remove(temp_file_two)
-    os.remove(temp_tarball)
-
-  def test_generate_kaniko_yaml(self):
-    """ Test generating the kaniko job yaml """
-
-    # prepare
-    test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
-
-    # check
-    builder = ImageBuilder(gcs_base=GCS_BASE, target_image='')
-    generated_yaml = builder._generate_kaniko_spec(namespace='default', arc_dockerfile_name='dockerfile',
-                                                   gcs_path='gs://mlpipeline/kaniko_build.tar.gz', target_image='gcr.io/mlpipeline/kaniko_image:latest')
-    with open(os.path.join(test_data_dir, 'kaniko.basic.yaml'), 'r') as f:
-      golden = yaml.safe_load(f)
-
-    self.assertEqual(golden, generated_yaml)
\ No newline at end of file
diff --git a/sdk/python/tests/compiler/container_builder_test.py b/sdk/python/tests/compiler/container_builder_test.py
new file mode 100644
index 00000000000..debebcc7738
--- /dev/null
+++ b/sdk/python/tests/compiler/container_builder_test.py
@@ -0,0 +1,65 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import tarfile
+import unittest
+import yaml
+import tempfile
+from kfp.compiler._container_builder import ContainerBuilder
+
+GCS_BASE = 'gs://kfp-testing/'
+
+class TestContainerBuild(unittest.TestCase):
+
+  def test_wrap_dir_in_tarball(self):
+    """ Test wrap files in a tarball """
+
+    # prepare
+    temp_tarball = os.path.join(os.path.dirname(__file__), 'test_data.tmp.tar.gz')
+    with tempfile.TemporaryDirectory() as test_data_dir:
+      temp_file_one = os.path.join(test_data_dir, 'test_data_one.tmp')
+      temp_file_two = os.path.join(test_data_dir, 'test_data_two.tmp')
+      with open(temp_file_one, 'w') as f:
+        f.write('temporary file one content')
+      with open(temp_file_two, 'w') as f:
+        f.write('temporary file two content')
+
+      # check
+      builder = ContainerBuilder(gcs_staging=GCS_BASE, namespace='')
+      builder._wrap_dir_in_tarball(temp_tarball, test_data_dir)
+      self.assertTrue(os.path.exists(temp_tarball))
+      with tarfile.open(temp_tarball) as temp_tarball_handle:
+        temp_files = temp_tarball_handle.getmembers()
+        self.assertTrue(len(temp_files) == 2)
+        for temp_file in temp_files:
+          self.assertTrue(temp_file.name in ['test_data_one.tmp', 'test_data_two.tmp'])
+
+    # clean up
+    os.remove(temp_tarball)
+
+  def test_generate_kaniko_yaml(self):
+    """ Test generating the kaniko job yaml """
+
+    # prepare
+    test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
+
+    # check
+    builder = ContainerBuilder(gcs_staging=GCS_BASE, namespace='default')
+    generated_yaml = builder._generate_kaniko_spec(docker_filename='dockerfile',
+                                                   context='gs://mlpipeline/kaniko_build.tar.gz', target_image='gcr.io/mlpipeline/kaniko_image:latest')
+    with open(os.path.join(test_data_dir, 'kaniko.basic.yaml'), 'r') as f:
+      golden = yaml.safe_load(f)
+
+    self.assertEqual(golden, generated_yaml)
\ No newline at end of file
diff --git a/sdk/python/tests/compiler/main.py b/sdk/python/tests/compiler/main.py
index 80a15acbedb..d851fd17750 100644
--- a/sdk/python/tests/compiler/main.py
+++ b/sdk/python/tests/compiler/main.py
@@ -18,6 +18,7 @@
 
 import compiler_tests
 import component_builder_test
+import container_builder_test
 import k8s_helper_tests
 
 
@@ -25,6 +26,7 @@
   suite = unittest.TestSuite()
   suite.addTests(unittest.defaultTestLoader.loadTestsFromModule(compiler_tests))
   suite.addTests(unittest.defaultTestLoader.loadTestsFromModule(component_builder_test))
+  suite.addTests(unittest.defaultTestLoader.loadTestsFromModule(container_builder_test))
   suite.addTests(unittest.defaultTestLoader.loadTestsFromModule(k8s_helper_tests))
   runner = unittest.TextTestRunner()
   if not runner.run(suite).wasSuccessful():
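
Note: if the kaniko pod spec changes again, one way to regenerate the golden
testdata/kaniko.basic.yaml that test_generate_kaniko_yaml compares against (a
sketch reusing the exact inputs from that test) is:

    import yaml
    from kfp.compiler._container_builder import ContainerBuilder

    builder = ContainerBuilder(gcs_staging='gs://kfp-testing/', namespace='default')
    spec = builder._generate_kaniko_spec(
        context='gs://mlpipeline/kaniko_build.tar.gz',
        docker_filename='dockerfile',
        target_image='gcr.io/mlpipeline/kaniko_image:latest')
    # Write this output over sdk/python/tests/compiler/testdata/kaniko.basic.yaml.
    print(yaml.dump(spec, default_flow_style=False))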