Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move trigger related code to the watcher #1422

Merged
merged 1 commit into from
Dec 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions pkg/skaffold/runner/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
// Create watcher and register artifacts to build current state of files.
changed := changes{}
onChange := func() error {
defer func() {
changed.reset()
r.Trigger.WatchForChanges(out)
}()
defer changed.reset()

logger.Mute()

Expand Down Expand Up @@ -149,6 +146,5 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
}
}

r.Trigger.WatchForChanges(out)
return r.Watcher.Run(ctx, r.Trigger, onChange)
return r.Watcher.Run(ctx, out, onChange)
}
11 changes: 6 additions & 5 deletions pkg/skaffold/runner/dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package runner
import (
"context"
"errors"
"io"
"io/ioutil"
"testing"

Expand All @@ -29,21 +30,21 @@ import (

type NoopWatcher struct{}

func (t *NoopWatcher) Register(deps func() ([]string, error), onChange func(watch.Events)) error {
func (t *NoopWatcher) Register(func() ([]string, error), func(watch.Events)) error {
return nil
}

func (t *NoopWatcher) Run(ctx context.Context, trigger watch.Trigger, onChange func() error) error {
func (t *NoopWatcher) Run(context.Context, io.Writer, func() error) error {
return nil
}

type FailWatcher struct{}

func (t *FailWatcher) Register(deps func() ([]string, error), onChange func(watch.Events)) error {
func (t *FailWatcher) Register(func() ([]string, error), func(watch.Events)) error {
return nil
}

func (t *FailWatcher) Run(ctx context.Context, trigger watch.Trigger, onChange func() error) error {
func (t *FailWatcher) Run(context.Context, io.Writer, func() error) error {
return errors.New("BUG")
}

Expand All @@ -58,7 +59,7 @@ func (t *TestWatcher) Register(deps func() ([]string, error), onChange func(watc
return nil
}

func (t *TestWatcher) Run(ctx context.Context, trigger watch.Trigger, onChange func() error) error {
func (t *TestWatcher) Run(ctx context.Context, out io.Writer, onChange func() error) error {
for _, evt := range t.events {
t.testBench.enterNewCycle()

Expand Down
4 changes: 1 addition & 3 deletions pkg/skaffold/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type SkaffoldRunner struct {
deploy.Deployer
test.Tester
tag.Tagger
watch.Trigger
sync.Syncer
watch.Watcher

Expand Down Expand Up @@ -107,9 +106,8 @@ func NewForConfig(opts *config.SkaffoldOptions, cfg *latest.SkaffoldPipeline) (*
Tester: tester,
Deployer: deployer,
Tagger: tagger,
Trigger: trigger,
Syncer: &kubectl.Syncer{},
Watcher: watch.NewWatcher(),
Watcher: watch.NewWatcher(trigger),
opts: opts,
imageList: kubernetes.NewImageList(),
}, nil
Expand Down
28 changes: 18 additions & 10 deletions pkg/skaffold/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package watch

import (
"context"
"io"

"github.com/pkg/errors"
)
Expand All @@ -28,14 +29,19 @@ type Factory func() Watcher
// Watcher monitors files changes for multiples components.
type Watcher interface {
Register(deps func() ([]string, error), onChange func(Events)) error
Run(ctx context.Context, trigger Trigger, onChange func() error) error
Run(ctx context.Context, out io.Writer, onChange func() error) error
}

type watchList []*component
type watchList struct {
components []*component
trigger Trigger
}

// NewWatcher creates a new Watcher.
func NewWatcher() Watcher {
return &watchList{}
func NewWatcher(trigger Trigger) Watcher {
return &watchList{
trigger: trigger,
}
}

type component struct {
Expand All @@ -52,7 +58,7 @@ func (w *watchList) Register(deps func() ([]string, error), onChange func(Events
return errors.Wrap(err, "listing files")
}

*w = append(*w, &component{
w.components = append(w.components, &component{
deps: deps,
onChange: onChange,
state: state,
Expand All @@ -61,19 +67,20 @@ func (w *watchList) Register(deps func() ([]string, error), onChange func(Events
}

// Run watches files until the context is cancelled or an error occurs.
func (w *watchList) Run(ctx context.Context, trigger Trigger, onChange func() error) error {
t, cleanup := trigger.Start()
func (w *watchList) Run(ctx context.Context, out io.Writer, onChange func() error) error {
t, cleanup := w.trigger.Start()
defer cleanup()

changedComponents := map[int]bool{}

w.trigger.WatchForChanges(out)
for {
select {
case <-ctx.Done():
return nil
case <-t:
changed := 0
for i, component := range *w {
for i, component := range w.components {
state, err := Stat(component.deps)
if err != nil {
return errors.Wrap(err, "listing files")
Expand All @@ -93,9 +100,9 @@ func (w *watchList) Run(ctx context.Context, trigger Trigger, onChange func() er
// To prevent that, we debounce changes that happen too quickly
// by waiting for a full turn where nothing happens and trigger a rebuild for
// the accumulated changes.
debounce := trigger.Debounce()
debounce := w.trigger.Debounce()
if (!debounce && changed > 0) || (debounce && changed == 0 && len(changedComponents) > 0) {
for i, component := range *w {
for i, component := range w.components {
if changedComponents[i] {
component.onChange(component.events)
}
Expand All @@ -106,6 +113,7 @@ func (w *watchList) Run(ctx context.Context, trigger Trigger, onChange func() er
}

changedComponents = map[int]bool{}
w.trigger.WatchForChanges(out)
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/skaffold/watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package watch

import (
"context"
"io/ioutil"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -70,7 +71,9 @@ func TestWatch(t *testing.T) {
somethingChanged := newCallback()

// Watch folder
watcher := NewWatcher()
watcher := NewWatcher(&pollTrigger{
Interval: 10 * time.Millisecond,
})
err := watcher.Register(folder.List, folderChanged.call)
testutil.CheckError(t, false, err)

Expand All @@ -79,11 +82,7 @@ func TestWatch(t *testing.T) {
var stopped sync.WaitGroup
stopped.Add(1)
go func() {
trigger := &pollTrigger{
Interval: 10 * time.Millisecond,
}

err = watcher.Run(ctx, trigger, somethingChanged.callNoErr)
err = watcher.Run(ctx, ioutil.Discard, somethingChanged.callNoErr)
stopped.Done()
testutil.CheckError(t, false, err)
}()
Expand Down