Skip to content

Commit

Permalink
fix: remove ReplicaSet write-back (#4044)
Browse files Browse the repository at this point in the history
* fix: keep informer updated

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: move inside if

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: cleanup logic

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: don't write back to informer at all

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: fix tests

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: remove informer add

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: typo

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: fix lint

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: cleanup

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: cleanup

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

* fix: update comments

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>

---------

Signed-off-by: Zach Aller <zachaller@users.noreply.github.com>
  • Loading branch information
zachaller committed Jan 16, 2025
1 parent 876bec3 commit 42215e6
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 221 deletions.
109 changes: 0 additions & 109 deletions rollout/canary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,10 @@ package rollout
import (
"encoding/json"
"fmt"
"os"
"strconv"
"testing"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
k8stesting "k8s.io/client-go/testing"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -2117,106 +2111,3 @@ func TestIsDynamicallyRollingBackToStable(t *testing.T) {
})
}
}

func TestSyncRolloutWithConflictInScaleReplicaSet(t *testing.T) {
os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true")
defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT")

f := newFixture(t)
defer f.Close()

steps := []v1alpha1.CanaryStep{
{
SetWeight: int32Ptr(10),
}, {
Pause: &v1alpha1.RolloutPause{
Duration: v1alpha1.DurationFromInt(10),
},
},
}
r1 := newCanaryRollout("foo", 10, nil, steps, int32Ptr(1), intstr.FromInt(1), intstr.FromInt(0))
r1.Spec.Template.Labels["rollout.argoproj.io/foo"] = "bar"

rs1 := newReplicaSetWithStatus(r1, 10, 10)
r1.Spec.Replicas = pointer.Int32(2)
f.kubeobjects = append(f.kubeobjects, rs1)
f.replicaSetLister = append(f.replicaSetLister, rs1)

f.rolloutLister = append(f.rolloutLister, r1)
f.objects = append(f.objects, r1)

f.expectPatchRolloutAction(r1)
f.expectUpdateReplicaSetAction(rs1) // attempt to scale replicaset but conflict
patchIndex := f.expectPatchReplicaSetAction(rs1) // instead of update patch replicaset

key := fmt.Sprintf("%s/%s", r1.Namespace, r1.Name)
c, i, k8sI := f.newController(func() time.Duration { return 30 * time.Minute })

f.kubeclient.PrependReactor("update", "replicasets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, action.(k8stesting.UpdateAction).GetObject(), errors.NewConflict(schema.GroupResource{
Group: "Apps",
Resource: "ReplicaSet",
}, action.(k8stesting.UpdateAction).GetObject().(*appsv1.ReplicaSet).Name, fmt.Errorf("test error"))
})

f.runController(key, true, false, c, i, k8sI)

updatedRs := f.getPatchedReplicaSet(patchIndex) // minus one because update did not happen because conflict
assert.Equal(t, int32(2), *updatedRs.Spec.Replicas)
}

func TestSyncRolloutWithConflictInSyncReplicaSetRevision(t *testing.T) {
os.Setenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT", "true")
defer os.Unsetenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT")

f := newFixture(t)
defer f.Close()

steps := []v1alpha1.CanaryStep{
{
SetWeight: int32Ptr(10),
}, {
Pause: &v1alpha1.RolloutPause{
Duration: v1alpha1.DurationFromInt(10),
},
},
}
r1 := newCanaryRollout("foo", 3, nil, steps, int32Ptr(1), intstr.FromInt(1), intstr.FromInt(0))
r2 := bumpVersion(r1)

rs1 := newReplicaSetWithStatus(r1, 3, 3)
rs2 := newReplicaSetWithStatus(r2, 3, 3)
rs2.Annotations["rollout.argoproj.io/revision"] = "1"

f.kubeobjects = append(f.kubeobjects, rs1, rs2)
f.replicaSetLister = append(f.replicaSetLister, rs1, rs2)

f.rolloutLister = append(f.rolloutLister, r2)
f.objects = append(f.objects, r2)

key := fmt.Sprintf("%s/%s", r1.Namespace, r1.Name)
c, i, k8sI := f.newController(func() time.Duration { return 30 * time.Minute })

f.kubeclient.PrependReactor("update", "replicasets", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &appsv1.ReplicaSet{}, errors.NewConflict(schema.GroupResource{
Group: "Apps",
Resource: "ReplicaSet",
}, action.(k8stesting.UpdateAction).GetObject().(*appsv1.ReplicaSet).Name, fmt.Errorf("test error"))
})

f.expectPatchRolloutAction(r2)
f.expectUpdateReplicaSetAction(rs1) // attempt to update replicaset revision but conflict
patchIndex1 := f.expectPatchReplicaSetAction(rs1) // instead of update patch replicaset

f.expectUpdateReplicaSetAction(rs2) // attempt to scale replicaset but conflict
patchIndex2 := f.expectPatchReplicaSetAction(rs2) // instead of update patch replicaset

f.runController(key, true, false, c, i, k8sI)

updatedRs1 := f.getPatchedReplicaSet(patchIndex1)
assert.Equal(t, "2", updatedRs1.Annotations["rollout.argoproj.io/revision"])
assert.Equal(t, int32(3), *updatedRs1.Spec.Replicas)

updatedRs2 := f.getPatchedReplicaSet(patchIndex2)
assert.Equal(t, int32(0), *updatedRs2.Spec.Replicas)
}
100 changes: 6 additions & 94 deletions rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,23 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"

"github.com/argoproj/argo-rollouts/utils/annotations"

"github.com/argoproj/argo-rollouts/utils/diff"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
patchtypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -560,10 +553,6 @@ func (c *Controller) newRolloutContext(rollout *v1alpha1.Rollout) (*rolloutConte
roCtx.stableRS = replicasetutil.GetStableRS(roCtx.rollout, roCtx.newRS, roCtx.olderRSs)
roCtx.otherRSs = replicasetutil.GetOtherRSs(roCtx.rollout, roCtx.newRS, roCtx.stableRS, rsList)
roCtx.allRSs = append(rsList, roCtx.newRS)
err := roCtx.replicaSetInformer.GetIndexer().Add(roCtx.newRS)
if err != nil {
return nil, err
}
}

if rolloututil.IsFullyPromoted(rollout) && roCtx.pauseContext.IsAborted() {
Expand Down Expand Up @@ -982,91 +971,14 @@ func remarshalRollout(r *v1alpha1.Rollout) *v1alpha1.Rollout {
return &remarshalled
}

// updateReplicaSetWithPatch updates the replicaset using Update and on failure falls back to a patch this function only exists to make sure we always can update
// replicasets and to not get into an conflict loop updating replicasets. We should really look into a complete refactor of how rollouts handles replicasets such
// that we do not keep a fully replicaset on the rollout context under newRS and instead switch to a patch only based approach.
func (c *rolloutContext) updateReplicaSetFallbackToPatch(ctx context.Context, rs *appsv1.ReplicaSet) (*appsv1.ReplicaSet, error) {
// updateReplicaSet updates the replicaset using kubeclient update. It returns the updated replicaset and copies the updated replicaset
// into the passed in pointer as well.
func (c *rolloutContext) updateReplicaSet(ctx context.Context, rs *appsv1.ReplicaSet) (*appsv1.ReplicaSet, error) {
updatedRS, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Update(ctx, rs, metav1.UpdateOptions{})
if err != nil {
if errors.IsConflict(err) {
if os.Getenv("ARGO_ROLLOUTS_LOG_RS_DIFF_CONFLICT") == "true" {
rsGet, err := c.replicaSetLister.ReplicaSets(rs.Namespace).Get(rs.Name)
if err != nil {
return nil, fmt.Errorf("error getting replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
}
rsGetJson, err := json.Marshal(rsGet)
if err != nil {
return nil, fmt.Errorf("error marshalling informer replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
}
rsCopyJson, err := json.Marshal(rs)
if err != nil {
return nil, fmt.Errorf("error marshalling memory replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
}
c.log.Infof("Informer RS: %s", rsGetJson)
c.log.Infof("Memory RS: %s", rsCopyJson)
}

c.log.Infof("Conflict when updating replicaset %s, falling back to patch", rs.Name)

patchRS := appsv1.ReplicaSet{}
patchRS.Spec.Replicas = rs.Spec.Replicas
patchRS.Spec.Template.Labels = rs.Spec.Template.Labels
patchRS.Spec.Template.Annotations = rs.Spec.Template.Annotations

patchRS.Annotations = make(map[string]string)
patchRS.Labels = make(map[string]string)
patchRS.Spec.Selector = &metav1.LabelSelector{
MatchLabels: make(map[string]string),
}

if _, found := rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]; found {
patchRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey] = rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
}

if _, found := rs.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]; found {
patchRS.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey] = rs.Annotations[v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey]
}

if _, found := rs.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey]; found {
patchRS.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey] = rs.Spec.Selector.MatchLabels[v1alpha1.DefaultRolloutUniqueLabelKey]
}

for key, value := range rs.Annotations {
if strings.HasPrefix(key, annotations.RolloutLabel) ||
strings.HasPrefix(key, "argo-rollouts.argoproj.io") ||
strings.HasPrefix(key, "experiment.argoproj.io") {
patchRS.Annotations[key] = value
}
}
for key, value := range rs.Labels {
if strings.HasPrefix(key, annotations.RolloutLabel) ||
strings.HasPrefix(key, "argo-rollouts.argoproj.io") ||
strings.HasPrefix(key, "experiment.argoproj.io") {
patchRS.Labels[key] = value
}
}

patch, _, err := diff.CreateTwoWayMergePatch(appsv1.ReplicaSet{}, patchRS, appsv1.ReplicaSet{})
if err != nil {
return nil, fmt.Errorf("error creating patch for conflict log in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
}

c.log.Infof("Patching replicaset with patch: %s", string(patch))
updatedRS, err = c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("error patching replicaset in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
}

err = c.replicaSetInformer.GetIndexer().Update(updatedRS)
if err != nil {
return nil, fmt.Errorf("error updating replicaset informer in updateReplicaSetFallbackToPatch %s: %w", rs.Name, err)
}

return updatedRS, err
}
}
if updatedRS != nil {
updatedRS.DeepCopyInto(rs)
return nil, fmt.Errorf("error updating replicaset in updateReplicaSet %s: %w", rs.Name, err)
}
updatedRS.DeepCopyInto(rs)

return rs, err
}
2 changes: 1 addition & 1 deletion rollout/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ func (f *fixture) getUpdatedReplicaSet(index int) *appsv1.ReplicaSet {
return rs
}

func (f *fixture) getPatchedReplicaSet(index int) *appsv1.ReplicaSet {
func (f *fixture) getPatchedReplicaSet(index int) *appsv1.ReplicaSet { //nolint
action := filterInformerActions(f.kubeclient.Actions())[index]
patchAction, ok := action.(core.PatchAction)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion rollout/ephemeralmetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *rolloutContext) syncEphemeralMetadata(ctx context.Context, rs *appsv1.R
// First update replicasets, then pods owned by it.
// So that any replicas created in the interim between the two steps are using the new updated version.
// 1. Update ReplicaSet so that any new pods it creates will have the metadata
rs, err := c.updateReplicaSetFallbackToPatch(ctx, modifiedRS)
rs, err := c.updateReplicaSet(ctx, modifiedRS)
if err != nil {
c.log.Infof("failed to sync ephemeral metadata %v to ReplicaSet %s: %v", podMetadata, rs.Name, err)
return fmt.Errorf("failed to sync ephemeral metadata: %w", err)
Expand Down
8 changes: 0 additions & 8 deletions rollout/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ func (c *rolloutContext) removeScaleDownDelay(rs *appsv1.ReplicaSet) error {
return fmt.Errorf("error removing scale-down-deadline annotation from RS '%s': %w", rs.Name, err)
}
c.log.Infof("Removed '%s' annotation from RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name)
err = c.replicaSetInformer.GetIndexer().Update(rs)
if err != nil {
return fmt.Errorf("error updating replicaset informer in removeScaleDownDelay: %w", err)
}
return err
}

Expand All @@ -68,10 +64,6 @@ func (c *rolloutContext) addScaleDownDelay(rs *appsv1.ReplicaSet, scaleDownDelay
return fmt.Errorf("error adding scale-down-deadline annotation to RS '%s': %w", rs.Name, err)
}
c.log.Infof("Set '%s' annotation on '%s' to %s (%s)", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name, deadline, scaleDownDelaySeconds)
err = c.replicaSetInformer.GetIndexer().Update(rs)
if err != nil {
return fmt.Errorf("error updating replicaset informer in addScaleDownDelay: %w", err)
}
return err
}

Expand Down
6 changes: 2 additions & 4 deletions rollout/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ func TestGetPreviewAndActiveServices(t *testing.T) {
noActiveSvcRollout := rollout.DeepCopy()
noActiveSvcRollout.Spec.Strategy.BlueGreen.ActiveService = ""
roCtx, err := c.newRolloutContext(noActiveSvcRollout)
assert.NoError(t, err)
_, _, err = roCtx.getPreviewAndActiveServices()
assert.NotNil(t, err)
assert.EqualError(t, err, "service \"\" not found")
assert.Error(t, err)
assert.Nil(t, roCtx)
})
}

Expand Down
8 changes: 4 additions & 4 deletions rollout/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *rolloutContext) syncReplicaSetRevision() (*appsv1.ReplicaSet, error) {
rsCopy.Spec.MinReadySeconds = c.rollout.Spec.MinReadySeconds
rsCopy.Spec.Template.Spec.Affinity = replicasetutil.GenerateReplicaSetAffinity(*c.rollout)

rs, err := c.updateReplicaSetFallbackToPatch(ctx, rsCopy)
rs, err := c.updateReplicaSet(ctx, rsCopy)
if err != nil {
return nil, fmt.Errorf("failed to update replicaset revision on %s: %w", rsCopy.Name, err)
}
Expand Down Expand Up @@ -372,13 +372,13 @@ func (c *rolloutContext) scaleReplicaSet(rs *appsv1.ReplicaSet, newScale int32,
*(rsCopy.Spec.Replicas) = newScale
annotations.SetReplicasAnnotations(rsCopy, rolloutReplicas)
if fullScaleDown && !c.shouldDelayScaleDownOnAbort() {
// This bypasses the normal call to removeScaleDownDelay and then depends on the removal via an update in updateReplicaSetFallbackToPatch
// This bypasses the normal call to removeScaleDownDelay and then depends on the removal via an update in updateReplicaSet
delete(rsCopy.Annotations, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey)
}

rs, err = c.updateReplicaSetFallbackToPatch(ctx, rsCopy)
rs, err = c.updateReplicaSet(ctx, rsCopy)
if err != nil {
return scaled, rs, fmt.Errorf("failed to updateReplicaSetFallbackToPatch in scaleReplicaSet: %w", err)
return scaled, rs, fmt.Errorf("failed to updateReplicaSet in scaleReplicaSet: %w", err)
}

if sizeNeedsUpdate {
Expand Down

0 comments on commit 42215e6

Please sign in to comment.