Skip to content

Commit

Permalink
Timeout (#136)
Browse files Browse the repository at this point in the history
* add processing timeout

* tweak backoff

* cleanup tests

* add docs
  • Loading branch information
cbarbian-sap committed Aug 27, 2024
1 parent 5cf9db1 commit 2347b2f
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 166 deletions.
15 changes: 9 additions & 6 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ type Backoff struct {
func NewBackoff(maxDelay time.Duration) *Backoff {
return &Backoff{
activities: make(map[any]any),
// resulting per-item backoff is the maximum of a 300-times-20ms-then-maxDelay per-item limiter,
// and an overall 10-per-second-burst-20 bucket limiter;
// as a consequence, we have up to 20 almost immediate retries, then a phase of 10 retries per seconnd
// for approximately 30s, and then slow retries at the rate given by maxDelay
// resulting per-item backoff is the maximum of a 200-times-50ms-then-maxDelay per-item limiter,
// and an overall 5-per-second-burst-20 bucket limiter;
// as a consequence, we have up to
// - up to 20 almost immediate retries
// - then then a phase of 5 guaranteed retries per seconnd (could be more if burst capacity is refilled
// because of the duration of the reconcile logic execution itself)
// - finally (after 200 iterations) slow retries at the rate given by maxDelay
limiter: workqueue.NewMaxOfRateLimiter(
workqueue.NewItemFastSlowRateLimiter(20*time.Millisecond, maxDelay, 300),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 20)},
workqueue.NewItemFastSlowRateLimiter(50*time.Millisecond, maxDelay, 200),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(5), 20)},
),
}
}
Expand Down
78 changes: 51 additions & 27 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"reflect"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/sap/component-operator-runtime/internal/walk"
)

Expand Down Expand Up @@ -88,7 +86,18 @@ func assertRetryConfiguration[T Component](component T) (RetryConfiguration, boo
return nil, false
}

// Calculate digest of given component, honoring annotations, spec, and references
// Check if given component or its spec implements TimeoutConfiguration (and return it).
func assertTimeoutConfiguration[T Component](component T) (TimeoutConfiguration, bool) {
if timeoutConfiguration, ok := Component(component).(TimeoutConfiguration); ok {
return timeoutConfiguration, true
}
if timeoutConfiguration, ok := getSpec(component).(TimeoutConfiguration); ok {
return timeoutConfiguration, true
}
return nil, false
}

// Calculate digest of given component, honoring annotations, spec, and references.
func calculateComponentDigest[T Component](component T) string {
digestData := make(map[string]any)
spec := getSpec(component)
Expand Down Expand Up @@ -120,8 +129,7 @@ func calculateComponentDigest[T Component](component T) string {
// note: this panic is ok because walk.Walk() only produces errors if the given walker function raises any (which ours here does not do)
panic("this cannot happen")
}
// note: this must() is ok because digestData should contain only serializable stuff
return sha256hex(must(json.Marshal(digestData)))
return calculateDigest(digestData)
}

// Implement the PlacementConfiguration interface.
Expand Down Expand Up @@ -178,46 +186,62 @@ func (s *Status) IsReady() bool {
return s.State == StateReady
}

// Get state (and related details).
func (s *Status) GetState() (State, string, string) {
var cond *Condition
// Implement the TimeoutConfiguration interface.
func (s *TimeoutSpec) GetTimeout() time.Duration {
if s.Timeout != nil {
return s.Timeout.Duration
}
return time.Duration(0)
}

// Get condition (and return nil if not existing).
// Caveat: the returned pointer might become invalid if further appends happen to the Conditions slice in the status object.
func (s *Status) getCondition(condType ConditionType) *Condition {
for i := 0; i < len(s.Conditions); i++ {
if s.Conditions[i].Type == ConditionTypeReady {
cond = &s.Conditions[i]
break
if s.Conditions[i].Type == condType {
return &s.Conditions[i]
}
}
if cond == nil {
return s.State, "", ""
}
return s.State, cond.Reason, cond.Message
return nil
}

// Set state and ready condition in status (according to the state value provided),
func (s *Status) SetState(state State, reason string, message string) {
// Get condition (adding it with initial values if not existing).
// Caveat: the returned pointer might become invalid if further appends happen to the Conditions slice in the status object.
func (s *Status) getOrAddCondition(condType ConditionType) *Condition {
var cond *Condition
for i := 0; i < len(s.Conditions); i++ {
if s.Conditions[i].Type == ConditionTypeReady {
if s.Conditions[i].Type == condType {
cond = &s.Conditions[i]
break
}
}
if cond == nil {
s.Conditions = append(s.Conditions, Condition{Type: ConditionTypeReady})
s.Conditions = append(s.Conditions, Condition{Type: condType, Status: ConditionUnknown})
cond = &s.Conditions[len(s.Conditions)-1]
}
var status ConditionStatus
return cond
}

// Get state (and related details).
func (s *Status) GetState() (State, string, string) {
cond := s.getCondition(ConditionTypeReady)
if cond == nil {
return s.State, "", ""
}
return s.State, cond.Reason, cond.Message
}

// Set state and ready condition in status (according to the state value provided).
// Note: this method does not touch the condition's LastTransitionTime.
func (s *Status) SetState(state State, reason string, message string) {
cond := s.getOrAddCondition(ConditionTypeReady)
switch state {
case StateReady:
status = ConditionTrue
cond.Status = ConditionTrue
case StateError:
status = ConditionFalse
cond.Status = ConditionFalse
default:
status = ConditionUnknown
}
if status != cond.Status {
cond.Status = status
cond.LastTransitionTime = ref(metav1.Now())
cond.Status = ConditionUnknown
}
cond.Reason = reason
cond.Message = message
Expand Down
78 changes: 61 additions & 17 deletions pkg/component/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
readyConditionReasonProcessing = "Processing"
readyConditionReasonReady = "Ready"
readyConditionReasonError = "Error"
readyConditionReasonTimeout = "Timeout"
readyConditionReasonDeletionPending = "DeletionPending"
readyConditionReasonDeletionBlocked = "DeletionBlocked"
readyConditionReasonDeletionProcessing = "DeletionProcessing"
Expand Down Expand Up @@ -169,11 +170,7 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
}
component.GetObjectKind().SetGroupVersionKind(r.groupVersionKind)

// convenience accessors
status := component.GetStatus()
savedStatus := status.DeepCopy()

// requeue/retry interval
// fetch requeue interval, retry interval and timeout
requeueInterval := time.Duration(0)
if requeueConfiguration, ok := assertRequeueConfiguration(component); ok {
requeueInterval = requeueConfiguration.GetRequeueInterval()
Expand All @@ -188,6 +185,17 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
if retryInterval == 0 {
retryInterval = requeueInterval
}
timeout := time.Duration(0)
if timeoutConfiguration, ok := assertTimeoutConfiguration(component); ok {
timeout = timeoutConfiguration.GetTimeout()
}
if timeout == 0 {
timeout = requeueInterval
}

// convenience accessors
status := component.GetStatus()
savedStatus := status.DeepCopy()

// always attempt to update the status
skipStatusUpdate := false
Expand All @@ -197,11 +205,27 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
// re-panic in order skip the remaining steps
panic(r)
}

status.ObservedGeneration = component.GetGeneration()

if status.State == StateReady || err != nil {
// clear backoff if state is ready (obviously) or if there is an error;
// even is the error is a RetriableError which will be turned into a non-error;
// this is correct, because in that case, the RequeueAfter will be determined through the RetriableError
r.backoff.Forget(req)
}
status.ObservedGeneration = component.GetGeneration()
if status.State != StateProcessing || err != nil {
// clear ProcessingDigest and ProcessingSince in all non-error cases where state is StateProcessing
status.ProcessingDigest = ""
status.ProcessingSince = nil
}
if status.State == StateProcessing && now.Sub(status.ProcessingSince.Time) >= timeout {
// TODO: maybe it would be better to have a dedicated StateTimeout?
status.SetState(StateError, readyConditionReasonTimeout, "Reconcilation of dependent resources timed out")
}

if err != nil {
// convert retriable errors into non-errors (Pending or DeletionPending state), and return specified or default backoff
retriableError := &types.RetriableError{}
if errors.As(err, retriableError) {
retryAfter := retriableError.RetryAfter()
Expand All @@ -220,10 +244,12 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
status.SetState(StateError, readyConditionReasonError, err.Error())
}
}

if result.RequeueAfter > 0 {
// add jitter of 1-5 percent to RequeueAfter
addJitter(&result.RequeueAfter, 1, 5)
}

log.V(1).Info("reconcile done", "withError", err != nil, "requeue", result.Requeue || result.RequeueAfter > 0, "requeueAfter", result.RequeueAfter.String())
if err != nil {
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
Expand All @@ -232,22 +258,34 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
metrics.ReconcileErrors.WithLabelValues(r.controllerName, "other").Inc()
}
}
// TODO: should we move this behind the DeepEqual check below?
// note: it seems that no events will be written if the component's namespace is in deletion

// TODO: should we move this behind the DeepEqual check below to avoid noise?
// also note: it seems that no events will be written if the component's namespace is in deletion
state, reason, message := status.GetState()
if state == StateError {
r.client.EventRecorder().Event(component, corev1.EventTypeWarning, reason, message)
} else {
r.client.EventRecorder().Event(component, corev1.EventTypeNormal, reason, message)
}

if skipStatusUpdate {
return
}
if reflect.DeepEqual(status, savedStatus) {
return
}
// note: it's crucial to set the following timestamp late (otherwise the DeepEqual() check before would always be false)

// note: it's crucial to set the following timestamps late (otherwise the DeepEqual() check above would always be false)
// on the other hand it's a bit weird, because LastObservedAt will not be updated if no other changes have happened to the status;
// and same for the conditions' LastTransitionTime timestamps;
// maybe we should remove this optimization, and always do the Update() call
status.LastObservedAt = &now
for i := 0; i < len(status.Conditions); i++ {
cond := &status.Conditions[i]
if savedCond := savedStatus.getCondition(cond.Type); savedCond == nil || cond.Status != savedCond.Status {
cond.LastTransitionTime = &now
}
}
if updateErr := r.client.Status().Update(ctx, component, client.FieldOwner(r.name)); updateErr != nil {
err = utilerrors.NewAggregate([]error{err, updateErr})
result = ctrl.Result{}
Expand All @@ -256,7 +294,7 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result

// set a first status (and requeue, because the status update itself will not trigger another reconciliation because of the event filter set)
if status.ObservedGeneration <= 0 {
status.SetState(StateProcessing, readyConditionReasonNew, "First seen")
status.SetState(StatePending, readyConditionReasonNew, "First seen")
return ctrl.Result{Requeue: true}, nil
}

Expand Down Expand Up @@ -301,7 +339,8 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
return ctrl.Result{}, errors.Wrap(err, "error adding finalizer")
}
// trigger another round trip
// this is necessary because the update call invalidates potential changes done by the post-read hook above
// this is necessary because the update call invalidates potential changes done to the component by the post-read
// hook above; this means, not to the object itself, but for example to loaded secrets or config maps;
// in the following round trip, the finalizer will already be there, and the update will not happen again
return ctrl.Result{Requeue: true}, nil
}
Expand All @@ -312,7 +351,7 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
return ctrl.Result{}, errors.Wrapf(err, "error running pre-reconcile hook (%d)", hookOrder)
}
}
ok, err := target.Apply(ctx, component)
ok, digest, err := target.Apply(ctx, component)
if err != nil {
log.V(1).Info("error while reconciling dependent resources")
return ctrl.Result{}, errors.Wrap(err, "error reconciling dependent resources")
Expand All @@ -324,16 +363,21 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
}
}
log.V(1).Info("all dependent resources successfully reconciled")
status.SetState(StateReady, readyConditionReasonReady, "Dependent resources successfully reconciled")
status.AppliedGeneration = component.GetGeneration()
status.LastAppliedAt = &now
status.SetState(StateReady, readyConditionReasonReady, "Dependent resources successfully reconciled")
return ctrl.Result{RequeueAfter: requeueInterval}, nil
} else {
log.V(1).Info("not all dependent resources successfully reconciled")
status.SetState(StateProcessing, readyConditionReasonProcessing, "Reconcilation of dependent resources triggered; waiting until all dependent resources are ready")
if digest != status.ProcessingDigest {
status.ProcessingDigest = digest
status.ProcessingSince = &now
r.backoff.Forget(req)
}
if !reflect.DeepEqual(status.Inventory, savedStatus.Inventory) {
r.backoff.Forget(req)
}
status.SetState(StateProcessing, readyConditionReasonProcessing, "Reconcilation of dependent resources triggered; waiting until all dependent resources are ready")
return ctrl.Result{RequeueAfter: r.backoff.Next(req, readyConditionReasonProcessing)}, nil
}
} else {
Expand All @@ -352,16 +396,16 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
log.V(1).Info("deletion not allowed")
// TODO: have an additional StateDeletionBlocked?
// TODO: eliminate this msg logic
status.SetState(StateDeleting, readyConditionReasonDeletionBlocked, "Deletion blocked: "+msg)
r.client.EventRecorder().Event(component, corev1.EventTypeNormal, readyConditionReasonDeletionBlocked, "Deletion blocked: "+msg)
status.SetState(StateDeleting, readyConditionReasonDeletionBlocked, "Deletion blocked: "+msg)
return ctrl.Result{RequeueAfter: 1*time.Second + r.backoff.Next(req, readyConditionReasonDeletionBlocked)}, nil
}
if len(slices.Remove(component.GetFinalizers(), r.name)) > 0 {
// deletion is blocked because of foreign finalizers
log.V(1).Info("deleted blocked due to existence of foreign finalizers")
// TODO: have an additional StateDeletionBlocked?
status.SetState(StateDeleting, readyConditionReasonDeletionBlocked, "Deletion blocked due to existing foreign finalizers")
r.client.EventRecorder().Event(component, corev1.EventTypeNormal, readyConditionReasonDeletionBlocked, "Deletion blocked due to existing foreign finalizers")
status.SetState(StateDeleting, readyConditionReasonDeletionBlocked, "Deletion blocked due to existing foreign finalizers")
return ctrl.Result{RequeueAfter: 1*time.Second + r.backoff.Next(req, readyConditionReasonDeletionBlocked)}, nil
}
// deletion case
Expand Down Expand Up @@ -392,10 +436,10 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
} else {
// deletion triggered for dependent resources, but some are not yet gone
log.V(1).Info("not all dependent resources are successfully deleted")
status.SetState(StateDeleting, readyConditionReasonDeletionProcessing, "Deletion of dependent resources triggered; waiting until dependent resources are deleted")
if !reflect.DeepEqual(status.Inventory, savedStatus.Inventory) {
r.backoff.Forget(req)
}
status.SetState(StateDeleting, readyConditionReasonDeletionProcessing, "Deletion of dependent resources triggered; waiting until dependent resources are deleted")
return ctrl.Result{RequeueAfter: r.backoff.Next(req, readyConditionReasonDeletionProcessing)}, nil
}
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/component/reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package component

import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -69,8 +68,7 @@ func (r *ConfigMapReference) digest() string {
if !r.loaded {
return ""
}
// note: this must() is ok because marshalling map[string]string should always work
return sha256hex(must(json.Marshal(r.data)))
return calculateDigest(r.data)
}

// Return the previously loaded configmap data.
Expand Down Expand Up @@ -176,8 +174,7 @@ func (r *SecretReference) digest() string {
if !r.loaded {
return ""
}
// note: this must() is ok because marshalling map[string][]byte should always work
return sha256hex(must(json.Marshal(r.data)))
return calculateDigest(r.data)
}

// Return the previously loaded secret data.
Expand Down
Loading

0 comments on commit 2347b2f

Please sign in to comment.