diff --git a/backend/src/apiserver/client/BUILD.bazel b/backend/src/apiserver/client/BUILD.bazel index 50d26b8b7fad..7a8a07a76f6e 100644 --- a/backend/src/apiserver/client/BUILD.bazel +++ b/backend/src/apiserver/client/BUILD.bazel @@ -8,7 +8,9 @@ go_library( "kfam.go", "kfam_fake.go", "minio.go", - "pod.go", + "kubernetes_core.go", + "kubernetes_core_fake.go", + "pod_fake.go", "scheduled_workflow.go", "sql.go", "workflow_fake.go", @@ -28,6 +30,8 @@ go_library( "@com_github_golang_glog//:go_default_library", "@com_github_minio_minio_go//:go_default_library", "@com_github_pkg_errors//:go_default_library", + "@io_k8s_api//core/v1:go_default_library", + "@io_k8s_api//policy/v1beta1:go_default_library", "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", "@io_k8s_apimachinery//pkg/types:go_default_library", "@io_k8s_apimachinery//pkg/watch:go_default_library", diff --git a/backend/src/apiserver/client/pod.go b/backend/src/apiserver/client/kubernetes_core.go similarity index 56% rename from backend/src/apiserver/client/pod.go rename to backend/src/apiserver/client/kubernetes_core.go index 637c7e9e93d3..5e1282906a4a 100644 --- a/backend/src/apiserver/client/pod.go +++ b/backend/src/apiserver/client/kubernetes_core.go @@ -10,7 +10,19 @@ import ( "time" ) -func createPodClient(namespace string) (v1.PodInterface, error) { +type KubernetesCoreInterface interface { + PodClient(namespace string) v1.PodInterface +} + +type KubernetesCore struct { + coreV1Client v1.CoreV1Interface +} + +func (c *KubernetesCore) PodClient(namespace string) v1.PodInterface { + return c.coreV1Client.Pods(namespace) +} + +func createKubernetesCore() (KubernetesCoreInterface, error) { restConfig, err := rest.InClusterConfig() if err != nil { return nil, errors.Wrap(err, "Failed to initialize kubernetes client.") @@ -20,15 +32,15 @@ func createPodClient(namespace string) (v1.PodInterface, error) { if err != nil { return nil, errors.Wrap(err, "Failed to initialize kubernetes client set.") } - return clientSet.CoreV1().Pods(namespace), nil + return &KubernetesCore{clientSet.CoreV1()}, nil } -// CreatePodClientOrFatal creates a new client for the Kubernetes pod. -func CreatePodClientOrFatal(namespace string, initConnectionTimeout time.Duration) v1.PodInterface{ - var client v1.PodInterface +// CreateKubernetesCoreOrFatal creates a new client for the Kubernetes pod. +func CreateKubernetesCoreOrFatal(initConnectionTimeout time.Duration) KubernetesCoreInterface { + var client KubernetesCoreInterface var err error var operation = func() error { - client, err = createPodClient(namespace) + client, err = createKubernetesCore() if err != nil { return err } diff --git a/backend/src/apiserver/client/kubernetes_core_fake.go b/backend/src/apiserver/client/kubernetes_core_fake.go new file mode 100644 index 000000000000..2c40849ebdab --- /dev/null +++ b/backend/src/apiserver/client/kubernetes_core_fake.go @@ -0,0 +1,43 @@ +// Copyright 2019 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + v1 "k8s.io/client-go/kubernetes/typed/core/v1" +) + +type FakeKuberneteCoreClient struct { + podClientFake *FakePodClient +} + +func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface { + return c.podClientFake +} + +func NewFakeKuberneteCoresClient() *FakeKuberneteCoreClient { + return &FakeKuberneteCoreClient{&FakePodClient{}} +} + +type FakeKubernetesCoreClientWithBadPodClient struct { + podClientFake *FakeBadPodClient +} + +func NewFakeKubernetesCoreClientWithBadPodClient() *FakeKubernetesCoreClientWithBadPodClient { + return &FakeKubernetesCoreClientWithBadPodClient{&FakeBadPodClient{}} +} + +func (c *FakeKubernetesCoreClientWithBadPodClient) PodClient(namespace string) v1.PodInterface { + return c.podClientFake +} diff --git a/backend/src/apiserver/resource/pod_fake.go b/backend/src/apiserver/client/pod_fake.go similarity index 99% rename from backend/src/apiserver/resource/pod_fake.go rename to backend/src/apiserver/client/pod_fake.go index cfe89a3e08a0..68b87c3a9387 100644 --- a/backend/src/apiserver/resource/pod_fake.go +++ b/backend/src/apiserver/client/pod_fake.go @@ -1,4 +1,4 @@ -package resource +package client import ( "errors" diff --git a/backend/src/apiserver/client_manager.go b/backend/src/apiserver/client_manager.go index 81f414b0bbaf..3f6014c20fa1 100644 --- a/backend/src/apiserver/client_manager.go +++ b/backend/src/apiserver/client_manager.go @@ -20,11 +20,9 @@ import ( "os" "time" - "github.com/kubeflow/pipelines/backend/src/apiserver/common" - v1 "k8s.io/client-go/kubernetes/typed/core/v1" - "github.com/cenkalti/backoff" "github.com/golang/glog" + "github.com/kubeflow/pipelines/backend/src/apiserver/common" "github.com/jinzhu/gorm" _ "github.com/jinzhu/gorm/dialects/sqlite" @@ -68,7 +66,7 @@ type ClientManager struct { objectStore storage.ObjectStoreInterface argoClient client.ArgoClientInterface swfClient scheduledworkflowclient.ScheduledWorkflowInterface - podClient v1.PodInterface + k8sCoreClient client.KubernetesCoreInterface kfamClient client.KFAMClientInterface time util.TimeInterface uuid util.UUIDGeneratorInterface @@ -114,8 +112,8 @@ func (c *ClientManager) ScheduledWorkflow() scheduledworkflowclient.ScheduledWor return c.swfClient } -func (c *ClientManager) PodClient() v1.PodInterface { - return c.podClient +func (c *ClientManager) KubernetesCoreClient() client.KubernetesCoreInterface { + return c.k8sCoreClient } func (c *ClientManager) KFAMClient() client.KFAMClientInterface { @@ -154,8 +152,7 @@ func (c *ClientManager) init() { c.swfClient = client.CreateScheduledWorkflowClientOrFatal( common.GetStringConfig(client.PodNamespace), common.GetDurationConfig(initConnectionTimeout)) - c.podClient = client.CreatePodClientOrFatal( - common.GetStringConfig(client.PodNamespace), common.GetDurationConfig(initConnectionTimeout)) + c.k8sCoreClient = client.CreateKubernetesCoreOrFatal(common.GetDurationConfig(initConnectionTimeout)) runStore := storage.NewRunStore(db, c.time) c.runStore = runStore diff --git a/backend/src/apiserver/resource/BUILD.bazel b/backend/src/apiserver/resource/BUILD.bazel index 10b0e5d2760e..5cbe4a8a136f 100644 --- a/backend/src/apiserver/resource/BUILD.bazel +++ b/backend/src/apiserver/resource/BUILD.bazel @@ -5,7 +5,6 @@ go_library( srcs = [ "client_manager_fake.go", "model_converter.go", - "pod_fake.go", "resource_manager.go", "resource_manager_util.go", "scheduled_workflow_fake.go", diff --git a/backend/src/apiserver/resource/client_manager_fake.go b/backend/src/apiserver/resource/client_manager_fake.go index 016accaed44f..47d42dd52fee 100644 --- a/backend/src/apiserver/resource/client_manager_fake.go +++ b/backend/src/apiserver/resource/client_manager_fake.go @@ -20,7 +20,6 @@ import ( "github.com/kubeflow/pipelines/backend/src/apiserver/storage" "github.com/kubeflow/pipelines/backend/src/common/util" scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1" - v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) const ( @@ -40,7 +39,7 @@ type FakeClientManager struct { objectStore storage.ObjectStoreInterface ArgoClientFake *client.FakeArgoClient scheduledWorkflowClientFake *FakeScheduledWorkflowClient - podClientFake v1.PodInterface + k8sCoreClientFake *client.FakeKuberneteCoreClient KfamClientFake client.KFAMClientInterface time util.TimeInterface uuid util.UUIDGeneratorInterface @@ -76,7 +75,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf defaultExperimentStore: storage.NewDefaultExperimentStore(db), objectStore: storage.NewFakeObjectStore(), scheduledWorkflowClientFake: NewScheduledWorkflowClientFake(), - podClientFake: FakePodClient{}, + k8sCoreClientFake: client.NewFakeKuberneteCoresClient(), KfamClientFake: client.NewFakeKFAMClientAuthorized(), time: time, uuid: uuid, @@ -144,8 +143,8 @@ func (f *FakeClientManager) ScheduledWorkflow() scheduledworkflowclient.Schedule return f.scheduledWorkflowClientFake } -func (f *FakeClientManager) PodClient() v1.PodInterface { - return f.podClientFake +func (f *FakeClientManager) KubernetesCoreClient() client.KubernetesCoreInterface { + return f.k8sCoreClientFake } func (f *FakeClientManager) KFAMClient() client.KFAMClientInterface { diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 16a39f0b990f..5a53b03a0491 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -34,7 +34,6 @@ import ( scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1" "github.com/pkg/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/apimachinery/pkg/types" ) @@ -56,7 +55,7 @@ type ClientManagerInterface interface { ObjectStore() storage.ObjectStoreInterface ArgoClient() client.ArgoClientInterface ScheduledWorkflow() scheduledworkflowclient.ScheduledWorkflowInterface - PodClient() corev1.PodInterface + KubernetesCoreClient() client.KubernetesCoreInterface KFAMClient() client.KFAMClientInterface Time() util.TimeInterface UUID() util.UUIDGeneratorInterface @@ -73,7 +72,7 @@ type ResourceManager struct { objectStore storage.ObjectStoreInterface argoClient client.ArgoClientInterface scheduledWorkflowClient scheduledworkflowclient.ScheduledWorkflowInterface - podClient corev1.PodInterface + k8sCoreClient client.KubernetesCoreInterface kfamClient client.KFAMClientInterface time util.TimeInterface uuid util.UUIDGeneratorInterface @@ -91,7 +90,7 @@ func NewResourceManager(clientManager ClientManagerInterface) *ResourceManager { objectStore: clientManager.ObjectStore(), argoClient: clientManager.ArgoClient(), scheduledWorkflowClient: clientManager.ScheduledWorkflow(), - podClient: clientManager.PodClient(), + k8sCoreClient: clientManager.KubernetesCoreClient(), kfamClient: clientManager.KFAMClient(), time: clientManager.Time(), uuid: clientManager.UUID(), @@ -351,7 +350,11 @@ func (r *ResourceManager) DeleteRun(runID string) error { if err != nil { return util.Wrap(err, "Delete run failed") } - err = r.getWorkflowClient("").Delete(runDetail.Name, &v1.DeleteOptions{}) + namespace, err := r.GetNamespaceFromRunID(runID) + if err != nil { + return util.Wrap(err, "Delete run failed") + } + err = r.getWorkflowClient(namespace).Delete(runDetail.Name, &v1.DeleteOptions{}) if err != nil { // API won't need to delete the workflow CRD // once persistent agent sync the state to DB and set TTL for it. @@ -397,12 +400,17 @@ func (r *ResourceManager) TerminateRun(runId string) error { return util.Wrap(err, "Terminate run failed") } + namespace, err := r.GetNamespaceFromRunID(runId) + if err != nil { + return util.Wrap(err, "Terminate run failed") + } + err = r.runStore.TerminateRun(runId) if err != nil { return util.Wrap(err, "Terminate run failed") } - err = TerminateWorkflow(r.getWorkflowClient(""), runDetail.Run.Name) + err = TerminateWorkflow(r.getWorkflowClient(namespace), runDetail.Run.Name) if err != nil { return util.NewInternalServerError(err, "Failed to terminate the run") } @@ -414,6 +422,10 @@ func (r *ResourceManager) RetryRun(runId string) error { if err != nil { return util.Wrap(err, "Retry run failed") } + namespace, err := r.GetNamespaceFromRunID(runId) + if err != nil { + return util.Wrap(err, "Retry run failed") + } if runDetail.WorkflowRuntimeManifest == "" { return util.NewBadRequestError(errors.New("workflow cannot be retried"), "Workflow must be Failed/Error to retry") @@ -428,16 +440,16 @@ func (r *ResourceManager) RetryRun(runId string) error { return util.Wrap(err, "Retry run failed.") } - if err = deletePods(r.podClient, podsToDelete); err != nil { + if err = deletePods(r.k8sCoreClient, podsToDelete, namespace); err != nil { return util.NewInternalServerError(err, "Retry run failed. Failed to clean up the failed pods from previous run.") } // First try to update workflow - updateError := r.updateWorkflow(newWorkflow) + updateError := r.updateWorkflow(newWorkflow, namespace) if updateError != nil { // Remove resource version newWorkflow.ResourceVersion = "" - newCreatedWorkflow, createError := r.getWorkflowClient("").Create(newWorkflow.Workflow) + newCreatedWorkflow, createError := r.getWorkflowClient(namespace).Create(newWorkflow.Workflow) if createError != nil { return util.NewInternalServerError(createError, "Retry run failed. Failed to create or update the run. Update Error: %s, Create Error: %s", @@ -452,15 +464,15 @@ func (r *ResourceManager) RetryRun(runId string) error { return nil } -func (r *ResourceManager) updateWorkflow(newWorkflow *util.Workflow) error { +func (r *ResourceManager) updateWorkflow(newWorkflow *util.Workflow, namespace string) error { // If fail to get the workflow, return error. - latestWorkflow, err := r.getWorkflowClient("").Get(newWorkflow.Name, v1.GetOptions{}) + latestWorkflow, err := r.getWorkflowClient(namespace).Get(newWorkflow.Name, v1.GetOptions{}) if err != nil { return err } // Update the workflow's resource version to latest. newWorkflow.ResourceVersion = latestWorkflow.ResourceVersion - _, err = r.getWorkflowClient("").Update(newWorkflow.Workflow) + _, err = r.getWorkflowClient(namespace).Update(newWorkflow.Workflow) return err } @@ -971,3 +983,12 @@ func (r *ResourceManager) GetPipelineVersionTemplate(versionId string) ([]byte, func (r *ResourceManager) IsRequestAuthorized(userIdentity string, namespace string) (bool, error) { return r.kfamClient.IsAuthorized(userIdentity, namespace) } + +func (r *ResourceManager) GetNamespaceFromRunID(runId string) (string, error) { + runDetail, err := r.GetRun(runId) + if err != nil { + return "", util.Wrap(err, "Failed to get namespace from run id.") + } + namespace := model.GetNamespaceFromModelResourceReferences(runDetail.ResourceReferences) + return namespace, nil +} diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index 7a48621e36c6..f915425ba941 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -757,7 +757,7 @@ func TestRetryRun_FailedDeletePods(t *testing.T) { store, manager, runDetail := initWithOneTimeFailedRun(t) defer store.Close() - manager.podClient = FakeBadPodClient{} + manager.k8sCoreClient = client.NewFakeKubernetesCoreClientWithBadPodClient() err := manager.RetryRun(runDetail.UUID) assert.NotNil(t, err) assert.Contains(t, err.Error(), "failed to delete pod") diff --git a/backend/src/apiserver/resource/resource_manager_util.go b/backend/src/apiserver/resource/resource_manager_util.go index 406e6b1cb9f2..2ea2372350e0 100644 --- a/backend/src/apiserver/resource/resource_manager_util.go +++ b/backend/src/apiserver/resource/resource_manager_util.go @@ -16,18 +16,19 @@ package resource import ( "errors" + "regexp" + "strings" + "time" + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/common" api "github.com/kubeflow/pipelines/backend/api/go_client" + "github.com/kubeflow/pipelines/backend/src/apiserver/client" "github.com/kubeflow/pipelines/backend/src/common/util" scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1" apierr "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "regexp" - "strings" - "time" ) func toCRDTrigger(apiTrigger *api.Trigger) *scheduledworkflow.Trigger { @@ -171,9 +172,9 @@ func formulateRetryWorkflow(wf *util.Workflow) (*util.Workflow, []string, error) return util.NewWorkflow(newWF), podsToDelete, nil } -func deletePods(podClient corev1.PodInterface, podsToDelete []string) error { +func deletePods(k8sCoreClient client.KubernetesCoreInterface, podsToDelete []string, namespace string) error { for _, podId := range podsToDelete { - err := podClient.Delete(podId, &metav1.DeleteOptions{}) + err := k8sCoreClient.PodClient(namespace).Delete(podId, &metav1.DeleteOptions{}) if err != nil && !apierr.IsNotFound(err) { return util.NewInternalServerError(err, "Failed to delete pods.") } diff --git a/backend/src/apiserver/server/run_server.go b/backend/src/apiserver/server/run_server.go index 2c27a6cf1841..ef4541fff411 100644 --- a/backend/src/apiserver/server/run_server.go +++ b/backend/src/apiserver/server/run_server.go @@ -186,14 +186,14 @@ func (s *RunServer) canAccessRun(ctx context.Context, runId string) error { // Skip authz if not multi-user mode. return nil } - runDetail, err := s.resourceManager.GetRun(runId) + namespace, err := s.resourceManager.GetNamespaceFromRunID(runId) if err != nil { return util.Wrap(err, "Failed to authorize with the run Id.") } - namespace := model.GetNamespaceFromModelResourceReferences(runDetail.ResourceReferences) if len(namespace) == 0 { - return util.NewInternalServerError(errors.New("There is no namespace in the ResourceReferences"), "There is no namespace in the ResourceReferences") + return util.NewInternalServerError(errors.New("There is no namespace found"), "There is no namespace found") } + err = isAuthorized(s.resourceManager, ctx, namespace) if err != nil { return util.Wrap(err, "Failed to authorize with API resource references")