From 62269ebaa8cc382be3c224728713e645af191820 Mon Sep 17 00:00:00 2001 From: Rui Fang <31815555+rui5i@users.noreply.github.com> Date: Tue, 31 Mar 2020 15:57:27 -0700 Subject: [PATCH] [Backend]Cache - KFP pod filter logic looking for cache_enabled = true label selector (#3368) * Initial execution cache This commit adds initial execution cache service. Including http service and execution key generation. * fix master * fix go.sum * Change kfp annotation for pod filtering * update filter logic * Remove unused const --- .../deployer/cache-configmap.yaml.template | 3 ++ backend/src/cache/server/mutation.go | 43 +++---------------- backend/src/cache/server/mutation_test.go | 17 -------- 3 files changed, 10 insertions(+), 53 deletions(-) diff --git a/backend/src/cache/deployer/cache-configmap.yaml.template b/backend/src/cache/deployer/cache-configmap.yaml.template index 5e5abf6b036..d5a6a4bf8f5 100644 --- a/backend/src/cache/deployer/cache-configmap.yaml.template +++ b/backend/src/cache/deployer/cache-configmap.yaml.template @@ -10,6 +10,9 @@ webhooks: namespace: ${NAMESPACE} path: "/mutate" caBundle: ${CA_BUNDLE} + objectSelector: + matchLabels: + pipelines.kubeflow.org/cache_enabled: true rules: - operations: [ "CREATE" ] apiGroups: [""] diff --git a/backend/src/cache/server/mutation.go b/backend/src/cache/server/mutation.go index ef77552bf62..aa5ec50442f 100644 --- a/backend/src/cache/server/mutation.go +++ b/backend/src/cache/server/mutation.go @@ -32,7 +32,6 @@ import ( ) const ( - KFPAnnotation string = "pipelines.kubeflow.org" ArgoWorkflowNodeName string = "workflows.argoproj.io/node-name" ArgoWorkflowTemplate string = "workflows.argoproj.io/template" ExecutionKey string = "pipelines.kubeflow.org/execution_cache_key" @@ -74,8 +73,8 @@ func MutatePodIfCached(req *v1beta1.AdmissionRequest, clientMgr ClientManagerInt } // Pod filtering to only cache KFP argo pods except TFX pods - if !isValidPod(&pod) { - log.Printf("This pod %s is not a valid pod.", pod.ObjectMeta.Name) + if isTFXPod(&pod) { + log.Printf("This pod %s is created by tfx pipelines.", pod.ObjectMeta.Name) return nil, nil } @@ -193,42 +192,14 @@ func getValueFromSerializedMap(serializedMap string, key string) string { return value } -func isValidPod(pod *corev1.Pod) bool { - annotations := pod.ObjectMeta.Annotations - if annotations == nil || len(annotations) == 0 { - log.Printf("The annotation of this pod %s is empty.", pod.ObjectMeta.Name) - return false - } - if !isKFPArgoPod(&annotations, pod.ObjectMeta.Name) { - log.Printf("This pod %s is not created by KFP.", pod.ObjectMeta.Name) - return false - } +func isTFXPod(pod *corev1.Pod) bool { containers := pod.Spec.Containers - if containers != nil && len(containers) != 0 && isTFXPod(&containers) { - log.Printf("This pod %s is created by TFX pipelines.", pod.ObjectMeta.Name) - return false - } - return true -} - -func isKFPArgoPod(annotations *map[string]string, podName string) bool { - // is argo pod or not - if _, exists := (*annotations)[ArgoWorkflowNodeName]; !exists { - log.Printf("This pod %s is not created by Argo.", podName) - return false + if containers == nil || len(containers) == 0 { + log.Printf("This pod container does not exist.") + return true } - // is KFP pod or not - for k := range *annotations { - if strings.Contains(k, KFPAnnotation) { - return true - } - } - return false -} - -func isTFXPod(containers *[]corev1.Container) bool { var mainContainers []corev1.Container - for _, c := range *containers { + for _, c := range containers { if c.Name != "" && c.Name == "main" { mainContainers = append(mainContainers, c) } diff --git a/backend/src/cache/server/mutation_test.go b/backend/src/cache/server/mutation_test.go index 05f1f454825..2fd90366fa7 100644 --- a/backend/src/cache/server/mutation_test.go +++ b/backend/src/cache/server/mutation_test.go @@ -38,7 +38,6 @@ var ( Annotations: map[string]string{ ArgoWorkflowNodeName: "test_node", ArgoWorkflowTemplate: `{"name": "test_template"}`, - KFPAnnotation: "test_kfp", }, Labels: map[string]string{ ArgoCompleteLabelKey: "true", @@ -107,22 +106,6 @@ func TestMutatePodIfCachedWithDecodeError(t *testing.T) { assert.Contains(t, err.Error(), "could not deserialize pod object") } -func TestMutatePodIfCachedWithNonKFPPod(t *testing.T) { - nonKFPPod := *fakePod - delete(nonKFPPod.Annotations, KFPAnnotation) - patchOperation, err := MutatePodIfCached(GetFakeRequestFromPod(&nonKFPPod), fakeClientManager) - assert.Nil(t, patchOperation) - assert.Nil(t, err) -} - -func TestMutatePodIfCachedWithNonArgoPod(t *testing.T) { - nonArgoPod := *fakePod - delete(nonArgoPod.Annotations, ArgoWorkflowNodeName) - patchOperation, err := MutatePodIfCached(GetFakeRequestFromPod(&nonArgoPod), fakeClientManager) - assert.Nil(t, patchOperation) - assert.Nil(t, err) -} - func TestMutatePodIfCachedWithTFXPod(t *testing.T) { tfxPod := *fakePod mainContainerCommand := append(tfxPod.Spec.Containers[0].Command, "/tfx-src/"+TFXPodSuffix)