Skip to content

Commit

Permalink
*: Refactor extension API
Browse files Browse the repository at this point in the history
RunOptions is now Runtime and represents taskrunner's Runtime, which can
be hooked into by extensions.

It now exposes OnStart, OnStop, Subscribe. Subscriptions have been
simplified to only provide a channel. ctx.Done checks can be replaced
with a range over the channel (range ends when the channel is closed).
  • Loading branch information
berfarah committed Nov 5, 2018
1 parent 7c606e8 commit d672483
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 78 deletions.
12 changes: 6 additions & 6 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ func New(opts ...shell.RunOption) *Cache {
}
}

func (c *Cache) Option(r *taskrunner.RunOptions) {
r.ReporterFns = append(r.ReporterFns, func(ctx context.Context, executor *taskrunner.Executor) error {
func (c *Cache) Option(r *taskrunner.Runtime) {
r.OnStart(func(ctx context.Context, executor *taskrunner.Executor) error {
c.cacheFile = getCacheFilePath(executor.Config().WorkingDir())
c.Start(ctx)
<-ctx.Done()
c.Finish(ctx)
return nil
return c.Start(ctx)
})
r.OnStop(func(ctx context.Context, executor *taskrunner.Executor) error {
return c.Finish(ctx)
})
}

Expand Down
62 changes: 23 additions & 39 deletions clireporter/cli.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package clireporter

import (
"context"
"fmt"
"time"

Expand All @@ -12,44 +11,29 @@ type cli struct {
Executor *taskrunner.Executor
}

func Option(r *taskrunner.RunOptions) {
r.ReporterFns = append(r.ReporterFns, func(ctx context.Context, executor *taskrunner.Executor) error {
New(executor).Run(ctx)
return nil
})
}

func New(executor *taskrunner.Executor) *cli {
return &cli{
Executor: executor,
}
}

func (c *cli) Run(ctx context.Context) {
events, done := c.Executor.Subscribe()
go func() {
<-ctx.Done()
done()
}()

for event := range events {
switch event := event.(type) {
case *taskrunner.TaskInvalidatedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Invalidating %s for %d reasons:", event.TaskHandler().Definition().Name, len(event.Reasons))
for _, reason := range event.Reasons {
fmt.Fprintf(event.TaskHandler().LogStdout(), "- %s", reason.Description())
func Option(r *taskrunner.Runtime) {
r.Subscribe(func(events <-chan taskrunner.ExecutorEvent) error {
for event := range events {
switch event := event.(type) {
case *taskrunner.TaskInvalidatedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Invalidating %s for %d reasons:", event.TaskHandler().Definition().Name, len(event.Reasons))
for _, reason := range event.Reasons {
fmt.Fprintf(event.TaskHandler().LogStdout(), "- %s", reason.Description())
}
case *taskrunner.TaskStartedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Started")
case *taskrunner.TaskCompletedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Completed (%0.2fs)", float64(event.Duration)/float64(time.Second))
case *taskrunner.TaskFailedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Failed")
fmt.Fprintln(event.TaskHandler().LogStdout(), event.Error)
case *taskrunner.TaskDiagnosticEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Warning: %s", event.Error.Error())
case *taskrunner.TaskStoppedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Stopped")
}
case *taskrunner.TaskStartedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Started")
case *taskrunner.TaskCompletedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Completed (%0.2fs)", float64(event.Duration)/float64(time.Second))
case *taskrunner.TaskFailedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Failed")
fmt.Fprintln(event.TaskHandler().LogStdout(), event.Error)
case *taskrunner.TaskDiagnosticEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Warning: %s", event.Error.Error())
case *taskrunner.TaskStoppedEvent:
fmt.Fprintf(event.TaskHandler().LogStdout(), "Stopped")
}
}

return nil
})
}
18 changes: 7 additions & 11 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,13 @@ func (e *Executor) Config() *Config { return e.config }
// Subscribe returns a channel of executor-level events. Each invocation
// of Events() returns a new channel. The done function should be called
// to unregister this channel.
func (e *Executor) Subscribe() (events <-chan ExecutorEvent, done func()) {
func (e *Executor) Subscribe() (events <-chan ExecutorEvent) {
e.subscriptionsMu.Lock()
defer e.subscriptionsMu.Unlock()

once := sync.Once{}

ch := make(chan ExecutorEvent, 1024)
e.eventsChs[ch] = struct{}{}
return ch, func() {
once.Do(func() {
e.subscriptionsMu.Lock()
defer e.subscriptionsMu.Unlock()
close(ch)
delete(e.eventsChs, ch)
})
}
return ch
}

func (e *Executor) publishEvent(event ExecutorEvent) {
Expand Down Expand Up @@ -180,6 +171,11 @@ func (e *Executor) Invalidate(task *Task, event InvalidationEvent) {

func (e *Executor) Run(ctx context.Context, taskNames ...string) error {
e.ctx = ctx
defer func() {
for ch := range e.eventsChs {
close(ch)
}
}()
e.runInvalidationLoop()
if e.config.Watch {
e.runWatch(ctx)
Expand Down
9 changes: 4 additions & 5 deletions executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func TestExecutorInvalidations(t *testing.T) {
}

tasks := []*taskrunner.Task{taskA, taskB}
executor := taskrunner.NewExecutor(config, tasks)

for _, testcase := range []struct {
Name string
Expand All @@ -140,9 +139,9 @@ func TestExecutorInvalidations(t *testing.T) {
{
"dependency invalidated",
func(t *testing.T) {
executor := taskrunner.NewExecutor(config, tasks)
ctx, cancel := context.WithCancel(context.Background())
events, done := executor.Subscribe()
defer done()
events := executor.Subscribe()

go func() {
assert.Equal(t, taskA, consumeUntil(t, events, taskrunner.ExecutorEventKind_TaskCompleted).TaskHandler().Definition(), "expected first task to be taskA")
Expand All @@ -166,9 +165,9 @@ func TestExecutorInvalidations(t *testing.T) {
{
"leaf invalidated",
func(t *testing.T) {
executor := taskrunner.NewExecutor(config, tasks)
ctx, cancel := context.WithCancel(context.Background())
events, done := executor.Subscribe()
defer done()
events := executor.Subscribe()

go func() {
assert.Equal(t, taskA, consumeUntil(t, events, taskrunner.ExecutorEventKind_TaskCompleted).TaskHandler().Definition(), "expected first task to be taskA")
Expand Down
3 changes: 1 addition & 2 deletions gqltaskrunner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ func (server *Server) Run(ctx context.Context) error {
Handler: router,
}

events, done := server.Executor.Subscribe()
defer done()
events := server.Executor.Subscribe()

server.Broadcaster = NewBroadcaster(events)
server.Broadcaster.Run()
Expand Down
47 changes: 32 additions & 15 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,35 @@ func init() {
flags.BoolVar(&listTasks, "list", false, "List all tasks")
}

type RunOptions struct {
ReporterFns []func(ctx context.Context, executor *Executor) error
ExecutorOptions []ExecutorOption
type Runtime struct {
subscriptions []func(events <-chan ExecutorEvent) error
onStartHooks []func(ctx context.Context, executor *Executor) error
onStopHooks []func(ctx context.Context, executor *Executor) error
executorOptions []ExecutorOption
}

type RunOption func(options *RunOptions)
func (r *Runtime) Subscribe(f func(events <-chan ExecutorEvent) error) {
r.subscriptions = append(r.subscriptions, f)
}

func (r *Runtime) OnStart(f func(context.Context, *Executor) error) {
r.onStartHooks = append(r.onStartHooks, f)
}

func (r *Runtime) OnStop(f func(context.Context, *Executor) error) {
r.onStopHooks = append(r.onStopHooks, f)
}

type RunOption func(options *Runtime)

func ExecutorOptions(opts ...ExecutorOption) RunOption {
return func(r *RunOptions) {
r.ExecutorOptions = append(r.ExecutorOptions, opts...)
return func(r *Runtime) {
r.executorOptions = append(r.executorOptions, opts...)
}
}

func Run(tasks []*Task, options ...RunOption) {
runOptions := &RunOptions{}
runOptions := &Runtime{}
for _, option := range options {
option(runOptions)
}
Expand Down Expand Up @@ -73,7 +87,7 @@ func Run(tasks []*Task, options ...RunOption) {
}

log.Println("Using config", config.ConfigFilePath())
executor := NewExecutor(config, tasks, runOptions.ExecutorOptions...)
executor := NewExecutor(config, tasks, runOptions.executorOptions...)

desiredTasks := config.DesiredTasks
config.Watch = !nonInteractive
Expand All @@ -91,21 +105,24 @@ func Run(tasks []*Task, options ...RunOption) {
ctx, cancel := context.WithCancel(context.Background())
onInterruptSignal(cancel)

g, ctx := errgroup.WithContext(ctx)
for _, hook := range runOptions.onStartHooks {
hook(ctx, executor)
}

// Reporters should use a different context because we want to stage
// their cancellation after the executor itself has been completed.
reporterCtx, cancelReporter := context.WithCancel(context.Background())
for _, reporterFn := range runOptions.ReporterFns {
g, ctx := errgroup.WithContext(ctx)
for _, sub := range runOptions.subscriptions {
g.Go(func() error {
return reporterFn(reporterCtx, executor)
return sub(executor.Subscribe())
})
}

g.Go(func() error {
defer cancelReporter()
err := executor.Run(ctx, desiredTasks...)

for _, hook := range runOptions.onStopHooks {
hook(ctx, executor)
}

// We only care about propagating errors up to the errgroup
// if we were not in watch mode.
if !config.Watch {
Expand Down

0 comments on commit d672483

Please sign in to comment.