diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 827532dc984..baf55f157b3 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -426,6 +426,12 @@ const ( StateReasonCrashLoopBackOff = "CrashLoopBackOff" ) +// CRD Kinds that the controllers pass to utils.IsCrdAvailable to detect whether the Istio and Knative APIs are installed +const ( + IstioVirtualServiceKind = "VirtualService" + KnativeServiceKind = "Service" +) + // GetRawServiceLabel generate native service label func GetRawServiceLabel(service string) string { return "isvc." + service diff --git a/pkg/controller/v1alpha1/inferencegraph/controller.go b/pkg/controller/v1alpha1/inferencegraph/controller.go index 860729df392..a9b144581a3 100644 --- a/pkg/controller/v1alpha1/inferencegraph/controller.go +++ b/pkg/controller/v1alpha1/inferencegraph/controller.go @@ -25,7 +25,6 @@ import ( "context" "encoding/json" "fmt" - isvcutils "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/utils" appsv1 "k8s.io/api/apps/v1" "k8s.io/client-go/util/retry" @@ -34,6 +33,8 @@ import ( "github.com/kserve/kserve/pkg/apis/serving/v1beta1" v1beta1api "github.com/kserve/kserve/pkg/apis/serving/v1beta1" "github.com/kserve/kserve/pkg/constants" + isvcutils "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/utils" + "github.com/kserve/kserve/pkg/utils" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -41,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" knservingv1 "knative.dev/serving/pkg/apis/serving/v1" @@ -53,9 +55,10 @@ import ( // InferenceGraphReconciler reconciles a InferenceGraph object type InferenceGraphReconciler struct { client.Client - Log logr.Logger - Scheme *runtime.Scheme - Recorder record.EventRecorder + ClientConfig *rest.Config + Log logr.Logger + Scheme *runtime.Scheme + Recorder record.EventRecorder } // InferenceGraphState describes the Readiness of 
the InferenceGraph @@ -174,6 +177,19 @@ func (r *InferenceGraphReconciler) Reconcile(ctx context.Context, req ctrl.Reque r.Log.Error(err, "name", graph.GetName()) return reconcile.Result{}, err } + + // Abort if Knative Services are not available, because the graph is deployed as a Knative Service below + ksvcAvailable, checkKsvcErr := utils.IsCrdAvailable(r.ClientConfig, knservingv1.SchemeGroupVersion.String(), constants.KnativeServiceKind) + if checkKsvcErr != nil { + return reconcile.Result{}, checkKsvcErr + } + + if !ksvcAvailable { + r.Recorder.Event(graph, v1.EventTypeWarning, "ServerlessModeRejected", + "It is not possible to use Serverless deployment mode when Knative Services are not available") + return reconcile.Result{Requeue: false}, nil + } + //@TODO check raw deployment mode desired := createKnativeService(graph.ObjectMeta, graph, routerConfig) err = controllerutil.SetControllerReference(graph, desired, r.Scheme) @@ -253,15 +269,22 @@ func inferenceGraphReadiness(status v1alpha1api.InferenceGraphStatus) bool { } func (r *InferenceGraphReconciler) SetupWithManager(mgr ctrl.Manager, deployConfig *v1beta1api.DeployConfig) error { - if deployConfig.DefaultDeploymentMode == string(constants.RawDeployment) { - return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1api.InferenceGraph{}). - Owns(&appsv1.Deployment{}). - Complete(r) + r.ClientConfig = mgr.GetConfig() + + ksvcFound, err := utils.IsCrdAvailable(r.ClientConfig, knservingv1.SchemeGroupVersion.String(), constants.KnativeServiceKind) + if err != nil { + return err + } + + ctrlBuilder := ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1api.InferenceGraph{}). + Owns(&appsv1.Deployment{}) + + if ksvcFound { + ctrlBuilder = ctrlBuilder.Owns(&knservingv1.Service{}) } else { - return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1api.InferenceGraph{}). - Owns(&knservingv1.Service{}). 
- Complete(r) + r.Log.Info("The InferenceGraph controller won't watch serving.knative.dev/v1/Service resources because the CRD is not available.") } + + return ctrlBuilder.Complete(r) } diff --git a/pkg/controller/v1alpha1/inferencegraph/controller_test.go b/pkg/controller/v1alpha1/inferencegraph/controller_test.go index 566db562f63..9c2264290d0 100644 --- a/pkg/controller/v1alpha1/inferencegraph/controller_test.go +++ b/pkg/controller/v1alpha1/inferencegraph/controller_test.go @@ -20,6 +20,7 @@ import ( "context" "github.com/kserve/kserve/pkg/apis/serving/v1alpha1" "github.com/kserve/kserve/pkg/constants" + "github.com/kserve/kserve/pkg/utils" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" @@ -30,6 +31,7 @@ import ( "knative.dev/pkg/ptr" knservingdefaults "knative.dev/serving/pkg/apis/config" knservingv1 "knative.dev/serving/pkg/apis/serving/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" "time" ) @@ -461,4 +463,75 @@ var _ = Describe("Inference Graph controller test", func() { }) }) + Context("When creating an InferenceGraph in Serverless mode", func() { + It("Should fail if Knative Serving is not installed", func() { + // Simulate that Knative Serving is absent by setting the relevant entry of the utils.gvResourcesCache variable to nil; the deferred call restores the discovered value + servingResources, getServingResourcesErr := utils.GetAvailableResourcesForApi(cfg, knservingv1.SchemeGroupVersion.String()) + Expect(getServingResourcesErr).To(BeNil()) + defer utils.SetAvailableResourcesForApi(knservingv1.SchemeGroupVersion.String(), servingResources) + utils.SetAvailableResourcesForApi(knservingv1.SchemeGroupVersion.String(), nil) + + By("By creating a new InferenceGraph") + var configMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.InferenceServiceConfigMapName, + Namespace: constants.KServeNamespace, + }, + Data: configs, + } + Expect(k8sClient.Create(context.TODO(), configMap)).NotTo(HaveOccurred()) + defer 
k8sClient.Delete(context.TODO(), configMap) + + graphName := "singlenode1" + var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: graphName, Namespace: "default"}} + var serviceKey = expectedRequest.NamespacedName + ctx := context.Background() + ig := &v1alpha1.InferenceGraph{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceKey.Name, + Namespace: serviceKey.Namespace, + Annotations: map[string]string{ + "serving.kserve.io/deploymentMode": string(constants.Serverless), + }, + }, + Spec: v1alpha1.InferenceGraphSpec{ + Nodes: map[string]v1alpha1.InferenceRouter{ + v1alpha1.GraphRootNodeName: { + RouterType: v1alpha1.Sequence, + Steps: []v1alpha1.InferenceStep{ + { + InferenceTarget: v1alpha1.InferenceTarget{ + ServiceURL: "http://someservice.exmaple.com", + }, + }, + }, + }, + }, + }, + } + Expect(k8sClient.Create(ctx, ig)).Should(Succeed()) + defer k8sClient.Delete(ctx, ig) + + Eventually(func() bool { + events := &v1.EventList{} + err := k8sClient.List(ctx, events, client.InNamespace(serviceKey.Namespace)) + if err != nil { + return false + } + if events == nil { + return false + } + + for _, event := range events.Items { + if event.InvolvedObject.Kind == "InferenceGraph" && + event.InvolvedObject.Name == serviceKey.Name && + event.Reason == "ServerlessModeRejected" { + return true + } + } + + return false + }, timeout, interval).Should(BeTrue()) + }) + }) }) diff --git a/pkg/controller/v1beta1/inferenceservice/controller.go b/pkg/controller/v1beta1/inferenceservice/controller.go index 6ee7c79384a..a897814870a 100644 --- a/pkg/controller/v1beta1/inferenceservice/controller.go +++ b/pkg/controller/v1beta1/inferenceservice/controller.go @@ -42,6 +42,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "knative.dev/pkg/apis" knservingv1 "knative.dev/serving/pkg/apis/serving/v1" @@ -87,9 +88,10 @@ const ( // 
InferenceServiceReconciler reconciles a InferenceService object type InferenceServiceReconciler struct { client.Client - Log logr.Logger - Scheme *runtime.Scheme - Recorder record.EventRecorder + ClientConfig *rest.Config + Log logr.Logger + Scheme *runtime.Scheme + Recorder record.EventRecorder } func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -164,6 +166,21 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, nil } + // Abort early if the resolved deployment mode is Serverless, but Knative Services are not available + if deploymentMode == constants.Serverless { + ksvcAvailable, checkKsvcErr := utils.IsCrdAvailable(r.ClientConfig, knservingv1.SchemeGroupVersion.String(), constants.KnativeServiceKind) + if checkKsvcErr != nil { + return reconcile.Result{}, checkKsvcErr + } + + if !ksvcAvailable { + r.Recorder.Event(isvc, v1.EventTypeWarning, "ServerlessModeRejected", + "It is not possible to use Serverless deployment mode when Knative Services are not available") + return reconcile.Result{Requeue: false}, nil + } + } + + // Setup reconcilers r.Log.Info("Reconciling inference service", "apiVersion", isvc.APIVersion, "isvc", isvc.Name) isvcConfig, err := v1beta1api.NewInferenceServicesConfig(r.Client) if err != nil { @@ -304,26 +321,35 @@ func inferenceServiceStatusEqual(s1, s2 v1beta1api.InferenceServiceStatus, deplo } func (r *InferenceServiceReconciler) SetupWithManager(mgr ctrl.Manager, deployConfig *v1beta1api.DeployConfig, ingressConfig *v1beta1api.IngressConfig) error { - if deployConfig.DefaultDeploymentMode == string(constants.RawDeployment) { - return ctrl.NewControllerManagedBy(mgr). - For(&v1beta1api.InferenceService{}). - Owns(&appsv1.Deployment{}). - Complete(r) - } else if ingressConfig.DisableIstioVirtualHost == false { - return ctrl.NewControllerManagedBy(mgr). - For(&v1beta1api.InferenceService{}). - Owns(&knservingv1.Service{}). 
- Owns(&v1alpha3.VirtualService{}). - Owns(&appsv1.Deployment{}). - Complete(r) + r.ClientConfig = mgr.GetConfig() + + ksvcFound, err := utils.IsCrdAvailable(r.ClientConfig, knservingv1.SchemeGroupVersion.String(), constants.KnativeServiceKind) + if err != nil { + return err + } + + vsFound, err := utils.IsCrdAvailable(r.ClientConfig, v1alpha3.SchemeGroupVersion.String(), constants.IstioVirtualServiceKind) + if err != nil { + return err + } + + ctrlBuilder := ctrl.NewControllerManagedBy(mgr). + For(&v1beta1api.InferenceService{}). + Owns(&appsv1.Deployment{}) + + if ksvcFound { + ctrlBuilder = ctrlBuilder.Owns(&knservingv1.Service{}) + } else { + r.Log.Info("The InferenceService controller won't watch serving.knative.dev/v1/Service resources because the CRD is not available.") + } + + if vsFound && !ingressConfig.DisableIstioVirtualHost { + ctrlBuilder = ctrlBuilder.Owns(&v1alpha3.VirtualService{}) } else { - return ctrl.NewControllerManagedBy(mgr). - For(&v1beta1api.InferenceService{}). - Owns(&knservingv1.Service{}). - Owns(&appsv1.Deployment{}). - Complete(r) + r.Log.Info("The InferenceService controller won't watch networking.istio.io/v1alpha3/VirtualService resources because the CRD is not available or the Istio virtual host is disabled.") } + return ctrlBuilder.Complete(r) } func (r *InferenceServiceReconciler) deleteExternalResources(isvc *v1beta1api.InferenceService) error { diff --git a/pkg/controller/v1beta1/inferenceservice/controller_test.go b/pkg/controller/v1beta1/inferenceservice/controller_test.go index 6630529799f..9650f281cde 100644 --- a/pkg/controller/v1beta1/inferenceservice/controller_test.go +++ b/pkg/controller/v1beta1/inferenceservice/controller_test.go @@ -29,6 +29,7 @@ import ( "github.com/kserve/kserve/pkg/apis/serving/v1alpha1" "github.com/kserve/kserve/pkg/apis/serving/v1beta1" "github.com/kserve/kserve/pkg/constants" + "github.com/kserve/kserve/pkg/utils" . "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" . 
"github.com/onsi/gomega" @@ -383,6 +384,83 @@ var _ = Describe("v1beta1 inference service controller", func() { Expect(updatedVirtualService.Annotations).To(gomega.Equal(annotations)) Expect(updatedVirtualService.Labels).To(gomega.Equal(labels)) }) + It("Should fail if Knative Serving is not installed", func() { + // Simulate that Knative Serving is absent by setting the relevant entry of the utils.gvResourcesCache variable to nil; the deferred call restores the discovered value + servingResources, getServingResourcesErr := utils.GetAvailableResourcesForApi(cfg, knservingv1.SchemeGroupVersion.String()) + Expect(getServingResourcesErr).To(BeNil()) + defer utils.SetAvailableResourcesForApi(knservingv1.SchemeGroupVersion.String(), servingResources) + utils.SetAvailableResourcesForApi(knservingv1.SchemeGroupVersion.String(), nil) + + // Create configmap + var configMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: constants.InferenceServiceConfigMapName, + Namespace: constants.KServeNamespace, + }, + Data: configs, + } + Expect(k8sClient.Create(context.TODO(), configMap)).NotTo(HaveOccurred()) + defer k8sClient.Delete(context.TODO(), configMap) + + // Create InferenceService + serviceName := "serverless-isvc" + var expectedRequest = reconcile.Request{NamespacedName: types.NamespacedName{Name: serviceName, Namespace: "default"}} + var serviceKey = expectedRequest.NamespacedName + var storageUri = "s3://test/mnist/export" + isvc := &v1beta1.InferenceService{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceKey.Name, + Namespace: serviceKey.Namespace, + Annotations: map[string]string{ + "serving.kserve.io/deploymentMode": "Serverless", + }, + }, + Spec: v1beta1.InferenceServiceSpec{ + Predictor: v1beta1.PredictorSpec{ + ComponentExtensionSpec: v1beta1.ComponentExtensionSpec{ + MinReplicas: v1beta1.GetIntReference(1), + MaxReplicas: 3, + }, + Tensorflow: &v1beta1.TFServingSpec{ + PredictorExtensionSpec: v1beta1.PredictorExtensionSpec{ + StorageURI: &storageUri, + RuntimeVersion: proto.String("1.14.0"), + Container: 
v1.Container{ + Name: constants.InferenceServiceContainerName, + Resources: defaultResource, + }, + }, + }, + }, + }, + } + isvc.DefaultInferenceService(nil, nil) + + ctx := context.Background() + Expect(k8sClient.Create(ctx, isvc)).Should(Succeed()) + defer k8sClient.Delete(ctx, isvc) + + Eventually(func() bool { + events := &v1.EventList{} + err := k8sClient.List(ctx, events, client.InNamespace(serviceKey.Namespace)) + if err != nil { + return false + } + if events == nil { + return false + } + + for _, event := range events.Items { + if event.InvolvedObject.Kind == "InferenceService" && + event.InvolvedObject.Name == serviceKey.Name && + event.Reason == "ServerlessModeRejected" { + return true + } + } + + return false + }, timeout, interval).Should(BeTrue()) + }) }) Context("Inference Service with transformer", func() { diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 9ff91ee1bb0..b69fda6de52 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -21,6 +21,10 @@ import ( "github.com/kserve/kserve/pkg/constants" v1 "k8s.io/api/core/v1" + apierr "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" ) /* NOTE TO AUTHORS: @@ -29,6 +33,8 @@ import ( * Please add functional style container operations sparingly and intentionally. */ +var gvResourcesCache map[string]*metav1.APIResourceList + func Filter(origin map[string]string, predicate func(string) bool) map[string]string { result := make(map[string]string) for k, v := range origin { @@ -153,3 +159,62 @@ func AppendEnvVarIfNotExists(slice []v1.EnvVar, elems ...v1.EnvVar) []v1.EnvVar } return slice } + +// IsCrdAvailable checks if a given CRD is present in the cluster by verifying the +// existence of its API. It reports true when the resources discovered for groupVersion include one whose Kind equals kind. 
+func IsCrdAvailable(config *rest.Config, groupVersion, kind string) (bool, error) { + gvResources, err := GetAvailableResourcesForApi(config, groupVersion) + if err != nil { + return false, err + } + + found := false + if gvResources != nil { + for _, crd := range gvResources.APIResources { + if crd.Kind == kind { + found = true + break + } + } + } + + return found, nil +} + +// GetAvailableResourcesForApi returns the list of discovered resources that belong +// to the API specified in groupVersion. The first query to a specific groupVersion will +// query the cluster API server to discover the available resources and the discovered +// resources will be cached and returned to subsequent invocations to prevent additional +// queries to the API server. A NotFound error from discovery is not returned to the caller: the result (presumably nil - confirm against client-go) is cached as-is. +func GetAvailableResourcesForApi(config *rest.Config, groupVersion string) (*metav1.APIResourceList, error) { + var gvResources *metav1.APIResourceList + var ok bool + + if gvResources, ok = gvResourcesCache[groupVersion]; !ok { + discoveryClient, newClientErr := discovery.NewDiscoveryClientForConfig(config) + if newClientErr != nil { + return nil, newClientErr + } + + var getGvResourcesErr error + gvResources, getGvResourcesErr = discoveryClient.ServerResourcesForGroupVersion(groupVersion) + if getGvResourcesErr != nil && !apierr.IsNotFound(getGvResourcesErr) { + return nil, getGvResourcesErr + } + + SetAvailableResourcesForApi(groupVersion, gvResources) + } + + return gvResources, nil +} + +// SetAvailableResourcesForApi stores the value of the resources argument in the global cache +// of discovered API resources. This function should never be called directly. It is exported +// for usage in tests. NOTE(review): the cache map is read and written here without any locking - confirm callers never race. +func SetAvailableResourcesForApi(groupVersion string, resources *metav1.APIResourceList) { + if gvResourcesCache == nil { + gvResourcesCache = make(map[string]*metav1.APIResourceList) + } + + gvResourcesCache[groupVersion] = resources +}