Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Keep track of observed generations and rollbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddeco committed May 15, 2019
1 parent d03de1d commit 2786e0f
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 121 deletions.
10 changes: 5 additions & 5 deletions cmd/helm-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,16 @@ func main() {
TLSHostname: *tillerTLSHostname,
})

// The status updater, to keep track the release status for each
// HelmRelease. It runs as a separate loop for now.
statusUpdater := status.New(ifClient, kubeClient, helmClient, *namespace)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator"))

nsOpt := ifinformers.WithNamespace(*namespace)
ifInformerFactory := ifinformers.NewSharedInformerFactoryWithOptions(ifClient, *chartsSyncInterval, nsOpt)
fhrInformer := ifInformerFactory.Flux().V1beta1().HelmReleases()
go ifInformerFactory.Start(shutdown)

// The status updater, to keep track the release status for each
// HelmRelease. It runs as a separate loop for now.
statusUpdater := status.New(ifClient, fhrInformer.Lister(), helmClient)
go statusUpdater.Loop(shutdown, log.With(logger, "component", "statusupdater"))

queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease")

// release instance is needed during the sync of git chart changes and during the sync of HelmRelease changes
Expand Down
14 changes: 12 additions & 2 deletions integrations/apis/flux.weave.works/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ type GitChartSource struct {
SkipDepUpdate bool `json:"skipDepUpdate,omitempty"`
}

// DefaultGitRef is the ref assumed if the Ref field is not given in a GitChartSource
// DefaultGitRef is the ref assumed if the Ref field is not given in
// a GitChartSource
const DefaultGitRef = "master"

func (s GitChartSource) RefOrDefault() string {
Expand Down Expand Up @@ -135,7 +136,7 @@ type HelmReleaseSpec struct {
// Force resource update through delete/recreate, allows recovery from a failed state
// +optional
ForceUpgrade bool `json:"forceUpgrade,omitempty"`
// Enable automatic rollbacks
// Enable rollback and configure options
// +optional
Rollback Rollback `json:"rollback,omitempty"`
}
Expand All @@ -157,6 +158,10 @@ type HelmReleaseStatus struct {
// managed by this resource.
ReleaseStatus string `json:"releaseStatus"`

// ObservedGeneration is the most recent generation observed by
// the controller.
ObservedGeneration int64 `json:"observedGeneration"`

// Revision would define what Git hash or Chart version has currently
// been deployed.
// +optional
Expand All @@ -174,6 +179,8 @@ type HelmReleaseCondition struct {
Type HelmReleaseConditionType `json:"type"`
Status v1.ConditionStatus `json:"status"`
// +optional
LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
// +optional
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// +optional
Reason string `json:"reason,omitempty"`
Expand All @@ -190,6 +197,9 @@ const (
// Released means the chart release, as specified in this
// HelmRelease, has been processed by Helm.
HelmReleaseReleased HelmReleaseConditionType = "Released"
// RolledBack means the chart to which the HelmRelease refers
// has been rolled back
HelmReleaseRolledBack HelmReleaseConditionType = "RolledBack"
)

// FluxHelmValues embeds chartutil.Values so we can implement deepcopy on map[string]interface{}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 34 additions & 30 deletions integrations/helm/chartsync/chartsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/labels"

"github.com/go-kit/kit/log"
google_protobuf "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"github.com/ncabatoff/go-seq/seq"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand All @@ -74,6 +73,7 @@ const (
ReasonInstallFailed = "HelmInstallFailed"
ReasonDependencyFailed = "UpdateDependencyFailed"
ReasonUpgradeFailed = "HelmUgradeFailed"
ReasonRollbackFailed = "HelmRollbackFailed"
ReasonCloned = "GitRepoCloned"
ReasonSuccess = "HelmSuccess"
)
Expand Down Expand Up @@ -243,15 +243,15 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
if cloneForChart.export != nil {
cloneForChart.export.Clean()
}
}

// Enqueue release
cacheKey, err := cache.MetaNamespaceKeyFunc(fhr.GetObjectMeta())
if err != nil {
continue
// Enqueue release
cacheKey, err := cache.MetaNamespaceKeyFunc(fhr.GetObjectMeta())
if err != nil {
continue
}
chs.logger.Log("info", "enqueing release upgrade due to change in git chart source", "resource", fhr.ResourceID().String())
chs.releaseQueue.AddRateLimited(cacheKey)
}
chs.logger.Log("info", "enqueing release upgrade due to change in git chart source", "resource", fhr.ResourceID().String())
chs.releaseQueue.AddRateLimited(cacheKey)
}
}
case <-stopCh:
Expand Down Expand Up @@ -375,8 +375,11 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) {
chs.logger.Log("warning", "failed to install chart", "resource", fhr.ResourceID().String(), "err", err)
return
}
if err = chs.updateObservedGeneration(fhr); err != nil {
chs.logger.Log("warning", "could not update the observed generation", "resource", fhr.ResourceID().String(), "err", err)
}
chs.setCondition(fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm install succeeded")
if err = status.UpdateReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil {
if err = status.SetReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil {
chs.logger.Log("warning", "could not update the release revision", "namespace", fhr.Namespace, "resource", fhr.Name, "err", err)
}
return
Expand All @@ -403,8 +406,11 @@ func (chs *ChartChangeSync) reconcileReleaseDef(fhr fluxv1beta1.HelmRelease) {
chs.logger.Log("warning", "failed to upgrade chart", "resource", fhr.ResourceID().String(), "err", err)
return
}
if err = chs.updateObservedGeneration(fhr); err != nil {
chs.logger.Log("warning", "could not update the observed generation", "resource", fhr.ResourceID().String(), "err", err)
}
chs.setCondition(fhr, fluxv1beta1.HelmReleaseReleased, v1.ConditionTrue, ReasonSuccess, "helm upgrade succeeded")
if err = status.UpdateReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil {
if err = status.SetReleaseRevision(chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace), fhr, chartRevision); err != nil {
chs.logger.Log("warning", "could not update the release revision", "resource", fhr.ResourceID().String(), "err", err)
}
return
Expand All @@ -422,7 +428,12 @@ func (chs *ChartChangeSync) RollbackRelease(fhr fluxv1beta1.HelmRelease) {
fhr.Spec.Rollback.Recreate, fhr.Spec.Rollback.DisableHooks, fhr.Spec.Rollback.Wait)
if err != nil {
chs.logger.Log("warning", "unable to rollback chart release", "resource", fhr.ResourceID().String(), "release", name, "err", err)
chs.setCondition(fhr, fluxv1beta1.HelmReleaseRolledBack, v1.ConditionFalse, ReasonRollbackFailed, err.Error())
}
if err = chs.updateObservedGeneration(fhr); err != nil {
chs.logger.Log("warning", "could not update the observed generation", "resource", fhr.ResourceID().String(), "err", err)
}
chs.setCondition(fhr, fluxv1beta1.HelmReleaseRolledBack, v1.ConditionTrue, ReasonSuccess, "helm rollback succeeded")
}

// DeleteRelease deletes the helm release associated with a
Expand Down Expand Up @@ -478,26 +489,19 @@ func (chs *ChartChangeSync) getCustomResourcesForMirror(mirror string) ([]fluxv1
return fhrs, nil
}

// setCondition saves the status of a condition, if it's new
// information. New information is something that adds or changes the
// status, reason or message (i.e., anything but the transition time)
// for one of the types of condition.
func (chs *ChartChangeSync) setCondition(fhr fluxv1beta1.HelmRelease, typ fluxv1beta1.HelmReleaseConditionType, st v1.ConditionStatus, reason, message string) error {
for _, c := range fhr.Status.Conditions {
if c.Type == typ && c.Status == st && c.Message == message && c.Reason == reason {
return nil
}
}
// setCondition saves the status of a condition.
func (chs *ChartChangeSync) setCondition(hr fluxv1beta1.HelmRelease, typ fluxv1beta1.HelmReleaseConditionType, st v1.ConditionStatus, reason, message string) error {
hrClient := chs.ifClient.FluxV1beta1().HelmReleases(hr.Namespace)
condition := status.NewCondition(typ, st, reason, message)
return status.SetCondition(hrClient, hr, condition)
}

fhrClient := chs.ifClient.FluxV1beta1().HelmReleases(fhr.Namespace)
cond := fluxv1beta1.HelmReleaseCondition{
Type: typ,
Status: st,
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
return status.UpdateConditions(fhrClient, fhr, cond)
// updateObservedGeneration updates the observed generation of the
// given HelmRelease to the generation.
func (chs *ChartChangeSync) updateObservedGeneration(hr fluxv1beta1.HelmRelease) error {
hrClient := chs.ifClient.FluxV1beta1().HelmReleases(hr.Namespace)

return status.SetObservedGeneration(hrClient, hr, hr.Generation)
}

func sortStrings(ss []string) []string {
Expand Down
19 changes: 13 additions & 6 deletions integrations/helm/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/helm/pkg/proto/hapi/release"

flux_v1beta1 "github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1"
ifscheme "github.com/weaveworks/flux/integrations/client/clientset/versioned/scheme"
fhrv1 "github.com/weaveworks/flux/integrations/client/informers/externalversions/flux.weave.works/v1beta1"
iflister "github.com/weaveworks/flux/integrations/client/listers/flux.weave.works/v1beta1"
"github.com/weaveworks/flux/integrations/helm/chartsync"
"github.com/weaveworks/flux/integrations/helm/status"
)

const (
Expand Down Expand Up @@ -234,8 +234,8 @@ func (c *Controller) syncHandler(key string) error {
return err
}

// Attempt a rollback if the release status is FAILED.
if fhr.Status.ReleaseStatus == release.Status_FAILED.String() {
// (Maybe) attempt a rollback if the release has failed.
if status.ReleaseFailed(*fhr) {
c.sync.RollbackRelease(*fhr)
return nil
}
Expand Down Expand Up @@ -289,10 +289,10 @@ func (c *Controller) enqueueUpdateJob(old, new interface{}) {
return
}

// Enqueue rollback if the status of the release has changed to
// FAILED, and rollbacks for this HelmRelease are enabled.
// Enqueue rollback if the roll-out of the release failed and
// rollbacks are enabled.
if oldFhr.Status.ReleaseStatus != newFhr.Status.ReleaseStatus {
if newFhr.Spec.Rollback.Enable && newFhr.Status.ReleaseStatus == release.Status_FAILED.String() {
if newFhr.Spec.Rollback.Enable && status.ReleaseFailed(newFhr) {
c.logger.Log("info", "enqueing rollback", "resource", newFhr.ResourceID().String())
c.enqueueJob(new)
return
Expand All @@ -310,6 +310,13 @@ func (c *Controller) enqueueUpdateJob(old, new interface{}) {
return
}

// Skip if the current HelmRelease generation has been rolled
// back, as otherwise we will end up in a loop of failure.
if status.HasRolledBack(newFhr) {
c.logger.Log("warning", "current revision has been rolled back", "resource", newFhr.ResourceID().String())
return
}

log := []string{"info", "enqueuing release"}
if diff != "" && c.logDiffs {
log = append(log, "diff", diff)
Expand Down
77 changes: 50 additions & 27 deletions integrations/helm/status/conditions.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,66 @@
package status

import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/weaveworks/flux/integrations/apis/flux.weave.works/v1beta1"
v1beta1client "github.com/weaveworks/flux/integrations/client/clientset/versioned/typed/flux.weave.works/v1beta1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// We can't rely on having UpdateStatus, or strategic merge patching
// for custom resources. So we have to create an object which
// represents the merge path or JSON patch to apply.
func UpdateConditionsPatch(status *v1beta1.HelmReleaseStatus, updates ...v1beta1.HelmReleaseCondition) {
newConditions := make([]v1beta1.HelmReleaseCondition, len(status.Conditions))
oldConditions := status.Conditions
for i, c := range oldConditions {
newConditions[i] = c
}
updates:
for _, up := range updates {
for i, c := range oldConditions {
if c.Type == up.Type {
newConditions[i] = up
continue updates
}
}
newConditions = append(newConditions, up)

// NewCondition creates a new HelmReleaseCondition.
func NewCondition(conditionType v1beta1.HelmReleaseConditionType, status v1.ConditionStatus, reason, message string) v1beta1.HelmReleaseCondition {
return v1beta1.HelmReleaseCondition{
Type: conditionType,
Status: status,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
status.Conditions = newConditions
}

// UpdateConditions retrieves a new copy of the HelmRelease given,
// applies the updates to this copy, and updates the resource in the
// cluster.
func UpdateConditions(client v1beta1client.HelmReleaseInterface, fhr v1beta1.HelmRelease, updates ...v1beta1.HelmReleaseCondition) error {
cFhr, err := client.Get(fhr.Name, v1.GetOptions{})
// SetCondition updates the HelmRelease to include the given condition.
func SetCondition(client v1beta1client.HelmReleaseInterface, hr v1beta1.HelmRelease,
condition v1beta1.HelmReleaseCondition) error {

cHr, err := client.Get(hr.Name, metav1.GetOptions{})
if err != nil {
return err
}

UpdateConditionsPatch(&cFhr.Status, updates...)
_, err = client.UpdateStatus(cFhr)
currCondition := GetCondition(cHr.Status, condition.Type)
if currCondition != nil && currCondition.Status == condition.Status {
condition.LastTransitionTime = currCondition.LastTransitionTime
}

newConditions := filterOutCondition(cHr.Status.Conditions, condition.Type)
cHr.Status.Conditions = append(newConditions, condition)

_, err = client.UpdateStatus(cHr)
return err
}

// GetCondition returns the condition with the given type.
func GetCondition(status v1beta1.HelmReleaseStatus, conditionType v1beta1.HelmReleaseConditionType) *v1beta1.HelmReleaseCondition {
for i := range status.Conditions {
c := status.Conditions[i]
if c.Type == conditionType {
return &c
}
}
return nil
}

// filterOutCondition returns a new slice of conditions without the
// conditions of the given type.
func filterOutCondition(conditions []v1beta1.HelmReleaseCondition, conditionType v1beta1.HelmReleaseConditionType) []v1beta1.HelmReleaseCondition {
var newConditions []v1beta1.HelmReleaseCondition
for _, c := range conditions {
if c.Type == conditionType {
continue
}
newConditions = append(newConditions, c)
}
return newConditions
}
Loading

0 comments on commit 2786e0f

Please sign in to comment.