Skip to content

Commit

Permalink
Add recorder component
Browse files Browse the repository at this point in the history
- records the canary analysis status and current weight as Prometheus metrics
  • Loading branch information
stefanprodan committed Oct 29, 2018
1 parent 6bf4a8f commit 4a8aa3b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 18 deletions.
16 changes: 10 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ type Controller struct {
flaggerSynced cache.InformerSynced
flaggerWindow time.Duration
workqueue workqueue.RateLimitingInterface
recorder record.EventRecorder
eventRecorder record.EventRecorder
logger *zap.SugaredLogger
canaries *sync.Map
deployer CanaryDeployer
router CanaryRouter
observer CanaryObserver
recorder CanaryRecorder
}

func NewController(
Expand All @@ -61,7 +62,7 @@ func NewController(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
Interface: kubeClient.CoreV1().Events(""),
})
recorder := eventBroadcaster.NewRecorder(
eventRecorder := eventBroadcaster.NewRecorder(
scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

deployer := CanaryDeployer{
Expand All @@ -82,20 +83,23 @@ func NewController(
metricsServer: metricServer,
}

recorder := NewCanaryRecorder()

ctrl := &Controller{
kubeClient: kubeClient,
istioClient: istioClient,
flaggerClient: flaggerClient,
flaggerLister: flaggerInformer.Lister(),
flaggerSynced: flaggerInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
recorder: recorder,
eventRecorder: eventRecorder,
logger: logger,
canaries: new(sync.Map),
flaggerWindow: flaggerWindow,
deployer: deployer,
router: router,
observer: observer,
recorder: recorder,
}

flaggerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -239,17 +243,17 @@ func checkCustomResourceType(obj interface{}, logger *zap.SugaredLogger) (flagge

func (c *Controller) recordEventInfof(r *flaggerv1.Canary, template string, args ...interface{}) {
c.logger.Infof(template, args...)
c.recorder.Event(r, corev1.EventTypeNormal, "Synced", fmt.Sprintf(template, args...))
c.eventRecorder.Event(r, corev1.EventTypeNormal, "Synced", fmt.Sprintf(template, args...))
}

func (c *Controller) recordEventErrorf(r *flaggerv1.Canary, template string, args ...interface{}) {
c.logger.Errorf(template, args...)
c.recorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))
c.eventRecorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))
}

func (c *Controller) recordEventWarningf(r *flaggerv1.Canary, template string, args ...interface{}) {
c.logger.Infof(template, args...)
c.recorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))
c.eventRecorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))
}

func int32p(i int32) *int32 {
Expand Down
57 changes: 57 additions & 0 deletions pkg/controller/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package controller

import (
"fmt"

"github.com/prometheus/client_golang/prometheus"
flaggerv1 "github.com/stefanprodan/flagger/pkg/apis/flagger/v1alpha1"
)

// CanaryRecorder records the canary analysis as Prometheus metrics
type CanaryRecorder struct {
// status holds the last known analysis result per canary:
// 0 - running, 1 - successful, 2 - failed (set by RecordStatus).
status *prometheus.GaugeVec
// weight holds the current virtual service destination weight
// per workload (set by RecordWeight).
weight *prometheus.GaugeVec
}

// NewCanaryRecorder creates the canary status and weight gauge vectors
// and registers them with the default Prometheus registry.
// NOTE(review): prometheus.MustRegister panics on duplicate registration,
// so this constructor should be called at most once per process.
func NewCanaryRecorder() CanaryRecorder {
	rec := CanaryRecorder{
		// 0 - running, 1 - successful, 2 - failed
		status: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Subsystem: controllerAgentName,
			Name:      "canary_status",
			Help:      "Last canary analysis result",
		}, []string{"name", "namespace"}),
		weight: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Subsystem: controllerAgentName,
			Name:      "canary_weight",
			Help:      "The virtual service destination weight current value",
		}, []string{"workload", "namespace"}),
	}

	// Register in the same order as creation: status first, then weight.
	prometheus.MustRegister(rec.status)
	prometheus.MustRegister(rec.weight)

	return rec
}

// RecordStatus sets the last known canary analysis status for the
// target workload: 0 - running, 1 - successful, 2 - failed.
// Any state other than "running" or "failed" is reported as successful
// (this covers e.g. the "initialized" state — see checkCanaryStatus).
func (cr *CanaryRecorder) RecordStatus(cd *flaggerv1.Canary) {
	// Map the textual state straight to the gauge value; typing the
	// variable float64 avoids both the redundant pre-initialization
	// and the extra conversion the original performed.
	var status float64
	switch cd.Status.State {
	case "running":
		status = 0
	case "failed":
		status = 2
	default:
		status = 1 // successful / settled
	}
	cr.status.WithLabelValues(cd.Spec.TargetRef.Name, cd.Namespace).Set(status)
}

// RecordWeight sets the weight values for primary and canary destinations
func (cr *CanaryRecorder) RecordWeight(cd *flaggerv1.Canary, primary int, canary int) {
	namespace := cd.Namespace
	// The primary deployment is labelled "<target>-primary", the canary
	// keeps the bare target name.
	primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
	cr.weight.WithLabelValues(primaryName, namespace).Set(float64(primary))
	cr.weight.WithLabelValues(cd.Spec.TargetRef.Name, namespace).Set(float64(canary))
}
30 changes: 20 additions & 10 deletions pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func (c *Controller) advanceCanary(name string, namespace string) {
return
}

c.recorder.RecordWeight(cd, primaryRoute.Weight, canaryRoute.Weight)

// check if canary analysis should start (canary revision has changes) or continue
if ok := c.checkCanaryStatus(cd, c.deployer); !ok {
return
Expand All @@ -76,6 +78,7 @@ func (c *Controller) advanceCanary(name string, namespace string) {
return
}

c.recorder.RecordWeight(cd, primaryRoute.Weight, canaryRoute.Weight)
c.recordEventWarningf(cd, "Canary failed! Scaling down %s.%s",
cd.Spec.TargetRef.Name, cd.Namespace)

Expand All @@ -90,6 +93,7 @@ func (c *Controller) advanceCanary(name string, namespace string) {
c.logger.Errorf("%v", err)
return
}
c.recorder.RecordStatus(cd)
return
}

Expand Down Expand Up @@ -123,6 +127,7 @@ func (c *Controller) advanceCanary(name string, namespace string) {
return
}

c.recorder.RecordWeight(cd, primaryRoute.Weight, canaryRoute.Weight)
c.recordEventInfof(cd, "Advance %s.%s canary weight %v", cd.Name, cd.Namespace, canaryRoute.Weight)

// promote canary
Expand All @@ -144,6 +149,7 @@ func (c *Controller) advanceCanary(name string, namespace string) {
return
}

c.recorder.RecordWeight(cd, primaryRoute.Weight, canaryRoute.Weight)
c.recordEventInfof(cd, "Promotion completed! Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)

// shutdown canary
Expand All @@ -157,33 +163,37 @@ func (c *Controller) advanceCanary(name string, namespace string) {
c.recordEventWarningf(cd, "%v", err)
return
}
c.recorder.RecordStatus(cd)
}
}

func (c *Controller) checkCanaryStatus(r *flaggerv1.Canary, deployer CanaryDeployer) bool {
if r.Status.State == "running" {
func (c *Controller) checkCanaryStatus(cd *flaggerv1.Canary, deployer CanaryDeployer) bool {
if cd.Status.State == "running" {
c.recorder.RecordStatus(cd)
return true
}

if r.Status.State == "" {
if err := deployer.SyncStatus(r, flaggerv1.CanaryStatus{State: "initialized"}); err != nil {
if cd.Status.State == "" {
if err := deployer.SyncStatus(cd, flaggerv1.CanaryStatus{State: "initialized"}); err != nil {
c.logger.Errorf("%v", err)
return false
}
c.recordEventInfof(r, "Initialization done! %s.%s", r.Name, r.Namespace)
c.recorder.RecordStatus(cd)
c.recordEventInfof(cd, "Initialization done! %s.%s", cd.Name, cd.Namespace)
return false
}

if diff, err := deployer.IsNewSpec(r); diff {
c.recordEventInfof(r, "New revision detected! Scaling up %s.%s", r.Spec.TargetRef.Name, r.Namespace)
if err = deployer.Scale(r, 1); err != nil {
c.recordEventErrorf(r, "%v", err)
if diff, err := deployer.IsNewSpec(cd); diff {
c.recordEventInfof(cd, "New revision detected! Scaling up %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)
if err = deployer.Scale(cd, 1); err != nil {
c.recordEventErrorf(cd, "%v", err)
return false
}
if err := deployer.SyncStatus(r, flaggerv1.CanaryStatus{State: "running"}); err != nil {
if err := deployer.SyncStatus(cd, flaggerv1.CanaryStatus{State: "running"}); err != nil {
c.logger.Errorf("%v", err)
return false
}
c.recorder.RecordStatus(cd)
return false
}
return false
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestScheduler_Init(t *testing.T) {
flaggerLister: flaggerInformer.Lister(),
flaggerSynced: flaggerInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
recorder: &record.FakeRecorder{},
eventRecorder: &record.FakeRecorder{},
logger: logger,
canaries: new(sync.Map),
flaggerWindow: time.Second,
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestScheduler_NewRevision(t *testing.T) {
flaggerLister: flaggerInformer.Lister(),
flaggerSynced: flaggerInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
recorder: &record.FakeRecorder{},
eventRecorder: &record.FakeRecorder{},
logger: logger,
canaries: new(sync.Map),
flaggerWindow: time.Second,
Expand Down

0 comments on commit 4a8aa3b

Please sign in to comment.