Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Backend + SDK): Update kfp backend and kubernetes sdk to support EmptyDir #10913

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,33 @@ func extendPodSpecPatch(
podSpec.Volumes = append(podSpec.Volumes, ephemeralVolume)
podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, ephemeralVolumeMount)
}

// EmptyDirMounts
for _, emptyDirVolumeSpec := range kubernetesExecutorConfig.GetEmptyDirMounts() {
var sizeLimitResource *k8sres.Quantity
gregsheremeta marked this conversation as resolved.
Show resolved Hide resolved
gregsheremeta marked this conversation as resolved.
Show resolved Hide resolved
if emptyDirVolumeSpec.GetSizeLimit() != "" {
r := k8sres.MustParse(emptyDirVolumeSpec.GetSizeLimit())
sizeLimitResource = &r
}

emptyDirVolume := k8score.Volume{
Name: emptyDirVolumeSpec.GetVolumeName(),
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{
Medium: k8score.StorageMedium(emptyDirVolumeSpec.GetMedium()),
SizeLimit: sizeLimitResource,
},
},
}
emptyDirVolumeMount := k8score.VolumeMount{
Name: emptyDirVolumeSpec.GetVolumeName(),
MountPath: emptyDirVolumeSpec.GetMountPath(),
}

podSpec.Volumes = append(podSpec.Volumes, emptyDirVolume)
podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, emptyDirVolumeMount)
gregsheremeta marked this conversation as resolved.
Show resolved Hide resolved
}
gregsheremeta marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

Expand Down
166 changes: 163 additions & 3 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ package driver

import (
"encoding/json"
"testing"

k8sres "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"

"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
Expand Down Expand Up @@ -532,7 +533,7 @@ func Test_extendPodSpecPatch_Secret(t *testing.T) {
{
Name: "secret1",
VolumeSource: k8score.VolumeSource{
Secret: &k8score.SecretVolumeSource{SecretName: "secret1", Optional: &[]bool{false}[0],},
Secret: &k8score.SecretVolumeSource{SecretName: "secret1", Optional: &[]bool{false}[0]},
},
},
},
Expand Down Expand Up @@ -730,7 +731,7 @@ func Test_extendPodSpecPatch_ConfigMap(t *testing.T) {
VolumeSource: k8score.VolumeSource{
ConfigMap: &k8score.ConfigMapVolumeSource{
LocalObjectReference: k8score.LocalObjectReference{Name: "cm1"},
Optional: &[]bool{false}[0],},
Optional: &[]bool{false}[0]},
},
},
},
Expand Down Expand Up @@ -890,6 +891,165 @@ func Test_extendPodSpecPatch_ConfigMap(t *testing.T) {
}
}

func Test_extendPodSpecPatch_EmptyVolumeMount(t *testing.T) {
medium := "Memory"
sizeLimit := "1Gi"
var sizeLimitResource *k8sres.Quantity
r := k8sres.MustParse(sizeLimit)
sizeLimitResource = &r

tests := []struct {
name string
k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
podSpec *k8score.PodSpec
expected *k8score.PodSpec
}{
{
"Valid - emptydir mount with no medium or size limit",
&kubernetesplatform.KubernetesExecutorConfig{
EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{
{
VolumeName: "emptydir1",
MountPath: "/data/path",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
VolumeMounts: []k8score.VolumeMount{
{
Name: "emptydir1",
MountPath: "/data/path",
},
},
},
},
Volumes: []k8score.Volume{
{
Name: "emptydir1",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{},
},
},
},
},
},
{
"Valid - emptydir mount with medium and size limit",
&kubernetesplatform.KubernetesExecutorConfig{
EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{
{
VolumeName: "emptydir1",
MountPath: "/data/path",
Medium: &medium,
SizeLimit: &sizeLimit,
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
VolumeMounts: []k8score.VolumeMount{
{
Name: "emptydir1",
MountPath: "/data/path",
},
},
},
},
Volumes: []k8score.Volume{
{
Name: "emptydir1",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{
Medium: k8score.StorageMedium(medium),
SizeLimit: sizeLimitResource,
},
},
},
},
},
},
{
"Valid - multiple emptydir mounts",
&kubernetesplatform.KubernetesExecutorConfig{
EmptyDirMounts: []*kubernetesplatform.EmptyDirMount{
{
VolumeName: "emptydir1",
MountPath: "/data/path",
},
{
VolumeName: "emptydir2",
MountPath: "/data/path2",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
VolumeMounts: []k8score.VolumeMount{
{
Name: "emptydir1",
MountPath: "/data/path",
},
{
Name: "emptydir2",
MountPath: "/data/path2",
},
},
},
},
Volumes: []k8score.Volume{
{
Name: "emptydir1",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{},
},
},
{
Name: "emptydir2",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := extendPodSpecPatch(tt.podSpec, tt.k8sExecCfg, nil, nil)
assert.Nil(t, err)
assert.Equal(t, tt.expected, tt.podSpec)
})
}
}

func Test_extendPodSpecPatch_ImagePullSecrets(t *testing.T) {
tests := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/apiserver.csv
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler/pkg/apis/exitha
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask/pkg/apis/kfptask,https://github.com/kubeflow/kfp-tekton/blob/a75d4b3711ff/tekton-catalog/tekton-kfptask/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/58ce09e07d03/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/8b2a099e8c9f/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/d911c8b73b49/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/e1f0c010f800/third_party/ml-metadata/LICENSE,Apache-2.0
github.com/lann/builder,https://github.com/lann/builder/blob/47ae307949d0/LICENSE,MIT
github.com/lann/ps,https://github.com/lann/ps/blob/62de8c46ede0/LICENSE,MIT
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/driver.csv
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice
github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/58ce09e07d03/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/8b2a099e8c9f/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/d911c8b73b49/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/e1f0c010f800/third_party/ml-metadata/LICENSE,Apache-2.0
github.com/mailru/easyjson,https://github.com/mailru/easyjson/blob/v0.7.7/LICENSE,MIT
github.com/modern-go/concurrent,https://github.com/modern-go/concurrent/blob/bacd9c7ef1dd/LICENSE,Apache-2.0
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-exithandler v0.0.0-20231127195001-a75d4b3711ff
github.com/kubeflow/kfp-tekton/tekton-catalog/tekton-kfptask v0.0.0-20231127195001-a75d4b3711ff
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240403164522-8b2a099e8c9f
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240725205754-d911c8b73b49
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20230810215105-e1f0c010f800
github.com/lestrrat-go/strftime v1.0.4
github.com/mattn/go-sqlite3 v1.14.19
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
'add_toleration',
'CreatePVC',
'DeletePVC',
'empty_dir_mount',
'mount_pvc',
'set_image_pull_policy',
'use_field_path_as_env',
Expand Down Expand Up @@ -49,3 +50,4 @@
from kfp.kubernetes.volume import CreatePVC
from kfp.kubernetes.volume import DeletePVC
from kfp.kubernetes.volume import mount_pvc
from kfp.kubernetes.empty_dir import empty_dir_mount
56 changes: 56 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/empty_dir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb


def empty_dir_mount(
task: PipelineTask,
volume_name: str,
mount_path: str,
medium: Optional[str] = None,
size_limit: Optional[str] = None,
) -> PipelineTask:
"""Mount an EmptyDir volume to the task's container.

Args:
task: Pipeline task.
volume_name: Name of the EmptyDir volume.
mount_path: Path within the container at which the EmptyDir should be mounted.
medium: Storage medium to back the EmptyDir. Must be one of `Memory` or `HugePages`. Defaults to `None`.
size_limit: Maximum size of the EmptyDir. For example, `5Gi`. Defaults to `None`.

Returns:
Task object with updated EmptyDir mount configuration.
"""

msg = common.get_existing_kubernetes_config_as_message(task)

empty_dir_mount = pb.EmptyDirMount(
volume_name=volume_name,
mount_path=mount_path,
medium=medium,
size_limit=size_limit,
)

msg.empty_dir_mounts.append(empty_dir_mount)

task.platform_config['kubernetes'] = json_format.MessageToDict(msg)

return task
36 changes: 36 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/empty_dir_mounts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
pass

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.empty_dir_mount(
task,
volume_name='emptydir-vol-1',
mount_path='/mnt/my_vol_1',
medium='Memory',
size_limit='1Gi'
)

if __name__ == '__main__':
from kfp import compiler
compiler.Compiler().compile(my_pipeline, __file__.replace('.py', '.yaml'))
Loading
Loading