Skip to content

Commit

Permalink
controller: ensure object in cache before requeue
Browse files Browse the repository at this point in the history
Signed-off-by: Hidde Beydals <hidde@hhh.computer>
  • Loading branch information
hiddeco committed Dec 1, 2023
1 parent 3eb2457 commit 688b636
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 1 deletion.
28 changes: 28 additions & 0 deletions internal/controller/helmrelease_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (

"github.com/hashicorp/go-retryablehttp"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
apierrutil "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest"
kuberecorder "k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -182,6 +184,14 @@ func (r *HelmReleaseReconciler) Reconcile(ctx context.Context, req ctrl.Request)
retErr = apierrutil.Reduce(apierrutil.NewAggregate([]error{retErr, err}))
}

// Wait for the object to have synced in-cache after patching.
// This is required to ensure that the next reconciliation will
// operate on the patched object when an immediate reconcile is
// requested.
if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, r.waitForHistoryCacheSync(obj)); err != nil {
log.Error(err, "failed to wait for object to sync in-cache after patching")
}

// Always record suspend, readiness and duration metrics.
r.Metrics.RecordSuspend(ctx, obj, obj.Spec.Suspend)
r.Metrics.RecordReadiness(ctx, obj)
Expand Down Expand Up @@ -641,6 +651,24 @@ func (r *HelmReleaseReconciler) getHelmChart(ctx context.Context, obj *v2.HelmRe
return &hc, nil
}

// waitForHistoryCacheSync returns a function that can be used to wait for the
// cache backing the Kubernetes client to be in sync with the current state of
// the v2beta2.HelmRelease.
// This is a trade-off between not caching at all, and introducing a slight
// delay to ensure we always have the latest history state.
func (r *HelmReleaseReconciler) waitForHistoryCacheSync(obj *v2.HelmRelease) wait.ConditionWithContextFunc {
newObj := &v2.HelmRelease{}
return func(ctx context.Context) (bool, error) {
if err := r.Get(ctx, client.ObjectKeyFromObject(obj), newObj); err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, err
}
return apiequality.Semantic.DeepEqual(obj.Status, newObj.Status), nil
}
}

func (r *HelmReleaseReconciler) requestsForHelmChartChange(ctx context.Context, o client.Object) []reconcile.Request {
hc, ok := o.(*sourcev1.HelmChart)
if !ok {
Expand Down
110 changes: 110 additions & 0 deletions internal/controller/helmrelease_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1931,6 +1931,116 @@ func TestHelmReleaseReconciler_getHelmChart(t *testing.T) {
}
}

func Test_waitForHistoryCacheSync(t *testing.T) {
tests := []struct {
name string
rel *v2.HelmRelease
cacheRel *v2.HelmRelease
want bool
}{
{
name: "different history",
rel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
Status: v2.HelmReleaseStatus{
History: v2.Snapshots{
{
Version: 2,
Status: "deployed",
},
{
Version: 1,
Status: "failed",
},
},
},
},
cacheRel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
Status: v2.HelmReleaseStatus{
History: v2.Snapshots{
{
Version: 1,
Status: "deployed",
},
},
},
},
want: false,
},
{
name: "same history",
rel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
Status: v2.HelmReleaseStatus{
History: v2.Snapshots{
{
Version: 2,
Status: "deployed",
},
{
Version: 1,
Status: "failed",
},
},
},
},
cacheRel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
Status: v2.HelmReleaseStatus{
History: v2.Snapshots{
{
Version: 2,
Status: "deployed",
},
{
Version: 1,
Status: "failed",
},
},
},
},
want: true,
},
{
name: "does not exist",
rel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
},
cacheRel: nil,
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

c := fake.NewClientBuilder()
c.WithScheme(NewTestScheme())
if tt.cacheRel != nil {
c.WithObjects(tt.cacheRel)
}
r := &HelmReleaseReconciler{
Client: c.Build(),
}

got, err := r.waitForHistoryCacheSync(tt.rel)(context.Background())
g.Expect(err).ToNot(HaveOccurred())
g.Expect(got == tt.want).To(BeTrue())
})
}
}

func TestValuesReferenceValidation(t *testing.T) {
tests := []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion internal/reconcile/atomic_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (r *AtomicRelease) Reconcile(ctx context.Context, req *Request) error {
// last observation before returning. If the patch fails, we
// log the error and return the original context cancellation
// error.
if err := r.patchHelper.Patch(ctx, req.Object); err != nil {
if err := r.patchHelper.Patch(ctx, req.Object, patch.WithOwnedConditions{Conditions: OwnedConditions}, patch.WithFieldOwner(r.fieldManager)); err != nil {
log.Error(err, "failed to patch HelmRelease after context cancellation")
}
cancel()
Expand Down

0 comments on commit 688b636

Please sign in to comment.