Skip to content

Commit

Permalink
introduce runset to start/stop views and processors
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens committed May 3, 2018
1 parent 4037dcd commit ecfb982
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 6 deletions.
8 changes: 4 additions & 4 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1759,23 +1759,23 @@ func TestProcessor_failOnRecover(t *testing.T) {
// Example shows how to use a callback. For each partition of the topics, a new
// goroutine will be created. Topics should be co-partitioned (they should have
// the same number of partitions and be partitioned by the same key).
func ExampleProcessor_simplest() {
func ExampleProcessor() {
var (
brokers = []string{"127.0.0.1:9092"}
group Group = "group"
topic Stream = "topic"
)

consume := func(ctx Context, m interface{}) {
f := func(ctx Context, m interface{}) {
fmt.Printf("Hello world: %v", m)
}

p, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, consume)))
p, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, f)))
if err != nil {
log.Fatalln(err)
}

// start consumer with a goroutine (blocks)
// start processor with a goroutine (blocks)
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := p.Run(ctx)
Expand Down
55 changes: 55 additions & 0 deletions runset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package goka

import (
"context"

"github.com/lovoo/goka/multierr"
)

// Runnable represents a component that runs goroutines.
type Runnable interface {
// Run starts the runnable and canceling the context stops it.
Run(context.Context) error
}

// Runset manages the lifecyle of a set of runnables (processors or
// views). All runnables are started together and as soon as the first stops,
// all other runnables are also stopped.
type Runset struct {
ctx context.Context
cancel func()
grp *multierr.ErrGroup
}

// Start one or more runnables and return a Runset object.
func Start(runnables ...Runnable) *Runset {
ctx, cancel := context.WithCancel(context.Background())
grp, ctx := multierr.NewErrGroup(ctx)

s := Runset{ctx, cancel, grp}

for _, r := range runnables {
grp.Go(func() error {
defer cancel()
return r.Run(ctx)
})
}

return &s
}

// Stop all runnables in the runset.
func (r *Runset) Stop() {
r.cancel()
}

// Wait for all runnables to stop, returning the aggregated errors if any.
func (r *Runset) Wait() error {
return r.grp.Wait().NilOrError()
}

// Done returns a channel that is closed once the Runset is stopping. Runnables
// may not yet have returned when the channel is closed.
func (r *Runset) Done() <-chan struct{} {
return r.ctx.Done()
}
117 changes: 117 additions & 0 deletions runset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package goka

import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"testing"

"github.com/facebookgo/ensure"
)

type mockRunnable struct {
ch chan error
}

func newMockRunnable() *mockRunnable {
return &mockRunnable{
ch: make(chan error),
}
}

func (r *mockRunnable) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case err := <-r.ch:
return err
}
}

func TestRunset_Start(t *testing.T) {
p := newMockRunnable()
rs := Start(p)
rs.Stop()
err := doTimed(t, func() {
err := rs.Wait()
ensure.Nil(t, err)
})
ensure.Nil(t, err)

// should not return
p = newMockRunnable()
rs = Start(p)
err = doTimed(nil, func() {
_ = rs.Wait()
})
ensure.NotNil(t, err)
rs.Stop()

// should return if terminates without error
p = newMockRunnable()
rs = Start(p)
close(p.ch)
err = doTimed(nil, func() {
err := rs.Wait()
ensure.Nil(t, err)
})
ensure.Nil(t, err)

// should return if terminates with error
p = newMockRunnable()
rs = Start(p)
p.ch <- errors.New("some error")
err = doTimed(nil, func() {
err := rs.Wait()
ensure.NotNil(t, err)
})
ensure.Nil(t, err)

// should return
p = newMockRunnable()
rs = Start(p)
p.ch <- errors.New("some error")
err = doTimed(nil, func() {
<-rs.Done()
})
ensure.Nil(t, err)

}

// Example shows how to control the lifecycle of runnables (processors or
// views) using Runsets.
func ExampleRunset() {
var (
brokers = []string{"127.0.0.1:9092"}
group Group = "group"
topic Stream = "topic"
)

f := func(ctx Context, m interface{}) {
fmt.Printf("Hello world: %v", m)
}

p, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, f)))
if err != nil {
log.Fatalln(err)
}

// start processor creating a Runset.
rs := Start(p)

// wait for bad things to happen
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rs.Done():
case <-wait: // wait for SIGINT/SIGTERM
rs.Stop() // gracefully stop processor
}
if err := rs.Wait(); err != nil {
log.Fatalln(err)
}
}
6 changes: 4 additions & 2 deletions view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,15 +488,17 @@ func doTimed(t *testing.T, do func()) error {

select {
case <-time.After(2 * time.Second):
t.Fail()
if t != nil {
t.Fail()
}
return errors.New("function took too long to complete")
case <-ch:
}

return nil
}

func ExampleView_simple() {
func ExampleView() {
var (
brokers = []string{"localhost:9092"}
group Group = "group-name"
Expand Down

0 comments on commit ecfb982

Please sign in to comment.