Skip to content

Commit

Permalink
Retry on resource conflict error (kubeflow#3035)
Browse files Browse the repository at this point in the history
Signed-off-by: Sivanantham Chinnaiyan <sivanantham.chinnaiyan@ideas2it.com>
  • Loading branch information
sivanantha321 authored Jul 24, 2023
1 parent 9331ed1 commit a294e6e
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 82 deletions.
63 changes: 35 additions & 28 deletions pkg/controller/v1alpha1/inferencegraph/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"fmt"
isvcutils "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/utils"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/util/retry"

"github.com/go-logr/logr"
v1alpha1api "github.com/kserve/kserve/pkg/apis/serving/v1alpha1"
Expand Down Expand Up @@ -207,36 +208,42 @@ func (r *InferenceGraphReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}

func (r *InferenceGraphReconciler) updateStatus(desiredGraph *v1alpha1api.InferenceGraph) error {
graph := &v1alpha1api.InferenceGraph{}
namespacedName := types.NamespacedName{Name: desiredGraph.Name, Namespace: desiredGraph.Namespace}
if err := r.Get(context.TODO(), namespacedName, graph); err != nil {
return err
}
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
graph := &v1alpha1api.InferenceGraph{}
namespacedName := types.NamespacedName{Name: desiredGraph.Name, Namespace: desiredGraph.Namespace}
if err := r.Get(context.TODO(), namespacedName, graph); err != nil {
return err
}

wasReady := inferenceGraphReadiness(graph.Status)
if equality.Semantic.DeepEqual(graph.Status, desiredGraph.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if err := r.Status().Update(context.TODO(), desiredGraph); err != nil {
r.Log.Error(err, "Failed to update InferenceGraph status", "InferenceGraph", desiredGraph.Name)
r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for InferenceGraph %q: %v", desiredGraph.Name, err)
return errors.Wrapf(err, "fails to update InferenceGraph status")
} else {
r.Log.Info("updated InferenceGraph status", "InferenceGraph", desiredGraph.Name)
// If there was a difference and there was no error.
isReady := inferenceGraphReadiness(desiredGraph.Status)
if wasReady && !isReady { // Moved to NotReady State
r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, string(InferenceGraphNotReadyState),
fmt.Sprintf("InferenceGraph [%v] is no longer Ready", desiredGraph.GetName()))
} else if !wasReady && isReady { // Moved to Ready State
r.Recorder.Eventf(desiredGraph, v1.EventTypeNormal, string(InferenceGraphReadyState),
fmt.Sprintf("InferenceGraph [%v] is Ready", desiredGraph.GetName()))
wasReady := inferenceGraphReadiness(graph.Status)
if equality.Semantic.DeepEqual(graph.Status, desiredGraph.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if err := r.Status().Update(context.TODO(), desiredGraph); err != nil {
if apierr.IsConflict(err) {
return err
}
r.Log.Error(err, "Failed to update InferenceGraph status", "InferenceGraph", desiredGraph.Name)
r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for InferenceGraph %q: %v", desiredGraph.Name, err)
return errors.Wrapf(err, "fails to update InferenceGraph status")
} else {
r.Log.Info("updated InferenceGraph status", "InferenceGraph", desiredGraph.Name)
// If there was a difference and there was no error.
isReady := inferenceGraphReadiness(desiredGraph.Status)
if wasReady && !isReady { // Moved to NotReady State
r.Recorder.Eventf(desiredGraph, v1.EventTypeWarning, string(InferenceGraphNotReadyState),
fmt.Sprintf("InferenceGraph [%v] is no longer Ready", desiredGraph.GetName()))
} else if !wasReady && isReady { // Moved to Ready State
r.Recorder.Eventf(desiredGraph, v1.EventTypeNormal, string(InferenceGraphReadyState),
fmt.Sprintf("InferenceGraph [%v] is Ready", desiredGraph.GetName()))
}
}
}
return nil
return nil
})
return err
}

func inferenceGraphReadiness(status v1alpha1api.InferenceGraphStatus) bool {
Expand Down
37 changes: 24 additions & 13 deletions pkg/controller/v1alpha1/inferencegraph/knative_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,10 @@ func NewGraphKnativeServiceReconciler(client client.Client,
}
}

func (r *GraphKnativeServiceReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
desired := r.Service
existing := &knservingv1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
if err != nil {
if apierr.IsNotFound(err) {
log.Info("Creating inference graph knative service", "namespace", desired.Namespace, "name", desired.Name)
return &desired.Status, r.client.Create(context.TODO(), desired)
}
return nil, err
}
func reconcileKsvc(desired *knservingv1.Service, existing *knservingv1.Service) error {
// Return if no differences to reconcile.
if semanticEquals(desired, existing) {
return &existing.Status, nil
return nil
}

// Reconcile differences and update
Expand All @@ -86,14 +76,35 @@ func (r *GraphKnativeServiceReconciler) Reconcile() (*knservingv1.ServiceStatus,
}
diff, err := kmp.SafeDiff(desired.Spec.ConfigurationSpec, existing.Spec.ConfigurationSpec)
if err != nil {
return &existing.Status, errors.Wrapf(err, "failed to diff inference graph knative service configuration spec")
return errors.Wrapf(err, "failed to diff inference graph knative service configuration spec")
}
log.Info("inference graph knative service configuration diff (-desired, +observed):", "diff", diff)
existing.Spec.ConfigurationSpec = desired.Spec.ConfigurationSpec
existing.ObjectMeta.Labels = desired.ObjectMeta.Labels
existing.Spec.Traffic = desired.Spec.Traffic
return nil
}

func (r *GraphKnativeServiceReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
desired := r.Service
existing := &knservingv1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
if err != nil {
if apierr.IsNotFound(err) {
log.Info("Creating inference graph knative service", "namespace", desired.Namespace, "name", desired.Name)
return &desired.Status, r.client.Create(context.TODO(), desired)
}
return nil, err
}

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
log.Info("Updating inference graph knative service", "namespace", desired.Namespace, "name", desired.Name)
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing); err != nil {
return err
}
if err := reconcileKsvc(desired, existing); err != nil {
return err
}
return r.client.Update(context.TODO(), existing)
})
if err != nil {
Expand Down
61 changes: 34 additions & 27 deletions pkg/controller/v1beta1/inferenceservice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package inferenceservice
import (
"context"
"fmt"
"k8s.io/client-go/util/retry"
"reflect"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -237,34 +238,40 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req
}

func (r *InferenceServiceReconciler) updateStatus(desiredService *v1beta1api.InferenceService, deploymentMode constants.DeploymentModeType) error {
existingService := &v1beta1api.InferenceService{}
namespacedName := types.NamespacedName{Name: desiredService.Name, Namespace: desiredService.Namespace}
if err := r.Get(context.TODO(), namespacedName, existingService); err != nil {
return err
}
wasReady := inferenceServiceReadiness(existingService.Status)
if inferenceServiceStatusEqual(existingService.Status, desiredService.Status, deploymentMode) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if err := r.Status().Update(context.TODO(), desiredService); err != nil {
r.Log.Error(err, "Failed to update InferenceService status", "InferenceService", desiredService.Name)
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for InferenceService %q: %v", desiredService.Name, err)
return errors.Wrapf(err, "fails to update InferenceService status")
} else {
// If there was a difference and there was no error.
isReady := inferenceServiceReadiness(desiredService.Status)
if wasReady && !isReady { // Moved to NotReady State
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, string(InferenceServiceNotReadyState),
fmt.Sprintf("InferenceService [%v] is no longer Ready", desiredService.GetName()))
} else if !wasReady && isReady { // Moved to Ready State
r.Recorder.Eventf(desiredService, v1.EventTypeNormal, string(InferenceServiceReadyState),
fmt.Sprintf("InferenceService [%v] is Ready", desiredService.GetName()))
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
existingService := &v1beta1api.InferenceService{}
namespacedName := types.NamespacedName{Name: desiredService.Name, Namespace: desiredService.Namespace}
if err := r.Get(context.TODO(), namespacedName, existingService); err != nil {
return err
}
}
return nil
wasReady := inferenceServiceReadiness(existingService.Status)
if inferenceServiceStatusEqual(existingService.Status, desiredService.Status, deploymentMode) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if err := r.Status().Update(context.TODO(), desiredService); err != nil {
if apierr.IsConflict(err) {
return err
}
r.Log.Error(err, "Failed to update InferenceService status", "InferenceService", desiredService.Name)
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for InferenceService %q: %v", desiredService.Name, err)
return errors.Wrapf(err, "fails to update InferenceService status")
} else {
// If there was a difference and there was no error.
isReady := inferenceServiceReadiness(desiredService.Status)
if wasReady && !isReady { // Moved to NotReady State
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, string(InferenceServiceNotReadyState),
fmt.Sprintf("InferenceService [%v] is no longer Ready", desiredService.GetName()))
} else if !wasReady && isReady { // Moved to Ready State
r.Recorder.Eventf(desiredService, v1.EventTypeNormal, string(InferenceServiceReadyState),
fmt.Sprintf("InferenceService [%v] is Ready", desiredService.GetName()))
}
}
return nil
})
return err
}

func inferenceServiceReadiness(status v1beta1api.InferenceServiceStatus) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,21 +183,10 @@ func createKnativeService(componentMeta metav1.ObjectMeta,
return service
}

func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
// Create service if does not exist
desired := r.Service
existing := &knservingv1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
if err != nil {
if apierr.IsNotFound(err) {
log.Info("Creating knative service", "namespace", desired.Namespace, "name", desired.Name)
return &desired.Status, r.client.Create(context.TODO(), desired)
}
return nil, err
}
func reconcileKsvc(desired *knservingv1.Service, existing *knservingv1.Service) error {
// Return if no differences to reconcile.
if semanticEquals(desired, existing) {
return &existing.Status, nil
return nil
}

// Reconcile differences and update
Expand All @@ -210,7 +199,7 @@ func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
}
diff, err := kmp.SafeDiff(desired.Spec.ConfigurationSpec, existing.Spec.ConfigurationSpec)
if err != nil {
return &existing.Status, errors.Wrapf(err, "failed to diff knative service configuration spec")
return errors.Wrapf(err, "failed to diff knative service configuration spec")
}
log.Info("knative service configuration diff (-desired, +observed):", "diff", diff)
existing.Spec.ConfigurationSpec = desired.Spec.ConfigurationSpec
Expand All @@ -223,8 +212,30 @@ func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
delete(existing.ObjectMeta.Annotations, ksvcAnnotationKey)
}
}
return nil
}

func (r *KsvcReconciler) Reconcile() (*knservingv1.ServiceStatus, error) {
// Create service if does not exist
desired := r.Service
existing := &knservingv1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
if err != nil {
if apierr.IsNotFound(err) {
log.Info("Creating knative service", "namespace", desired.Namespace, "name", desired.Name)
return &desired.Status, r.client.Create(context.TODO(), desired)
}
return nil, err
}

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
log.Info("Updating knative service", "namespace", desired.Namespace, "name", desired.Name)
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing); err != nil {
return err
}
if err := reconcileKsvc(desired, existing); err != nil {
return err
}
return r.client.Update(context.TODO(), existing)
})
if err != nil {
Expand Down

0 comments on commit a294e6e

Please sign in to comment.