From ecfb9820e37309c805e7b273af118eddc0fe6947 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Tue, 1 May 2018 20:43:38 +0200 Subject: [PATCH] introduce runset to start/stop views and processors --- processor_test.go | 8 ++-- runset.go | 55 ++++++++++++++++++++++ runset_test.go | 117 ++++++++++++++++++++++++++++++++++++++++++++++ view_test.go | 6 ++- 4 files changed, 180 insertions(+), 6 deletions(-) create mode 100644 runset.go create mode 100644 runset_test.go diff --git a/processor_test.go b/processor_test.go index 4ce0272f..512a8fbd 100644 --- a/processor_test.go +++ b/processor_test.go @@ -1759,23 +1759,23 @@ func TestProcessor_failOnRecover(t *testing.T) { // Example shows how to use a callback. For each partition of the topics, a new // goroutine will be created. Topics should be co-partitioned (they should have // the same number of partitions and be partitioned by the same key). -func ExampleProcessor_simplest() { +func ExampleProcessor() { var ( brokers = []string{"127.0.0.1:9092"} group Group = "group" topic Stream = "topic" ) - consume := func(ctx Context, m interface{}) { + f := func(ctx Context, m interface{}) { fmt.Printf("Hello world: %v", m) } - p, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, consume))) + p, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, f))) if err != nil { log.Fatalln(err) } - // start consumer with a goroutine (blocks) + // start processor with a goroutine (blocks) ctx, cancel := context.WithCancel(context.Background()) go func() { err := p.Run(ctx) diff --git a/runset.go b/runset.go new file mode 100644 index 00000000..99bc201f --- /dev/null +++ b/runset.go @@ -0,0 +1,55 @@ +package goka + +import ( + "context" + + "github.com/lovoo/goka/multierr" +) + +// Runnable represents a component that runs goroutines. +type Runnable interface { + // Run starts the runnable and canceling the context stops it. + Run(context.Context) error +} + +// Runset manages the lifecyle of a set of runnables (processors or +// views). All runnables are started together and as soon as the first stops, +// all other runnables are also stopped. +type Runset struct { + ctx context.Context + cancel func() + grp *multierr.ErrGroup +} + +// Start one or more runnables and return a Runset object. +func Start(runnables ...Runnable) *Runset { + ctx, cancel := context.WithCancel(context.Background()) + grp, ctx := multierr.NewErrGroup(ctx) + + s := Runset{ctx, cancel, grp} + + for _, r := range runnables { + grp.Go(func() error { + defer cancel() + return r.Run(ctx) + }) + } + + return &s +} + +// Stop all runnables in the runset. +func (r *Runset) Stop() { + r.cancel() +} + +// Wait for all runnables to stop, returning the aggregated errors if any. +func (r *Runset) Wait() error { + return r.grp.Wait().NilOrError() +} + +// Done returns a channel that is closed once the Runset is stopping. Runnables +// may not yet have returned when the channel is closed. +func (r *Runset) Done() <-chan struct{} { + return r.ctx.Done() +} diff --git a/runset_test.go b/runset_test.go new file mode 100644 index 00000000..c022538e --- /dev/null +++ b/runset_test.go @@ -0,0 +1,117 @@ +package goka + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "testing" + + "github.com/facebookgo/ensure" +) + +type mockRunnable struct { + ch chan error +} + +func newMockRunnable() *mockRunnable { + return &mockRunnable{ + ch: make(chan error), + } +} + +func (r *mockRunnable) Run(ctx context.Context) error { + select { + case <-ctx.Done(): + return nil + case err := <-r.ch: + return err + } +} + +func TestRunset_Start(t *testing.T) { + p := newMockRunnable() + rs := Start(p) + rs.Stop() + err := doTimed(t, func() { + err := rs.Wait() + ensure.Nil(t, err) + }) + ensure.Nil(t, err) + + // should not return + p = newMockRunnable() + rs = Start(p) + err = doTimed(nil, func() { + _ = rs.Wait() + }) + ensure.NotNil(t, err) + rs.Stop() + + // should return if terminates without error + p = newMockRunnable() + rs = Start(p) + close(p.ch) + err = doTimed(nil, func() { + err := rs.Wait() + ensure.Nil(t, err) + }) + ensure.Nil(t, err) + + // should return if terminates with error + p = newMockRunnable() + rs = Start(p) + p.ch <- errors.New("some error") + err = doTimed(nil, func() { + err := rs.Wait() + ensure.NotNil(t, err) + }) + ensure.Nil(t, err) + + // should return + p = newMockRunnable() + rs = Start(p) + p.ch <- errors.New("some error") + err = doTimed(nil, func() { + <-rs.Done() + }) + ensure.Nil(t, err) + +} + +// Example shows how to control the lifecycle of runnables (processors or +// views) using Runsets. +func ExampleRunset() { + var ( + brokers = []string{"127.0.0.1:9092"} + group Group = "group" + topic Stream = "topic" + ) + + f := func(ctx Context, m interface{}) { + fmt.Printf("Hello world: %v", m) + } + + p, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, f))) + if err != nil { + log.Fatalln(err) + } + + // start processor creating a Runset. + rs := Start(p) + + // wait for bad things to happen + wait := make(chan os.Signal, 1) + signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + select { + case <-rs.Done(): + case <-wait: // wait for SIGINT/SIGTERM + rs.Stop() // gracefully stop processor + } + if err := rs.Wait(); err != nil { + log.Fatalln(err) + } +} diff --git a/view_test.go b/view_test.go index d5d8554a..61ba84e7 100644 --- a/view_test.go +++ b/view_test.go @@ -488,7 +488,9 @@ func doTimed(t *testing.T, do func()) error { select { case <-time.After(2 * time.Second): - t.Fail() + if t != nil { + t.Fail() + } return errors.New("function took too long to complete") case <-ch: } @@ -496,7 +498,7 @@ func doTimed(t *testing.T, do func()) error { return nil } -func ExampleView_simple() { +func ExampleView() { var ( brokers = []string{"localhost:9092"} group Group = "group-name"