From 65c5a6ee1b498e57c938ce6ea4a77b6c5ccd21ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Hern=C3=A1ndez?= <23639005+israel-hdez@users.noreply.github.com> Date: Thu, 22 Feb 2024 17:26:08 -0600 Subject: [PATCH] Enhance controller setup based on available CRDs This enhances the setup of the InferenceService controller and the InferenceGraph controller. Instead of relying on the `defaultDeploymentMode` configuration to determine what CRDs to watch, the setup now checks whether KNative Services and Istio VirtualServices are available in the cluster and setup the watches (invoke `Owns`) accordingly. This enhancement has the following advantages: * A crashloop is prevented if the CRDs are missing in the cluster. The user would still be able to create InferenceServices by taking care of annotating the ISVC for RawDeployment mode. * If RawDeployment mode is configured as the default mode, the controllers would still watch for KNative and Istio resources if these components are available. This will let the controller watch for changes for the dependent resources if the user uses Serverless mode for some of the InferenceServices. * In the InferenceService controller, the watch for the VirtualServices is still conditioned to the value of the `disableVirtualHost` configuration. --- pkg/constants/constants.go | 6 ++ .../v1alpha1/inferencegraph/controller.go | 49 ++++++++---- .../inferencegraph/controller_test.go | 73 +++++++++++++++++ .../v1beta1/inferenceservice/controller.go | 66 +++++++++++----- .../inferenceservice/controller_test.go | 78 +++++++++++++++++++ pkg/utils/utils.go | 65 ++++++++++++++++ 6 files changed, 304 insertions(+), 33 deletions(-) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 827532dc984..baf55f157b3 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -426,6 +426,12 @@ const ( StateReasonCrashLoopBackOff = "CrashLoopBackOff" ) +// CRD Kinds +const ( + IstioVirtualServiceKind = "VirtualService" + KnativeServiceKind = "Service" +) + // GetRawServiceLabel generate native service label func GetRawServiceLabel(service string) string { return "isvc." + service diff --git a/pkg/controller/v1alpha1/inferencegraph/controller.go b/pkg/controller/v1alpha1/inferencegraph/controller.go index 860729df392..a9b144581a3 100644 --- a/pkg/controller/v1alpha1/inferencegraph/controller.go +++ b/pkg/controller/v1alpha1/inferencegraph/controller.go @@ -25,7 +25,6 @@ import ( "context" "encoding/json" "fmt" - isvcutils "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/utils" appsv1 "k8s.io/api/apps/v1" "k8s.io/client-go/util/retry" @@ -34,6 +33,8 @@ import ( "github.com/kserve/kserve/pkg/apis/serving/v1beta1" v1beta1api "github.com/kserve/kserve/pkg/apis/serving/v1beta1" "github.com/kserve/kserve/pkg/constants" + isvcutils "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/utils" + "github.com/kserve/kserve/pkg/utils" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -41,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" knservingv1 "knative.dev/serving/pkg/apis/serving/v1" @@ -53,9 +55,10 @@ import ( // InferenceGraphReconciler reconciles a InferenceGraph object type InferenceGraphReconciler struct { client.Client - Log logr.Logger - Scheme *runtime.Scheme - Recorder record.EventRecorder + ClientConfig *rest.Config + Log logr.Logger + Scheme *runtime.Scheme + Recorder record.EventRecorder } // InferenceGraphState describes the Readiness of the InferenceGraph @@ -174,6 +177,19 @@ func (r *InferenceGraphReconciler) Reconcile(ctx context.Context, req ctrl.Reque r.Log.Error(err, "name", graph.GetName()) return reconcile.Result{}, err } + + // Abort if Knative Services are not available + ksvcAvailable, checkKsvcErr := utils.IsCrdAvailable(r.ClientConfig, knservingv1.SchemeGroupVersion.String(), constants.KnativeServiceKind) + if err != nil { + return reconcile.Result{}, checkKsvcErr + } + + if !ksvcAvailable { + r.Recorder.Event(graph, v1.EventTypeWarning, "ServerlessModeRejected", + "It is not possible to use Serverless deployment mode when Knative Services are not available") + return reconcile.Result{Requeue: false}, nil + } + //@TODO check raw deployment mode desired := createKnativeService(graph.ObjectMeta, graph, routerConfig) err = controllerutil.SetControllerReference(graph, desired, r.Scheme) @@ -253,15 +269,22 @@ func inferenceGraphReadiness(status v1alpha1api.InferenceGraphStatus) bool { } func (r *InferenceGraphReconciler) SetupWithManager(mgr ctrl.Manager, deployConfig *v1beta1api.DeployConfig) error { - if deployConfig.DefaultDeploymentMode == string(constants.RawDeployment) { - return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1api.InferenceGraph{}). - Owns(&appsv1.Deployment{}). - Complete(r) + r.ClientConfig = mgr.GetConfig() + + ksvcFound, err := utils.IsCrdAvailable(r.ClientConfig, knservingv1.SchemeGroupVersion.String(), constants.KnativeServiceKind) + if err != nil { + return err + } + + ctrlBuilder := ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1api.InferenceGraph{}). + Owns(&appsv1.Deployment{}) + + if ksvcFound { + ctrlBuilder = ctrlBuilder.Owns(&knservingv1.Service{}) } else { - return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1api.InferenceGraph{}). - Owns(&knservingv1.Service{}). - Complete(r) + r.Log.Info("The InferenceGraph controller won't watch serving.knative.dev/v1/Service resources because the CRD is not available.") } + + return ctrlBuilder.Complete(r) } diff --git a/pkg/controller/v1alpha1/inferencegraph/controller_test.go b/pkg/controller/v1alpha1/inferencegraph/controller_test.go index 566db562f63..9c2264290d0 100644 --- a/pkg/controller/v1alpha1/inferencegraph/controller_test.go +++ b/pkg/controller/v1alpha1/inferencegraph/controller_test.go @@ -20,6 +20,7 @@ import ( "context" "github.com/kserve/kserve/pkg/apis/serving/v1alpha1" "github.com/kserve/kserve/pkg/constants" + "github.com/kserve/kserve/pkg/utils" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" @@ -30,6 +31,7 @@ import ( "knative.dev/pkg/ptr" knservingdefaults "knative.dev/serving/pkg/apis/config" knservingv1 "knative.dev/serving/pkg/apis/serving/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" "time" ) @@ -461,4 +463,75 @@ var _ = Describe("Inference Graph controller test", func() { }) }) + Context("When creating an InferenceGraph in Serverless mode", func() { + It("Should fail if Knative Serving is not installed", func() { + // Simulate Knative Serving is absent by setting to false the relevant item in utils.gvResourcesCache variable + servingResources, getServingResourcesErr := utils.GetAvailableResourcesForApi(cfg, knservingv1.SchemeGroupVersion.String()) + Expect(getServingResourcesErr).To(BeNil()) + defer utils.SetAvailableResourcesForApi(knservingv1.SchemeGroupVersion.String(), servingResources) + utils.SetAvailableResourcesForApi(knservingv1.SchemeGroupVersion.String(), nil) + + By("By creating a new InferenceGraph") + var configMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.InferenceServiceConfigMapName, + Namespace: constants.KServeNamespace, + }, + Data: configs, + } + Expect(k8sClient.Create(context.TODO(), configMap)).NotTo(HaveOccurred()) + defer k8sClient.Delete(context.TODO(), configMap) + + graphName := "singlenode1" + var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: graphName, Namespace: "default"}} + var serviceKey = expectedRequest.NamespacedName + ctx := context.Background() + ig := &v1alpha1.InferenceGraph{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceKey.Name, + Namespace: serviceKey.Namespace, + Annotations: map[string]string{ + "serving.kserve.io/deploymentMode": string(constants.Serverless), + }, + }, + Spec: v1alpha1.InferenceGraphSpec{ + Nodes: map[string]v1alpha1.InferenceRouter{ + v1alpha1.GraphRootNodeName: { + RouterType: v1alpha1.Sequence, + Steps: []v1alpha1.InferenceStep{ + { + InferenceTarget: v1alpha1.InferenceTarget{ + ServiceURL: "http://someservice.exmaple.com", + }, + }, + }, + }, + }, + }, + } + Expect(k8sClient.Create(ctx, ig)).Should(Succeed()) + defer k8sClient.Delete(ctx, ig) + + Eventually(func() bool { + events := &v1.EventList{} + err := k8sClient.List(ctx, events, client.InNamespace(serviceKey.Namespace)) + if err != nil { + return false + } + if events == nil { + return false + } + + for _, event := range events.Items { + if event.InvolvedObject.Kind == "InferenceGraph" && + event.InvolvedObject.Name == serviceKey.Name && + event.Reason == "ServerlessModeRejected" { + return true + } + } + + return false + }, timeout, interval).Should(BeTrue()) + }) + }) }) diff --git a/pkg/controller/v1beta1/inferenceservice/controller.go b/pkg/controller/v1beta1/inferenceservice/controller.go index 6ee7c79384a..a897814870a 100644 --- a/pkg/controller/v1beta1/inferenceservice/controller.go +++ b/pkg/controller/v1beta1/inferenceservice/controller.go @@ -42,6 +42,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" knservingv1 "knative.dev/serving/pkg/apis/serving/v1" @@ -87,9 +88,10 @@ const ( // InferenceServiceReconciler reconciles a InferenceService object type InferenceServiceReconciler struct { client.Client - Log logr.Logger - Scheme *runtime.Scheme - Recorder record.EventRecorder + ClientConfig *rest.Config + Log logr.Logger + Scheme *runtime.Scheme + Recorder record.EventRecorder } func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -164,6 +166,21 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, nil } + // Abort early if the resolved deployment mode is Serverless, but Knative Services are not available + if deploymentMode == constants.Serverless { + ksvcAvailable, checkKsvcErr := utils.IsCrdAvailable(r.ClientConfig, knservingv1.SchemeGroupVersion.String(), constants.KnativeServiceKind) + if err != nil { + return reconcile.Result{}, checkKsvcErr + } + + if !ksvcAvailable { + r.Recorder.Event(isvc, v1.EventTypeWarning, "ServerlessModeRejected", + "It is not possible to use Serverless deployment mode when Knative Services are not available") + return reconcile.Result{Requeue: false}, nil + } + } + + // Setup reconcilers r.Log.Info("Reconciling inference service", "apiVersion", isvc.APIVersion, "isvc", isvc.Name) isvcConfig, err := v1beta1api.NewInferenceServicesConfig(r.Client) if err != nil { @@ -304,26 +321,35 @@ func inferenceServiceStatusEqual(s1, s2 v1beta1api.InferenceServiceStatus, deplo } func (r *InferenceServiceReconciler) SetupWithManager(mgr ctrl.Manager, deployConfig *v1beta1api.DeployConfig, ingressConfig *v1beta1api.IngressConfig) error { - if deployConfig.DefaultDeploymentMode == string(constants.RawDeployment) { - return ctrl.NewControllerManagedBy(mgr). - For(&v1beta1api.InferenceService{}). - Owns(&appsv1.Deployment{}). - Complete(r) - } else if ingressConfig.DisableIstioVirtualHost == false { - return ctrl.NewControllerManagedBy(mgr). - For(&v1beta1api.InferenceService{}). - Owns(&knservingv1.Service{}). - Owns(&v1alpha3.VirtualService{}). - Owns(&appsv1.Deployment{}). - Complete(r) + r.ClientConfig = mgr.GetConfig() + + ksvcFound, err := utils.IsCrdAvailable(r.ClientConfig, knservingv1.SchemeGroupVersion.String(), constants.KnativeServiceKind) + if err != nil { + return err + } + + vsFound, err := utils.IsCrdAvailable(r.ClientConfig, v1alpha3.SchemeGroupVersion.String(), constants.IstioVirtualServiceKind) + if err != nil { + return err + } + + ctrlBuilder := ctrl.NewControllerManagedBy(mgr). + For(&v1beta1api.InferenceService{}). + Owns(&appsv1.Deployment{}) + + if ksvcFound { + ctrlBuilder = ctrlBuilder.Owns(&knservingv1.Service{}) + } else { + r.Log.Info("The InferenceService controller won't watch serving.knative.dev/v1/Service resources because the CRD is not available.") + } + + if vsFound && ingressConfig.DisableIstioVirtualHost == false { + ctrlBuilder = ctrlBuilder.Owns(&v1alpha3.VirtualService{}) } else { - return ctrl.NewControllerManagedBy(mgr). - For(&v1beta1api.InferenceService{}). - Owns(&knservingv1.Service{}). - Owns(&appsv1.Deployment{}). - Complete(r) + r.Log.Info("The InferenceService controller won't watch networking.istio.io/v1alpha3/VirtualService resources because the CRD is not available.") } + return ctrlBuilder.Complete(r) } func (r *InferenceServiceReconciler) deleteExternalResources(isvc *v1beta1api.InferenceService) error { diff --git a/pkg/controller/v1beta1/inferenceservice/controller_test.go b/pkg/controller/v1beta1/inferenceservice/controller_test.go index 6630529799f..9650f281cde 100644 --- a/pkg/controller/v1beta1/inferenceservice/controller_test.go +++ b/pkg/controller/v1beta1/inferenceservice/controller_test.go @@ -29,6 +29,7 @@ import ( "github.com/kserve/kserve/pkg/apis/serving/v1alpha1" "github.com/kserve/kserve/pkg/apis/serving/v1beta1" "github.com/kserve/kserve/pkg/constants" + "github.com/kserve/kserve/pkg/utils" . "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" . "github.com/onsi/gomega" @@ -383,6 +384,83 @@ var _ = Describe("v1beta1 inference service controller", func() { Expect(updatedVirtualService.Annotations).To(gomega.Equal(annotations)) Expect(updatedVirtualService.Labels).To(gomega.Equal(labels)) }) + It("Should fail if Knative Serving is not installed", func() { + // Simulate Knative Serving is absent by setting to false the relevant item in utils.gvResourcesCache variable + servingResources, getServingResourcesErr := utils.GetAvailableResourcesForApi(cfg, knservingv1.SchemeGroupVersion.String()) + Expect(getServingResourcesErr).To(BeNil()) + defer utils.SetAvailableResourcesForApi(knservingv1.SchemeGroupVersion.String(), servingResources) + utils.SetAvailableResourcesForApi(knservingv1.SchemeGroupVersion.String(), nil) + + // Create configmap + var configMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.InferenceServiceConfigMapName, + Namespace: constants.KServeNamespace, + }, + Data: configs, + } + Expect(k8sClient.Create(context.TODO(), configMap)).NotTo(HaveOccurred()) + defer k8sClient.Delete(context.TODO(), configMap) + + // Create InferenceService + serviceName := "serverless-isvc" + var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: serviceName, Namespace: "default"}} + var serviceKey = expectedRequest.NamespacedName + var storageUri = "s3://test/mnist/export" + isvc := &v1beta1.InferenceService{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceKey.Name, + Namespace: serviceKey.Namespace, + Annotations: map[string]string{ + "serving.kserve.io/deploymentMode": "Serverless", + }, + }, + Spec: v1beta1.InferenceServiceSpec{ + Predictor: v1beta1.PredictorSpec{ + ComponentExtensionSpec: v1beta1.ComponentExtensionSpec{ + MinReplicas: v1beta1.GetIntReference(1), + MaxReplicas: 3, + }, + Tensorflow: &v1beta1.TFServingSpec{ + PredictorExtensionSpec: v1beta1.PredictorExtensionSpec{ + StorageURI: &storageUri, + RuntimeVersion: proto.String("1.14.0"), + Container: v1.Container{ + Name: constants.InferenceServiceContainerName, + Resources: defaultResource, + }, + }, + }, + }, + }, + } + isvc.DefaultInferenceService(nil, nil) + + ctx := context.Background() + Expect(k8sClient.Create(ctx, isvc)).Should(Succeed()) + defer k8sClient.Delete(ctx, isvc) + + Eventually(func() bool { + events := &v1.EventList{} + err := k8sClient.List(ctx, events, client.InNamespace(serviceKey.Namespace)) + if err != nil { + return false + } + if events == nil { + return false + } + + for _, event := range events.Items { + if event.InvolvedObject.Kind == "InferenceService" && + event.InvolvedObject.Name == serviceKey.Name && + event.Reason == "ServerlessModeRejected" { + return true + } + } + + return false + }, timeout, interval).Should(BeTrue()) + }) }) Context("Inference Service with transformer", func() { diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 9ff91ee1bb0..b69fda6de52 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -21,6 +21,10 @@ import ( "github.com/kserve/kserve/pkg/constants" v1 "k8s.io/api/core/v1" + apierr "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" ) /* NOTE TO AUTHORS: @@ -29,6 +33,8 @@ import ( * Please add functional style container operations sparingly and intentionally. */ +var gvResourcesCache map[string]*metav1.APIResourceList + func Filter(origin map[string]string, predicate func(string) bool) map[string]string { result := make(map[string]string) for k, v := range origin { @@ -153,3 +159,62 @@ func AppendEnvVarIfNotExists(slice []v1.EnvVar, elems ...v1.EnvVar) []v1.EnvVar } return slice } + +// IsCrdAvailable checks if a given CRD is present in the cluster by verifying the +// existence of its API. +func IsCrdAvailable(config *rest.Config, groupVersion, kind string) (bool, error) { + gvResources, err := GetAvailableResourcesForApi(config, groupVersion) + if err != nil { + return false, err + } + + found := false + if gvResources != nil { + for _, crd := range gvResources.APIResources { + if crd.Kind == kind { + found = true + break + } + } + } + + return found, nil +} + +// GetAvailableResourcesForApi returns the list of discovered resources that belong +// to the API specified in groupVersion. The first query to a specifig groupVersion will +// query the cluster API server to discover the available resources and the discovered +// resources will be cached and returned to subsequent invocations to prevent additional +// queries to the API server. +func GetAvailableResourcesForApi(config *rest.Config, groupVersion string) (*metav1.APIResourceList, error) { + var gvResources *metav1.APIResourceList + var ok bool + + if gvResources, ok = gvResourcesCache[groupVersion]; !ok { + discoveryClient, newClientErr := discovery.NewDiscoveryClientForConfig(config) + if newClientErr != nil { + return nil, newClientErr + } + + var getGvResourcesErr error + gvResources, getGvResourcesErr = discoveryClient.ServerResourcesForGroupVersion(groupVersion) + if getGvResourcesErr != nil && !apierr.IsNotFound(getGvResourcesErr) { + return nil, getGvResourcesErr + } + + SetAvailableResourcesForApi(groupVersion, gvResources) + } + + return gvResources, nil +} + +// SetAvailableResourcesForApi stores the value fo resources argument in the global cache +// of discovered API resources. This function should never be called directly. It is exported +// for usage in tests. +func SetAvailableResourcesForApi(groupVersion string, resources *metav1.APIResourceList) { + if gvResourcesCache == nil { + gvResourcesCache = make(map[string]*metav1.APIResourceList) + } + + gvResourcesCache[groupVersion] = resources +}