Skip to content

Commit

Permalink
Adjust the namespace for Delete/Terminate/Retry run APIs (kubeflow#2765)
Browse files Browse the repository at this point in the history
* delete/terminate run in the corresponding namespace

* retry run in the corresponding namespace; wrap pod client with kubernetes core client

* fix duplicate build rules
  • Loading branch information
gaoning777 authored and Jeffwan committed Dec 9, 2020
1 parent bccbb3f commit 6d8e02f
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 44 deletions.
6 changes: 5 additions & 1 deletion backend/src/apiserver/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ go_library(
"kfam.go",
"kfam_fake.go",
"minio.go",
"pod.go",
"kubernetes_core.go",
"kubernetes_core_fake.go",
"pod_fake.go",
"scheduled_workflow.go",
"sql.go",
"workflow_fake.go",
Expand All @@ -28,6 +30,8 @@ go_library(
"@com_github_golang_glog//:go_default_library",
"@com_github_minio_minio_go//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@io_k8s_api//policy/v1beta1:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
"@io_k8s_apimachinery//pkg/types:go_default_library",
"@io_k8s_apimachinery//pkg/watch:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,19 @@ import (
"time"
)

func createPodClient(namespace string) (v1.PodInterface, error) {
// KubernetesCoreInterface abstracts access to Kubernetes core (v1) resources.
// At present it only hands out pod clients, one per requested namespace.
type KubernetesCoreInterface interface {
// PodClient returns a pod client for the given namespace.
PodClient(namespace string) v1.PodInterface
}

// KubernetesCore is the KubernetesCoreInterface implementation backed by a
// CoreV1 client; namespace-scoped clients are derived from that client.
type KubernetesCore struct {
	coreV1Client v1.CoreV1Interface
}

// PodClient returns the pod client of the wrapped CoreV1 client for the
// given namespace.
func (k *KubernetesCore) PodClient(namespace string) v1.PodInterface {
	pods := k.coreV1Client.Pods(namespace)
	return pods
}

func createKubernetesCore() (KubernetesCoreInterface, error) {
restConfig, err := rest.InClusterConfig()
if err != nil {
return nil, errors.Wrap(err, "Failed to initialize kubernetes client.")
Expand All @@ -20,15 +32,15 @@ func createPodClient(namespace string) (v1.PodInterface, error) {
if err != nil {
return nil, errors.Wrap(err, "Failed to initialize kubernetes client set.")
}
return clientSet.CoreV1().Pods(namespace), nil
return &KubernetesCore{clientSet.CoreV1()}, nil
}

// CreatePodClientOrFatal creates a new client for the Kubernetes pod.
func CreatePodClientOrFatal(namespace string, initConnectionTimeout time.Duration) v1.PodInterface{
var client v1.PodInterface
// CreateKubernetesCoreOrFatal creates a new Kubernetes core client, from which namespace-scoped pod clients can be obtained.
func CreateKubernetesCoreOrFatal(initConnectionTimeout time.Duration) KubernetesCoreInterface {
var client KubernetesCoreInterface
var err error
var operation = func() error {
client, err = createPodClient(namespace)
client, err = createKubernetesCore()
if err != nil {
return err
}
Expand Down
43 changes: 43 additions & 0 deletions backend/src/apiserver/client/kubernetes_core_fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// FakeKuberneteCoreClient is a fake Kubernetes core client that always hands
// out the same FakePodClient.
type FakeKuberneteCoreClient struct {
	podClientFake *FakePodClient
}

// PodClient returns the fake pod client held by this fake; the namespace
// argument is not used.
func (f *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
	return f.podClientFake
}

// NewFakeKuberneteCoresClient builds a FakeKuberneteCoreClient backed by a
// new FakePodClient.
func NewFakeKuberneteCoresClient() *FakeKuberneteCoreClient {
	fake := &FakeKuberneteCoreClient{podClientFake: &FakePodClient{}}
	return fake
}

// FakeKubernetesCoreClientWithBadPodClient is a fake Kubernetes core client
// whose pod client is a FakeBadPodClient.
type FakeKubernetesCoreClientWithBadPodClient struct {
	podClientFake *FakeBadPodClient
}

// PodClient returns the FakeBadPodClient held by this fake; the namespace
// argument is not used.
func (f *FakeKubernetesCoreClientWithBadPodClient) PodClient(namespace string) v1.PodInterface {
	return f.podClientFake
}

// NewFakeKubernetesCoreClientWithBadPodClient builds a fake core client
// backed by a new FakeBadPodClient.
func NewFakeKubernetesCoreClientWithBadPodClient() *FakeKubernetesCoreClientWithBadPodClient {
	fake := &FakeKubernetesCoreClientWithBadPodClient{podClientFake: &FakeBadPodClient{}}
	return fake
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package resource
package client

import (
"errors"
Expand Down
13 changes: 5 additions & 8 deletions backend/src/apiserver/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"os"
"time"

"github.com/kubeflow/pipelines/backend/src/apiserver/common"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/cenkalti/backoff"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"

"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/sqlite"
Expand Down Expand Up @@ -68,7 +66,7 @@ type ClientManager struct {
objectStore storage.ObjectStoreInterface
argoClient client.ArgoClientInterface
swfClient scheduledworkflowclient.ScheduledWorkflowInterface
podClient v1.PodInterface
k8sCoreClient client.KubernetesCoreInterface
kfamClient client.KFAMClientInterface
time util.TimeInterface
uuid util.UUIDGeneratorInterface
Expand Down Expand Up @@ -114,8 +112,8 @@ func (c *ClientManager) ScheduledWorkflow() scheduledworkflowclient.ScheduledWor
return c.swfClient
}

func (c *ClientManager) PodClient() v1.PodInterface {
return c.podClient
func (c *ClientManager) KubernetesCoreClient() client.KubernetesCoreInterface {
return c.k8sCoreClient
}

func (c *ClientManager) KFAMClient() client.KFAMClientInterface {
Expand Down Expand Up @@ -154,8 +152,7 @@ func (c *ClientManager) init() {
c.swfClient = client.CreateScheduledWorkflowClientOrFatal(
common.GetStringConfig(client.PodNamespace), common.GetDurationConfig(initConnectionTimeout))

c.podClient = client.CreatePodClientOrFatal(
common.GetStringConfig(client.PodNamespace), common.GetDurationConfig(initConnectionTimeout))
c.k8sCoreClient = client.CreateKubernetesCoreOrFatal(common.GetDurationConfig(initConnectionTimeout))

runStore := storage.NewRunStore(db, c.time)
c.runStore = runStore
Expand Down
1 change: 0 additions & 1 deletion backend/src/apiserver/resource/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go_library(
srcs = [
"client_manager_fake.go",
"model_converter.go",
"pod_fake.go",
"resource_manager.go",
"resource_manager_util.go",
"scheduled_workflow_fake.go",
Expand Down
9 changes: 4 additions & 5 deletions backend/src/apiserver/resource/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/storage"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

const (
Expand All @@ -40,7 +39,7 @@ type FakeClientManager struct {
objectStore storage.ObjectStoreInterface
ArgoClientFake *client.FakeArgoClient
scheduledWorkflowClientFake *FakeScheduledWorkflowClient
podClientFake v1.PodInterface
k8sCoreClientFake *client.FakeKuberneteCoreClient
KfamClientFake client.KFAMClientInterface
time util.TimeInterface
uuid util.UUIDGeneratorInterface
Expand Down Expand Up @@ -76,7 +75,7 @@ func NewFakeClientManager(time util.TimeInterface, uuid util.UUIDGeneratorInterf
defaultExperimentStore: storage.NewDefaultExperimentStore(db),
objectStore: storage.NewFakeObjectStore(),
scheduledWorkflowClientFake: NewScheduledWorkflowClientFake(),
podClientFake: FakePodClient{},
k8sCoreClientFake: client.NewFakeKuberneteCoresClient(),
KfamClientFake: client.NewFakeKFAMClientAuthorized(),
time: time,
uuid: uuid,
Expand Down Expand Up @@ -144,8 +143,8 @@ func (f *FakeClientManager) ScheduledWorkflow() scheduledworkflowclient.Schedule
return f.scheduledWorkflowClientFake
}

func (f *FakeClientManager) PodClient() v1.PodInterface {
return f.podClientFake
func (f *FakeClientManager) KubernetesCoreClient() client.KubernetesCoreInterface {
return f.k8sCoreClientFake
}

func (f *FakeClientManager) KFAMClient() client.KFAMClientInterface {
Expand Down
45 changes: 33 additions & 12 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
"github.com/pkg/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

"k8s.io/apimachinery/pkg/types"
)
Expand All @@ -56,7 +55,7 @@ type ClientManagerInterface interface {
ObjectStore() storage.ObjectStoreInterface
ArgoClient() client.ArgoClientInterface
ScheduledWorkflow() scheduledworkflowclient.ScheduledWorkflowInterface
PodClient() corev1.PodInterface
KubernetesCoreClient() client.KubernetesCoreInterface
KFAMClient() client.KFAMClientInterface
Time() util.TimeInterface
UUID() util.UUIDGeneratorInterface
Expand All @@ -73,7 +72,7 @@ type ResourceManager struct {
objectStore storage.ObjectStoreInterface
argoClient client.ArgoClientInterface
scheduledWorkflowClient scheduledworkflowclient.ScheduledWorkflowInterface
podClient corev1.PodInterface
k8sCoreClient client.KubernetesCoreInterface
kfamClient client.KFAMClientInterface
time util.TimeInterface
uuid util.UUIDGeneratorInterface
Expand All @@ -91,7 +90,7 @@ func NewResourceManager(clientManager ClientManagerInterface) *ResourceManager {
objectStore: clientManager.ObjectStore(),
argoClient: clientManager.ArgoClient(),
scheduledWorkflowClient: clientManager.ScheduledWorkflow(),
podClient: clientManager.PodClient(),
k8sCoreClient: clientManager.KubernetesCoreClient(),
kfamClient: clientManager.KFAMClient(),
time: clientManager.Time(),
uuid: clientManager.UUID(),
Expand Down Expand Up @@ -351,7 +350,11 @@ func (r *ResourceManager) DeleteRun(runID string) error {
if err != nil {
return util.Wrap(err, "Delete run failed")
}
err = r.getWorkflowClient("").Delete(runDetail.Name, &v1.DeleteOptions{})
namespace, err := r.GetNamespaceFromRunID(runID)
if err != nil {
return util.Wrap(err, "Delete run failed")
}
err = r.getWorkflowClient(namespace).Delete(runDetail.Name, &v1.DeleteOptions{})
if err != nil {
// API won't need to delete the workflow CRD
// once persistent agent sync the state to DB and set TTL for it.
Expand Down Expand Up @@ -397,12 +400,17 @@ func (r *ResourceManager) TerminateRun(runId string) error {
return util.Wrap(err, "Terminate run failed")
}

namespace, err := r.GetNamespaceFromRunID(runId)
if err != nil {
return util.Wrap(err, "Terminate run failed")
}

err = r.runStore.TerminateRun(runId)
if err != nil {
return util.Wrap(err, "Terminate run failed")
}

err = TerminateWorkflow(r.getWorkflowClient(""), runDetail.Run.Name)
err = TerminateWorkflow(r.getWorkflowClient(namespace), runDetail.Run.Name)
if err != nil {
return util.NewInternalServerError(err, "Failed to terminate the run")
}
Expand All @@ -414,6 +422,10 @@ func (r *ResourceManager) RetryRun(runId string) error {
if err != nil {
return util.Wrap(err, "Retry run failed")
}
namespace, err := r.GetNamespaceFromRunID(runId)
if err != nil {
return util.Wrap(err, "Retry run failed")
}

if runDetail.WorkflowRuntimeManifest == "" {
return util.NewBadRequestError(errors.New("workflow cannot be retried"), "Workflow must be Failed/Error to retry")
Expand All @@ -428,16 +440,16 @@ func (r *ResourceManager) RetryRun(runId string) error {
return util.Wrap(err, "Retry run failed.")
}

if err = deletePods(r.podClient, podsToDelete); err != nil {
if err = deletePods(r.k8sCoreClient, podsToDelete, namespace); err != nil {
return util.NewInternalServerError(err, "Retry run failed. Failed to clean up the failed pods from previous run.")
}

// First try to update workflow
updateError := r.updateWorkflow(newWorkflow)
updateError := r.updateWorkflow(newWorkflow, namespace)
if updateError != nil {
// Remove resource version
newWorkflow.ResourceVersion = ""
newCreatedWorkflow, createError := r.getWorkflowClient("").Create(newWorkflow.Workflow)
newCreatedWorkflow, createError := r.getWorkflowClient(namespace).Create(newWorkflow.Workflow)
if createError != nil {
return util.NewInternalServerError(createError,
"Retry run failed. Failed to create or update the run. Update Error: %s, Create Error: %s",
Expand All @@ -452,15 +464,15 @@ func (r *ResourceManager) RetryRun(runId string) error {
return nil
}

func (r *ResourceManager) updateWorkflow(newWorkflow *util.Workflow) error {
func (r *ResourceManager) updateWorkflow(newWorkflow *util.Workflow, namespace string) error {
// If fail to get the workflow, return error.
latestWorkflow, err := r.getWorkflowClient("").Get(newWorkflow.Name, v1.GetOptions{})
latestWorkflow, err := r.getWorkflowClient(namespace).Get(newWorkflow.Name, v1.GetOptions{})
if err != nil {
return err
}
// Update the workflow's resource version to latest.
newWorkflow.ResourceVersion = latestWorkflow.ResourceVersion
_, err = r.getWorkflowClient("").Update(newWorkflow.Workflow)
_, err = r.getWorkflowClient(namespace).Update(newWorkflow.Workflow)
return err
}

Expand Down Expand Up @@ -971,3 +983,12 @@ func (r *ResourceManager) GetPipelineVersionTemplate(versionId string) ([]byte,
func (r *ResourceManager) IsRequestAuthorized(userIdentity string, namespace string) (bool, error) {
return r.kfamClient.IsAuthorized(userIdentity, namespace)
}

// GetNamespaceFromRunID fetches the run identified by runId and returns the
// namespace extracted from that run's resource references.
//
// An error is returned only when the run itself cannot be fetched.
// NOTE(review): the extracted namespace is presumably empty when the run has
// no namespace reference — confirm against
// model.GetNamespaceFromModelResourceReferences before relying on it.
func (r *ResourceManager) GetNamespaceFromRunID(runId string) (string, error) {
	run, err := r.GetRun(runId)
	if err != nil {
		return "", util.Wrap(err, "Failed to get namespace from run id.")
	}
	return model.GetNamespaceFromModelResourceReferences(run.ResourceReferences), nil
}
2 changes: 1 addition & 1 deletion backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func TestRetryRun_FailedDeletePods(t *testing.T) {
store, manager, runDetail := initWithOneTimeFailedRun(t)
defer store.Close()

manager.podClient = FakeBadPodClient{}
manager.k8sCoreClient = client.NewFakeKubernetesCoreClientWithBadPodClient()
err := manager.RetryRun(runDetail.UUID)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "failed to delete pod")
Expand Down
13 changes: 7 additions & 6 deletions backend/src/apiserver/resource/resource_manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ package resource

import (
"errors"
"regexp"
"strings"
"time"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"regexp"
"strings"
"time"
)

func toCRDTrigger(apiTrigger *api.Trigger) *scheduledworkflow.Trigger {
Expand Down Expand Up @@ -171,9 +172,9 @@ func formulateRetryWorkflow(wf *util.Workflow) (*util.Workflow, []string, error)
return util.NewWorkflow(newWF), podsToDelete, nil
}

func deletePods(podClient corev1.PodInterface, podsToDelete []string) error {
func deletePods(k8sCoreClient client.KubernetesCoreInterface, podsToDelete []string, namespace string) error {
for _, podId := range podsToDelete {
err := podClient.Delete(podId, &metav1.DeleteOptions{})
err := k8sCoreClient.PodClient(namespace).Delete(podId, &metav1.DeleteOptions{})
if err != nil && !apierr.IsNotFound(err) {
return util.NewInternalServerError(err, "Failed to delete pods.")
}
Expand Down
6 changes: 3 additions & 3 deletions backend/src/apiserver/server/run_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,14 @@ func (s *RunServer) canAccessRun(ctx context.Context, runId string) error {
// Skip authz if not multi-user mode.
return nil
}
runDetail, err := s.resourceManager.GetRun(runId)
namespace, err := s.resourceManager.GetNamespaceFromRunID(runId)
if err != nil {
return util.Wrap(err, "Failed to authorize with the run Id.")
}
namespace := model.GetNamespaceFromModelResourceReferences(runDetail.ResourceReferences)
if len(namespace) == 0 {
return util.NewInternalServerError(errors.New("There is no namespace in the ResourceReferences"), "There is no namespace in the ResourceReferences")
return util.NewInternalServerError(errors.New("There is no namespace found"), "There is no namespace found")
}

err = isAuthorized(s.resourceManager, ctx, namespace)
if err != nil {
return util.Wrap(err, "Failed to authorize with API resource references")
Expand Down

0 comments on commit 6d8e02f

Please sign in to comment.