Skip to content

Commit

Permalink
Record Events when InferenceService goes in and out of readiness (kub…
Browse files Browse the repository at this point in the history
…eflow#876)

* enabling event recording for service state

* add test for failing event

* resolved comments
  • Loading branch information
ifilonenko authored Jun 15, 2020
1 parent bda2209 commit d0fc859
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 17 deletions.
29 changes: 21 additions & 8 deletions pkg/apis/serving/v1alpha2/inferenceservice_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,18 @@ func (ss *InferenceServiceStatus) GetCondition(t apis.ConditionType) *apis.Condi
return conditionSet.Manage(ss).GetCondition(t)
}

// InferenceServiceState describes the Readiness of the InferenceService.
// Its values are passed as the Reason of the events recorded when an
// InferenceService goes in or out of readiness.
type InferenceServiceState string

// Different InferenceServiceState an InferenceService may have.
const (
// InferenceServiceReadyState is the state of an InferenceService that is Ready.
InferenceServiceReadyState InferenceServiceState = "InferenceServiceReady"
// InferenceServiceNotReadyState is the state of an InferenceService that is not Ready.
InferenceServiceNotReadyState InferenceServiceState = "InferenceServiceNotReady"
)

// PropagateDefaultStatus propagates the status for the default spec
func (ss *InferenceServiceStatus) PropagateDefaultStatus(component constants.InferenceServiceComponent, defaultStatus *knservingv1.ServiceStatus) {
func (ss *InferenceServiceStatus) PropagateDefaultStatus(component constants.InferenceServiceComponent,
defaultStatus *knservingv1.ServiceStatus) bool {
if ss.Default == nil {
emptyStatusMap := make(map[constants.InferenceServiceComponent]StatusConfigurationSpec)
ss.Default = &emptyStatusMap
Expand All @@ -86,19 +96,20 @@ func (ss *InferenceServiceStatus) PropagateDefaultStatus(component constants.Inf
if defaultStatus == nil {
conditionSet.Manage(ss).ClearCondition(conditionType)
delete(*ss.Default, component)
return
return false
}

statusSpec, ok := (*ss.Default)[component]
if !ok {
statusSpec = StatusConfigurationSpec{}
(*ss.Default)[component] = statusSpec
}
ss.propagateStatus(component, false, conditionType, defaultStatus)
return ss.propagateStatus(component, false, conditionType, defaultStatus)
}

// PropagateCanaryStatus propagates the status for the canary spec
func (ss *InferenceServiceStatus) PropagateCanaryStatus(component constants.InferenceServiceComponent, canaryStatus *knservingv1.ServiceStatus) {
func (ss *InferenceServiceStatus) PropagateCanaryStatus(component constants.InferenceServiceComponent,
canaryStatus *knservingv1.ServiceStatus) bool {
if ss.Canary == nil {
emptyStatusMap := make(map[constants.InferenceServiceComponent]StatusConfigurationSpec)
ss.Canary = &emptyStatusMap
Expand All @@ -108,31 +119,32 @@ func (ss *InferenceServiceStatus) PropagateCanaryStatus(component constants.Infe
if canaryStatus == nil {
conditionSet.Manage(ss).ClearCondition(conditionType)
delete(*ss.Canary, component)
return
return false
}

statusSpec, ok := (*ss.Canary)[component]
if !ok {
statusSpec = StatusConfigurationSpec{}
(*ss.Canary)[component] = statusSpec
}
ss.propagateStatus(component, true, conditionType, canaryStatus)
return ss.propagateStatus(component, true, conditionType, canaryStatus)
}

func (ss *InferenceServiceStatus) propagateStatus(component constants.InferenceServiceComponent, isCanary bool,
conditionType apis.ConditionType,
serviceStatus *knservingv1.ServiceStatus) {
serviceStatus *knservingv1.ServiceStatus) bool {
statusSpec := StatusConfigurationSpec{}
statusSpec.Name = serviceStatus.LatestCreatedRevisionName
serviceCondition := serviceStatus.GetCondition(knservingv1.ServiceConditionReady)

isReady := false
switch {
case serviceCondition == nil:
case serviceCondition.Status == v1.ConditionUnknown:
conditionSet.Manage(ss).MarkUnknown(conditionType, serviceCondition.Reason, serviceCondition.Message)
statusSpec.Hostname = ""
case serviceCondition.Status == v1.ConditionTrue:
conditionSet.Manage(ss).MarkTrue(conditionType)
isReady = true
if serviceStatus.URL != nil {
statusSpec.Hostname = serviceStatus.URL.Host
}
Expand All @@ -145,6 +157,7 @@ func (ss *InferenceServiceStatus) propagateStatus(component constants.InferenceS
} else {
(*ss.Default)[component] = statusSpec
}
return isReady
}

// PropagateRouteStatus propagates route's status to the service's status.
Expand Down
19 changes: 16 additions & 3 deletions pkg/controller/inferenceservice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package service
import (
"context"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"os"

"github.com/kubeflow/kfserving/pkg/controller/inferenceservice/reconcilers/istio"

Expand Down Expand Up @@ -60,11 +63,21 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler.
//
// It builds a Kubernetes clientset from the manager's REST config, starts an
// event broadcaster that records to the core/v1 Events API through that
// clientset, and returns a ReconcileService wired with the manager's client,
// its scheme, and an event recorder whose source component is ControllerName.
//
// If the clientset cannot be created the error is logged and the process
// exits with status 1; no error is returned to the caller.
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
eventBroadcaster := record.NewBroadcaster()
clientSet, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
// NOTE(review): the failing call here is clientset construction, not manager
// start-up, so this log message is misleading — consider rewording.
log.Error(err, "unable to start manager")
os.Exit(1)
}

// Events("") passes an empty namespace — presumably so events can be written
// in any namespace; TODO confirm against client-go.
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})

// Fetch the scheme once so the reconciler and its recorder share the same value.
mgrScheme := mgr.GetScheme()

return &ReconcileService{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
scheme: mgrScheme,
Recorder: eventBroadcaster.NewRecorder(
mgr.GetScheme(), v1.EventSource{Component: ControllerName}),
mgrScheme, v1.EventSource{Component: ControllerName}),
}
}

Expand Down Expand Up @@ -145,7 +158,7 @@ func (r *ReconcileService) Reconcile(request reconcile.Request) (reconcile.Resul
}

reconcilers := []Reconciler{
knative.NewServiceReconciler(r.Client, r.scheme, configMap),
knative.NewServiceReconciler(r.Client, r.scheme, r.Recorder, configMap),
istio.NewVirtualServiceReconciler(r.Client, r.scheme, configMap),
}

Expand Down
48 changes: 47 additions & 1 deletion pkg/controller/inferenceservice/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func TestInferenceServiceWithOnlyPredictor(t *testing.T) {
mgr, err := manager.New(cfg, manager.Options{MetricsBindAddress: "0"})
g.Expect(err).NotTo(gomega.HaveOccurred())
c = mgr.GetClient()

recFn, requests := SetupTestReconcile(newReconciler(mgr))
g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred())

Expand Down Expand Up @@ -336,6 +335,53 @@ func TestInferenceServiceWithOnlyPredictor(t *testing.T) {
}
return &isvc.Status
}, timeout).Should(testutils.BeSematicEqual(&expectedKfsvcStatus))
g.Eventually(func() error {
events := &v1.EventList{}
if err := c.List(context.TODO(), events); err != nil {
return fmt.Errorf("Test %q failed: returned error: %v", serviceName, err)
}
if len(events.Items) == 0 {
return fmt.Errorf("Test %q failed: no events were created", serviceName)
}
for _, event := range events.Items {
if event.Reason == string(kfserving.InferenceServiceReadyState) &&
event.Type == v1.EventTypeNormal {
return nil
}
}
return fmt.Errorf("Test %q failed: events [%v] did not contain ready", serviceName, events.Items)
}, timeout).Should(gomega.Succeed())
// Verify that a Warning event is recorded once the Knative service is marked not ready
failingService := &knservingv1.Service{}
g.Eventually(func() error { return c.Get(context.TODO(), predictorService, failingService) }, timeout).
Should(gomega.Succeed())
failingService.Status.LatestCreatedRevisionName = "revision-v2"
failingService.Status.LatestReadyRevisionName = "revision-v2"
failingService.Status.URL = nil
failingService.Status.Conditions = duckv1.Conditions{
{
Type: knservingv1.ServiceConditionReady,
Status: "False",
},
}
g.Expect(c.Status().Update(context.TODO(), failingService)).NotTo(gomega.HaveOccurred())
g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest)))
g.Eventually(func() error {
events := &v1.EventList{}
if err := c.List(context.TODO(), events); err != nil {
return fmt.Errorf("Test %q failed: returned error: %v", serviceName, err)
}
if len(events.Items) == 0 {
return fmt.Errorf("Test %q failed: no events were created", serviceName)
}
for _, event := range events.Items {
if event.Reason == string(kfserving.InferenceServiceNotReadyState) &&
event.Type == v1.EventTypeWarning {
return nil
}
}
return fmt.Errorf("Test %q failed: events [%v] did not contain warning", serviceName, events.Items)
}, timeout).Should(gomega.Succeed())
}

func TestInferenceServiceWithDefaultAndCanaryPredictor(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package knative
import (
"context"
"fmt"
"k8s.io/client-go/tools/record"

"github.com/kubeflow/kfserving/pkg/apis/serving/v1alpha2"
"github.com/kubeflow/kfserving/pkg/constants"
Expand All @@ -44,13 +45,15 @@ type ServiceReconciler struct {
client client.Client
scheme *runtime.Scheme
serviceBuilder *knative.ServiceBuilder
recorder record.EventRecorder
}

// NewServiceReconciler returns a ServiceReconciler that stores the given
// client, scheme and event recorder, and builds Knative services with a
// ServiceBuilder created from client and config.
func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, config *v1.ConfigMap) *ServiceReconciler {
func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, config *v1.ConfigMap) *ServiceReconciler {
return &ServiceReconciler{
client: client,
scheme: scheme,
serviceBuilder: knative.NewServiceBuilder(client, config),
recorder: recorder,
}
}

Expand All @@ -70,6 +73,9 @@ func (r *ServiceReconciler) reconcileComponent(isvc *v1alpha2.InferenceService,
endpointSpec := &isvc.Spec.Default
serviceName := constants.DefaultServiceName(isvc.Name, component)
propagateStatusFn := isvc.Status.PropagateDefaultStatus
wasReady := isvc.Status.Conditions != nil &&
isvc.Status.GetCondition(knservingv1.ServiceConditionReady) != nil &&
isvc.Status.GetCondition(knservingv1.ServiceConditionReady).Status == v1.ConditionTrue
if isCanary {
endpointSpec = isvc.Spec.Canary
serviceName = constants.CanaryServiceName(isvc.Name, component)
Expand All @@ -83,23 +89,33 @@ func (r *ServiceReconciler) reconcileComponent(isvc *v1alpha2.InferenceService,
return err
}
}

if service == nil {
if err = r.finalizeService(serviceName, isvc.Namespace); err != nil {
return err
}
propagateStatusFn(component, nil)
// If it was ready and is no longer ready
r.RecordServeEvent(isvc, wasReady, propagateStatusFn(component, nil))
return nil
} else {
if status, err := r.reconcileService(isvc, service); err != nil {
return err
} else {
propagateStatusFn(component, status)
r.RecordServeEvent(isvc, wasReady, propagateStatusFn(component, status))
return nil
}
}
}

// RecordServeEvent records an event on isvc when its readiness changes
// between two observations. Nothing is recorded when readiness is unchanged.
//
//   - ready -> not ready: a Warning event with reason InferenceServiceNotReadyState.
//   - not ready -> ready: a Normal event with reason InferenceServiceReadyState.
//
// wasReady is the readiness observed before the transition and isReady the
// readiness observed after it.
func (r *ServiceReconciler) RecordServeEvent(isvc *v1alpha2.InferenceService, wasReady bool, isReady bool) {
	// The parameters are deliberately not named old/new: "new" would shadow
	// Go's predeclared new function inside this method.
	switch {
	case wasReady && !isReady:
		r.recorder.Event(isvc, v1.EventTypeWarning, string(v1alpha2.InferenceServiceNotReadyState),
			fmt.Sprintf("InferenceService [%v] is no longer Ready ", isvc.Name))
	case !wasReady && isReady:
		r.recorder.Event(isvc, v1.EventTypeNormal, string(v1alpha2.InferenceServiceReadyState),
			fmt.Sprintf("InferenceService [%v] is Ready ", isvc.Name))
	}
}

func (r *ServiceReconciler) finalizeService(serviceName, namespace string) error {
existing := &knservingv1.Service{}
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: serviceName, Namespace: namespace}, existing); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestKnativeServiceReconcile(t *testing.T) {
g.Expect(err).NotTo(gomega.HaveOccurred())
stopMgr, mgrStopped := testutils.StartTestManager(mgr, g)
c := mgr.GetClient()
recorder := mgr.GetEventRecorderFor("InferenceServiceEventRecorder")

defer func() {
close(stopMgr)
Expand All @@ -78,7 +79,7 @@ func TestKnativeServiceReconcile(t *testing.T) {
}`,
}

serviceReconciler := NewServiceReconciler(c, mgr.GetScheme(), &v1.ConfigMap{
serviceReconciler := NewServiceReconciler(c, mgr.GetScheme(), recorder, &v1.ConfigMap{
Data: configs,
})
scenarios := map[string]struct {
Expand Down

0 comments on commit d0fc859

Please sign in to comment.