Skip to content

Commit

Permalink
Merge pull request #219 from keel-hq/feature/deregister_tracked_images
Browse files Browse the repository at this point in the history
unwatching images that are not tracked anymore
rusenask authored Jun 3, 2018

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents d003b62 + af0a7d8 commit 356b4d8
Showing 3 changed files with 153 additions and 56 deletions.
37 changes: 6 additions & 31 deletions trigger/poll/manager.go
Original file line number Diff line number Diff line change
@@ -6,24 +6,10 @@ import (
"time"

"github.com/keel-hq/keel/provider"
"github.com/keel-hq/keel/types"

"github.com/prometheus/client_golang/prometheus"

log "github.com/sirupsen/logrus"
)

var pollTriggerTrackedImages = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "poll_trigger_tracked_images",
Help: "How many images are tracked by poll trigger",
},
)

func init() {
prometheus.MustRegister(pollTriggerTrackedImages)
}

// DefaultManager - default manager is responsible for scanning deployments and identifying
// deployments that have market
type DefaultManager struct {
@@ -47,7 +33,7 @@ func NewPollManager(providers provider.Providers, watcher Watcher) *DefaultManag
providers: providers,
watcher: watcher,
mu: &sync.Mutex{},
scanTick: 1,
scanTick: 3,
}
}

@@ -91,23 +77,12 @@ func (s *DefaultManager) scan(ctx context.Context) error {
return err
}

var tracked float64
for _, trackedImage := range trackedImages {
if trackedImage.Trigger != types.TriggerTypePoll {
continue
}
tracked++
err = s.watcher.Watch(trackedImage, trackedImage.PollSchedule)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"schedule": trackedImage.PollSchedule,
"image": trackedImage.Image.Remote(),
}).Error("trigger.poll.manager: failed to start watching repository")
}
err = s.watcher.Watch(trackedImages...)
if err != nil {
log.WithFields(log.Fields{
"error": err,
}).Error("trigger.poll.manager: got error(-s) while watching images")
}

pollTriggerTrackedImages.Set(tracked)

return nil
}
90 changes: 71 additions & 19 deletions trigger/poll/watcher.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package poll
import (
"context"
"fmt"
"strings"

"github.com/keel-hq/keel/extension/credentialshelper"
"github.com/keel-hq/keel/provider"
@@ -25,13 +26,21 @@ var registriesScannedCounter = prometheus.NewCounterVec(
[]string{"registry", "image"},
)

var pollTriggerTrackedImages = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "poll_trigger_tracked_images",
Help: "How many images are tracked by poll trigger",
},
)

func init() {
prometheus.MustRegister(registriesScannedCounter)
prometheus.MustRegister(pollTriggerTrackedImages)
}

// Watcher - generic watcher interface
type Watcher interface {
Watch(image *types.TrackedImage, schedule string) error
Watch(image ...*types.TrackedImage) error
Unwatch(image string) error
}

@@ -73,11 +82,10 @@ func (w *RepositoryWatcher) Start(ctx context.Context) {
// starting cron job
w.cron.Start()
go func() {
for {
select {
case <-ctx.Done():
w.cron.Stop()
}
select {
case <-ctx.Done():
w.cron.Stop()
return
}
}()
}
@@ -114,20 +122,62 @@ func (w *RepositoryWatcher) Unwatch(imageName string) error {

// Watch - starts watching repository for changes, if it's already watching - ignores,
// if details changed - updates details
func (w *RepositoryWatcher) Watch(image *types.TrackedImage, schedule string) error {
func (w *RepositoryWatcher) Watch(images ...*types.TrackedImage) error {

var errs []string
tracked := map[string]bool{}

for _, image := range images {
identifier, err := w.watch(image)
if err != nil {
errs = append(errs, err.Error())
continue
}
tracked[identifier] = true
}

pollTriggerTrackedImages.Set(float64(len(tracked)))

// removing registries that should not be tracked anymore
// for example: deployment using image X was deleted so we should not query
// registry that points to image X as nothing is using it anymore
w.unwatch(tracked)

if len(errs) > 0 {
return fmt.Errorf("encountered errors while adding images: %s", strings.Join(errs, ", "))
}

return nil
}

if schedule == "" {
return fmt.Errorf("cron schedule cannot be empty")
func (w *RepositoryWatcher) unwatch(tracked map[string]bool) {
for key, details := range w.watched {
if !tracked[key] {
log.WithFields(log.Fields{
"job_name": key,
"image": details.trackedImage.String(),
"schedule": details.schedule,
}).Info("trigger.poll.RepositoryWatcher: image no tracked anymore, removing watcher")
w.cron.DeleteJob(key)
delete(w.watched, key)
}
}
}

func (w *RepositoryWatcher) watch(image *types.TrackedImage) (string, error) {

_, err := cron.Parse(schedule)
if image.PollSchedule == "" {
return "", fmt.Errorf("cron schedule cannot be empty")
}

_, err := cron.Parse(image.PollSchedule)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image": image.String(),
"schedule": schedule,
"schedule": image.PollSchedule,
}).Error("trigger.poll.RepositoryWatcher.addJob: invalid cron schedule")
return fmt.Errorf("invalid cron schedule: %s", err)
return "", fmt.Errorf("invalid cron schedule: %s", err)
}

key := getImageIdentifier(image.Image)
@@ -136,25 +186,25 @@ func (w *RepositoryWatcher) Watch(image *types.TrackedImage, schedule string) er
details, ok := w.watched[key]
if !ok {
// err = w.addJob(imageRef, registryUsername, registryPassword, schedule)
err = w.addJob(image, schedule)
err = w.addJob(image, image.PollSchedule)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"image": image.String(),
}).Error("trigger.poll.RepositoryWatcher.Watch: failed to add image watch job")

return "", err
}
return err
return key, nil
}

// checking schedule
if details.schedule != schedule {
w.cron.UpdateJob(key, schedule)
if details.schedule != image.PollSchedule {
w.cron.UpdateJob(key, image.PollSchedule)
}

// nothing to do

return nil
return key, nil
}

func (w *RepositoryWatcher) addJob(ti *types.TrackedImage, schedule string) error {
@@ -254,7 +304,7 @@ func (j *WatchTagJob) Run() {
Password: creds.Password,
})

registriesScannedCounter.With(prometheus.Labels{"registry": j.details.trackedImage.Image.Registry(), "image": j.details.trackedImage.Image.Name()}).Inc()
registriesScannedCounter.With(prometheus.Labels{"registry": j.details.trackedImage.Image.Registry(), "image": j.details.trackedImage.Image.Repository()}).Inc()

if err != nil {
log.WithFields(log.Fields{
@@ -335,6 +385,8 @@ func (j *WatchRepositoryTagsJob) Run() {
return
}

registriesScannedCounter.With(prometheus.Labels{"registry": j.details.trackedImage.Image.Registry(), "image": j.details.trackedImage.Image.Repository()}).Inc()

log.WithFields(log.Fields{
"current_tag": j.details.trackedImage.Image.Tag(),
"repository_tags": repository.Tags,
82 changes: 76 additions & 6 deletions trigger/poll/watcher_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package poll

import (
"context"
"os"
"testing"
"time"
@@ -15,13 +16,14 @@ import (
"github.com/keel-hq/keel/util/image"
)

func mustParse(img string) *types.TrackedImage {
func mustParse(img string, schedule string) *types.TrackedImage {
ref, err := image.Parse(img)
if err != nil {
panic(err)
}
return &types.TrackedImage{
Image: ref,
Image: ref,
PollSchedule: schedule,
}
}

@@ -279,10 +281,14 @@ func TestWatchMultipleTags(t *testing.T) {

watcher := NewRepositoryWatcher(providers, frc)

watcher.Watch(mustParse("gcr.io/v2-namespace/hello-world:1.1.1"), "@every 10m")
watcher.Watch(mustParse("gcr.io/v2-namespace/greetings-world:1.1.1"), "@every 10m")
watcher.Watch(mustParse("gcr.io/v2-namespace/greetings-world:alpha"), "@every 10m")
watcher.Watch(mustParse("gcr.io/v2-namespace/greetings-world:master"), "@every 10m")
tracked := []*types.TrackedImage{
mustParse("gcr.io/v2-namespace/hello-world:1.1.1", "@every 10m"),
mustParse("gcr.io/v2-namespace/greetings-world:1.1.1", "@every 10m"),
mustParse("gcr.io/v2-namespace/greetings-world:alpha", "@every 10m"),
mustParse("gcr.io/v2-namespace/greetings-world:master", "@every 10m"),
}

watcher.Watch(tracked...)

if len(watcher.watched) != 4 {
t.Errorf("expected to find watching 4 entries, found: %d", len(watcher.watched))
@@ -428,3 +434,67 @@ func TestWatchTagJobLatestECR(t *testing.T) {
t.Errorf("job details digest wasn't updated")
}
}

func TestUnwatchAfterNotTrackedAnymore(t *testing.T) {
fp := &fakeProvider{}
mem := memory.NewMemoryCache(100*time.Millisecond, 100*time.Millisecond, 10*time.Millisecond)
am := approvals.New(mem, codecs.DefaultSerializer())
providers := provider.New([]provider.Provider{fp}, am)

// returning some sha
frc := &fakeRegistryClient{
digestToReturn: "sha256:0604af35299dd37ff23937d115d103532948b568a9dd8197d14c256a8ab8b0bb",
tagsToReturn: []string{"5.0.0"},
}

watcher := NewRepositoryWatcher(providers, frc)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watcher.Start(ctx)

tracked := []*types.TrackedImage{
mustParse("gcr.io/v2-namespace/hello-world:1.1.1", "@every 10m"),
mustParse("gcr.io/v2-namespace/greetings-world:1.1.1", "@every 10m"),
mustParse("gcr.io/v2-namespace/greetings-world:alpha", "@every 10m"),
mustParse("gcr.io/v2-namespace/greetings-world:master", "@every 10m"),
}

watcher.Watch(tracked...)

if len(watcher.watched) != 4 {
t.Errorf("expected to find watching 4 entries, found: %d", len(watcher.watched))
}

if dig, ok := watcher.watched["gcr.io/v2-namespace/greetings-world:alpha"]; ok != true {
t.Errorf("alpha watcher not found")
if dig.digest != "sha256:0604af35299dd37ff23937d115d103532948b568a9dd8197d14c256a8ab8b0bb" {
t.Errorf("digest not set for alpha")
}
}

if dig, ok := watcher.watched["gcr.io/v2-namespace/greetings-world:master"]; ok != true {
t.Errorf("alpha watcher not found")
if dig.digest != "sha256:0604af35299dd37ff23937d115d103532948b568a9dd8197d14c256a8ab8b0bb" {
t.Errorf("digest not set for alpha")
}
}

if det, ok := watcher.watched["gcr.io/v2-namespace/greetings-world"]; ok != true {
t.Errorf("alpha watcher not found")
if det.latest != "5.0.0" {
t.Errorf("expected to find a tag set for multiple tags watch job")
}
}

trackedUpdated := []*types.TrackedImage{
mustParse("gcr.io/v2-namespace/hello-world:1.1.1", "@every 10m"),
mustParse("gcr.io/v2-namespace/greetings-world:1.1.1", "@every 10m"),
mustParse("gcr.io/v2-namespace/greetings-world:alpha", "@every 10m"),
}

watcher.Watch(trackedUpdated...)

if len(watcher.watched) != 3 {
t.Errorf("expected to find watching 3 entries, found: %d", len(watcher.watched))
}
}

0 comments on commit 356b4d8

Please sign in to comment.