From a8374c2758f9e772cf4b7fefab52a5cdf35351d3 Mon Sep 17 00:00:00 2001 From: Greg Sheremeta Date: Sun, 16 Jun 2024 17:23:40 -0400 Subject: [PATCH] feat(Backend + SDK): Update kfp backend and kubernetes sdk to support EmptyDir Update kfp backend and kubernetes sdk to support mounting EmptyDir volumes to task pods. Inspired by #10427 Fixes: #10656 Signed-off-by: Greg Sheremeta --- backend/src/v2/driver/driver.go | 28 +++ backend/src/v2/driver/driver_test.go | 167 +++++++++++++++++- .../python/kfp/kubernetes/__init__.py | 2 + .../python/kfp/kubernetes/empty_dir.py | 56 ++++++ .../test/snapshot/data/empty_dir_mounts.py | 36 ++++ .../test/snapshot/data/empty_dir_mounts.yaml | 60 +++++++ .../python/test/unit/test_empty_dir_mounts.py | 136 ++++++++++++++ 7 files changed, 482 insertions(+), 3 deletions(-) create mode 100644 kubernetes_platform/python/kfp/kubernetes/empty_dir.py create mode 100644 kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.py create mode 100644 kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.yaml create mode 100644 kubernetes_platform/python/test/unit/test_empty_dir_mounts.py diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index ebb194f646e7..e97a7b8cef23 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -36,6 +36,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/structpb" k8score "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" k8sres "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -665,6 +666,33 @@ func extendPodSpecPatch( podSpec.Volumes = append(podSpec.Volumes, ephemeralVolume) podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, ephemeralVolumeMount) } + + // EmptyDirMounts + for _, emptyDirVolumeSpec := range kubernetesExecutorConfig.GetEmptyDirMounts() { + var sizeLimitResource *resource.Quantity + if emptyDirVolumeSpec.GetSizeLimit() != "" { + r := k8sres.MustParse(emptyDirVolumeSpec.GetSizeLimit()) + sizeLimitResource = &r + } + + emptyDirVolume := k8score.Volume{ + Name: emptyDirVolumeSpec.GetVolumeName(), + VolumeSource: k8score.VolumeSource{ + EmptyDir: &k8score.EmptyDirVolumeSource{ + Medium: k8score.StorageMedium(emptyDirVolumeSpec.GetMedium()), + SizeLimit: sizeLimitResource, + }, + }, + } + emptyDirVolumeMount := k8score.VolumeMount{ + Name: emptyDirVolumeSpec.GetVolumeName(), + MountPath: emptyDirVolumeSpec.GetMountPath(), + } + + podSpec.Volumes = append(podSpec.Volumes, emptyDirVolume) + podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, emptyDirVolumeMount) + } + return nil } diff --git a/backend/src/v2/driver/driver_test.go b/backend/src/v2/driver/driver_test.go index bea24890033b..2c9764340af6 100644 --- a/backend/src/v2/driver/driver_test.go +++ b/backend/src/v2/driver/driver_test.go @@ -15,9 +15,11 @@ package driver import ( "encoding/json" + "testing" + + "k8s.io/apimachinery/pkg/api/resource" k8sres "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "testing" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" "github.com/kubeflow/pipelines/backend/src/v2/metadata" @@ -532,7 +534,7 @@ func Test_extendPodSpecPatch_Secret(t *testing.T) { { Name: "secret1", VolumeSource: k8score.VolumeSource{ - Secret: &k8score.SecretVolumeSource{SecretName: "secret1", Optional: &[]bool{false}[0],}, + Secret: &k8score.SecretVolumeSource{SecretName: "secret1", Optional: &[]bool{false}[0]}, }, }, }, @@ -730,7 +732,7 @@ func Test_extendPodSpecPatch_ConfigMap(t *testing.T) { VolumeSource: k8score.VolumeSource{ ConfigMap: &k8score.ConfigMapVolumeSource{ LocalObjectReference: k8score.LocalObjectReference{Name: "cm1"}, - Optional: &[]bool{false}[0],}, + Optional: &[]bool{false}[0]}, }, }, }, @@ -890,6 +892,165 @@ func Test_extendPodSpecPatch_ConfigMap(t *testing.T) { } } +func Test_extendPodSpecPatch_EmptyVolumeMount(t *testing.T) { + medium := "Memory" + sizeLimit := "1Gi" + var sizeLimitResource *resource.Quantity + r := k8sres.MustParse(sizeLimit) + sizeLimitResource = &r + + tests := []struct { + name string + k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig + podSpec *k8score.PodSpec + expected *k8score.PodSpec + }{ + { + "Valid - emptydir mount with no medium or size limit", + &kubernetesplatform.KubernetesExecutorConfig{ + EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{ + { + VolumeName: "emptydir1", + MountPath: "/data/path", + }, + }, + }, + &k8score.PodSpec{ + Containers: []k8score.Container{ + { + Name: "main", + }, + }, + }, + &k8score.PodSpec{ + Containers: []k8score.Container{ + { + Name: "main", + VolumeMounts: []k8score.VolumeMount{ + { + Name: "emptydir1", + MountPath: "/data/path", + }, + }, + }, + }, + Volumes: []k8score.Volume{ + { + Name: "emptydir1", + VolumeSource: k8score.VolumeSource{ + EmptyDir: &k8score.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + { + "Valid - emptydir mount with medium and size limit", + &kubernetesplatform.KubernetesExecutorConfig{ + EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{ + { + VolumeName: "emptydir1", + MountPath: "/data/path", + Medium: &medium, + SizeLimit: &sizeLimit, + }, + }, + }, + &k8score.PodSpec{ + Containers: []k8score.Container{ + { + Name: "main", + }, + }, + }, + &k8score.PodSpec{ + Containers: []k8score.Container{ + { + Name: "main", + VolumeMounts: []k8score.VolumeMount{ + { + Name: "emptydir1", + MountPath: "/data/path", + }, + }, + }, + }, + Volumes: []k8score.Volume{ + { + Name: "emptydir1", + VolumeSource: k8score.VolumeSource{ + EmptyDir: &k8score.EmptyDirVolumeSource{ + Medium: k8score.StorageMedium(medium), + SizeLimit: sizeLimitResource, + }, + }, + }, + }, + }, + }, + { + "Valid - multiple emptydir mounts", + &kubernetesplatform.KubernetesExecutorConfig{ + EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{ + { + VolumeName: "emptydir1", + MountPath: "/data/path", + }, + { + VolumeName: "emptydir2", + MountPath: "/data/path2", + }, + }, + }, + &k8score.PodSpec{ + Containers: []k8score.Container{ + { + Name: "main", + }, + }, + }, + &k8score.PodSpec{ + Containers: []k8score.Container{ + { + Name: "main", + VolumeMounts: []k8score.VolumeMount{ + { + Name: "emptydir1", + MountPath: "/data/path", + }, + { + Name: "emptydir2", + MountPath: "/data/path2", + }, + }, + }, + }, + Volumes: []k8score.Volume{ + { + Name: "emptydir1", + VolumeSource: k8score.VolumeSource{ + EmptyDir: &k8score.EmptyDirVolumeSource{}, + }, + }, + { + Name: "emptydir2", + VolumeSource: k8score.VolumeSource{ + EmptyDir: &k8score.EmptyDirVolumeSource{}, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := extendPodSpecPatch(tt.podSpec, tt.k8sExecCfg, nil, nil) + assert.Nil(t, err) + assert.Equal(t, tt.expected, tt.podSpec) + }) + } +} + func Test_extendPodSpecPatch_ImagePullSecrets(t *testing.T) { tests := []struct { name string diff --git a/kubernetes_platform/python/kfp/kubernetes/__init__.py b/kubernetes_platform/python/kfp/kubernetes/__init__.py index fa149c31c092..91fb119212a2 100644 --- a/kubernetes_platform/python/kfp/kubernetes/__init__.py +++ b/kubernetes_platform/python/kfp/kubernetes/__init__.py @@ -22,6 +22,7 @@ 'add_toleration', 'CreatePVC', 'DeletePVC', + 'empty_dir_mount', 'mount_pvc', 'set_image_pull_policy', 'use_field_path_as_env', @@ -49,3 +50,4 @@ from kfp.kubernetes.volume import CreatePVC from kfp.kubernetes.volume import DeletePVC from kfp.kubernetes.volume import mount_pvc +from kfp.kubernetes.empty_dir import empty_dir_mount diff --git a/kubernetes_platform/python/kfp/kubernetes/empty_dir.py b/kubernetes_platform/python/kfp/kubernetes/empty_dir.py new file mode 100644 index 000000000000..a8873d872cec --- /dev/null +++ b/kubernetes_platform/python/kfp/kubernetes/empty_dir.py @@ -0,0 +1,56 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional + +from google.protobuf import json_format +from kfp.dsl import PipelineTask +from kfp.kubernetes import common +from kfp.kubernetes import kubernetes_executor_config_pb2 as pb + + +def empty_dir_mount( + task: PipelineTask, + volume_name: str, + mount_path: str, + medium: Optional[str] = None, + size_limit: Optional[str] = None, +) -> PipelineTask: + """Mount an EmptyDir volume to the task's container. + + Args: + task: Pipeline task. + volume_name: Name of the EmptyDir volume. + mount_path: Path within the container at which the EmptyDir should be mounted. + medium: Storage medium to back the EmptyDir. Must be one of `Memory` or `HugePages`. Defaults to `None`. + size_limit: Maximum size of the EmptyDir. For example, `5Gi`. Defaults to `None`. + + Returns: + Task object with updated EmptyDir mount configuration. + """ + + msg = common.get_existing_kubernetes_config_as_message(task) + + empty_dir_mount = pb.EmptyDirMount( + volume_name=volume_name, + mount_path=mount_path, + medium=medium, + size_limit=size_limit, + ) + + msg.empty_dir_mounts.append(empty_dir_mount) + + task.platform_config['kubernetes'] = json_format.MessageToDict(msg) + + return task diff --git a/kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.py b/kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.py new file mode 100644 index 000000000000..a86c93d3cb7f --- /dev/null +++ b/kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.py @@ -0,0 +1,36 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kfp import dsl +from kfp import kubernetes + + +@dsl.component +def comp(): + pass + +@dsl.pipeline +def my_pipeline(): + task = comp() + kubernetes.empty_dir_mount( + task, + volume_name='emptydir-vol-1', + mount_path='/mnt/my_vol_1', + medium='Memory', + size_limit='1Gi' + ) + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile(my_pipeline, __file__.replace('.py', '.yaml')) diff --git a/kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.yaml b/kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.yaml new file mode 100644 index 000000000000..072c7af20044 --- /dev/null +++ b/kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.yaml @@ -0,0 +1,60 @@ +# PIPELINE DEFINITION +# Name: my-pipeline +components: + comp-comp: + executorLabel: exec-comp +deploymentSpec: + executors: + exec-comp: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - comp + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef comp():\n pass\n\n" + image: python:3.7 +pipelineInfo: + name: my-pipeline +root: + dag: + tasks: + comp: + cachingOptions: + enableCache: true + componentRef: + name: comp-comp + taskInfo: + name: comp +schemaVersion: 2.1.0 +sdkVersion: kfp-2.7.0 +--- +platforms: + kubernetes: + deploymentSpec: + executors: + exec-comp: + emptyDirMounts: + - medium: Memory + mountPath: /mnt/my_vol_1 + sizeLimit: 1Gi + volumeName: emptydir-vol-1 diff --git a/kubernetes_platform/python/test/unit/test_empty_dir_mounts.py b/kubernetes_platform/python/test/unit/test_empty_dir_mounts.py new file mode 100644 index 000000000000..dcde2d140c42 --- /dev/null +++ b/kubernetes_platform/python/test/unit/test_empty_dir_mounts.py @@ -0,0 +1,136 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.protobuf import json_format +from kfp import dsl +from kfp import kubernetes + + +class TestEmptyDirMounts: + + def test_add_one(self): + + @dsl.pipeline + def my_pipeline(): + task = comp() + kubernetes.empty_dir_mount( + task, + volume_name='emptydir-vol-1', + mount_path='/mnt/my_vol_1', + medium='Memory', + size_limit='1Gi' + ) + + assert json_format.MessageToDict(my_pipeline.platform_spec) == { + 'platforms': { + 'kubernetes': { + 'deploymentSpec': { + 'executors': { + 'exec-comp': { + 'emptyDirMounts': [{ + 'medium': 'Memory', + 'mountPath': '/mnt/my_vol_1', + 'sizeLimit': '1Gi', + 'volumeName': 'emptydir-vol-1' + }] + } + } + } + } + } + } + + def test_add_two(self): + + @dsl.pipeline + def my_pipeline(): + task = comp() + kubernetes.empty_dir_mount( + task, + volume_name='emptydir-vol-1', + mount_path='/mnt/my_vol_1', + medium='Memory', + size_limit='1Gi' + ) + kubernetes.empty_dir_mount( + task, + volume_name='emptydir-vol-2', + mount_path='/mnt/my_vol_2' + ) + + assert json_format.MessageToDict(my_pipeline.platform_spec) == { + 'platforms': { + 'kubernetes': { + 'deploymentSpec': { + 'executors': { + 'exec-comp': { + 'emptyDirMounts': [{ + 'medium': 'Memory', + 'mountPath': '/mnt/my_vol_1', + 'sizeLimit': '1Gi', + 'volumeName': 'emptydir-vol-1' + }, + { + 'mountPath': '/mnt/my_vol_2', + 'volumeName': 'emptydir-vol-2' + }] + } + } + } + } + } + } + + def test_respects_other_configuration(self): + + @dsl.pipeline + def my_pipeline(): + task = comp() + + kubernetes.empty_dir_mount( + task, + volume_name='emptydir-vol-1', + mount_path='/mnt/my_vol_1', + medium='Memory', + size_limit='1Gi' + ) + + # this should exist too + kubernetes.set_image_pull_secrets(task, ['secret-name']) + + assert json_format.MessageToDict(my_pipeline.platform_spec) == { + 'platforms': { + 'kubernetes': { + 'deploymentSpec': { + 'executors': { + 'exec-comp': { + 'emptyDirMounts': [{ + 'medium': 'Memory', + 'mountPath': '/mnt/my_vol_1', + 'sizeLimit': '1Gi', + 'volumeName': 'emptydir-vol-1' + }], + 'imagePullSecret': [{ + 'secretName': 'secret-name' + }] + } + } + } + } + } + } + +@dsl.component +def comp(): + pass