Skip to content

Commit

Permalink
Merge pull request #127 from lovoo/feature/run-context
Browse files Browse the repository at this point in the history
Feature/run context
  • Loading branch information
db7 committed May 3, 2018
2 parents d58a314 + db62921 commit 4037dcd
Show file tree
Hide file tree
Showing 22 changed files with 217 additions and 205 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ To locally start a dockerized Zookeeper and Kafka instances, execute `make start
package main

import (
"context"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -124,16 +125,20 @@ func runProcessor() {
if err != nil {
log.Fatalf("error creating processor: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan bool)
go func() {
if err = p.Start(); err != nil {
defer close(done)
if err = p.Run(ctx); err != nil {
log.Fatalf("error running processor: %v", err)
}
}()

wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
<-wait // wait for SIGINT/SIGTERM
p.Stop() // gracefully stop processor
cancel() // gracefully stop processor
<-done
}

func main() {
Expand Down
17 changes: 4 additions & 13 deletions examples/1-simplest/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
Expand Down Expand Up @@ -60,16 +58,9 @@ func runProcessor() {
if err != nil {
log.Fatalf("error creating processor: %v", err)
}
go func() {
if err = p.Start(); err != nil {
log.Fatalf("error running processor: %v", err)
}
}()

wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
<-wait // wait for SIGINT/SIGTERM
p.Stop() // gracefully stop processor
if err = p.Run(context.Background()); err != nil {
log.Fatalf("error running processor: %v", err)
}
}

func main() {
Expand Down
5 changes: 2 additions & 3 deletions examples/2-clicks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ update it in our group table (10) by calling `ctx.SetValue(u)`.
We conclude process() with a print statement showing message’s content, the
current count of the user, and the user ID fetched with ctx.Key().

The context interface never returns errors to the callbacks. Instead, if an error is encountered while executing the context functions, the processor instance is stopped and its Start() method returns an error.
The context interface never returns errors to the callbacks. Instead, if an error is encountered while executing the context functions, the processor instance is stopped and its Run() method returns an error.


We configure the processor using `goka.DefineGroup`, which we later
Expand Down Expand Up @@ -128,8 +128,7 @@ func runView() {
if err != nil {
panic(err)
}
go view.Start()
defer view.Stop()
go view.Run(context.Background())

root := mux.NewRouter()
root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) {
Expand Down
14 changes: 5 additions & 9 deletions examples/2-clicks/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -91,12 +92,7 @@ func runProcessor() {
panic(err)
}

err = p.Start()
if err != nil {
panic(err)
} else {
fmt.Println("Processor stopped without errors")
}
p.Run(context.Background())
}

func runView() {
Expand All @@ -107,8 +103,6 @@ func runView() {
if err != nil {
panic(err)
}
go view.Start()
defer view.Stop()

root := mux.NewRouter()
root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -117,7 +111,9 @@ func runView() {
w.Write(data)
})
fmt.Println("View opened at http://localhost:9095/")
http.ListenAndServe(":9095", root)
go http.ListenAndServe(":9095", root)

view.Run(context.Background())
}

func main() {
Expand Down
6 changes: 0 additions & 6 deletions examples/3-messaging/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ g := goka.DefineGroup(goka.Group("collector"),
goka.Input(messaging.ReceivedStream, new(messaging.MessageCodec), collect),
)
p, _ := goka.NewProcessor(brokers, g)
go p.Start()

```

### Feed endpoint
Expand Down Expand Up @@ -165,7 +163,6 @@ view, _ := goka.NewView(
collector.Table,
new(collector.MessageListCodec),
)
go view.Start()
router.HandleFunc("/{user}/feed", feed(view)).Methods("GET")
```

Expand Down Expand Up @@ -289,8 +286,6 @@ g := goka.DefineGroup(goka.Group("filter"),
)

p, _ := goka.NewProcessor(brokers, g)
_ = p.Start()

```

Nothing has to be changed in the collector processor or in the feed endpoint.
Expand Down Expand Up @@ -493,7 +488,6 @@ g := goka.DefineGroup(goka.Group("detector"),
)

p, _ := goka.NewProcessor(brokers, g)
_ = p.Start()
```

### Running the example
Expand Down
21 changes: 12 additions & 9 deletions examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blocker

import (
"context"
"encoding/json"

"github.com/lovoo/goka"
Expand Down Expand Up @@ -57,14 +58,16 @@ func block(ctx goka.Context, msg interface{}) {
ctx.SetValue(s)
}

func Run(brokers []string) {
g := goka.DefineGroup(group,
goka.Input(Stream, new(BlockEventCodec), block),
goka.Persist(new(BlockValueCodec)),
)
if p, err := goka.NewProcessor(brokers, g); err != nil {
panic(err)
} else if err = p.Start(); err != nil {
panic(err)
func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
goka.Input(Stream, new(BlockEventCodec), block),
goka.Persist(new(BlockValueCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
return err
}
return p.Run(ctx)
}
}
25 changes: 17 additions & 8 deletions examples/3-messaging/cmd/processor/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"log"
"os"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/lovoo/goka/examples/3-messaging/detector"
"github.com/lovoo/goka/examples/3-messaging/filter"
"github.com/lovoo/goka/examples/3-messaging/translator"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -26,33 +28,40 @@ var (

func main() {
flag.Parse()
ctx, cancel := context.WithCancel(context.Background())
grp, ctx := errgroup.WithContext(ctx)

if *runCollector {
log.Println("starting collector")
go collector.Run(brokers)
grp.Go(collector.Run(ctx, brokers))
}
if *runFilter {
log.Println("starting filter")
go filter.Run(brokers)
grp.Go(filter.Run(ctx, brokers))
}
if *runBlocker {
log.Println("starting blocker")
go blocker.Run(brokers)
grp.Go(blocker.Run(ctx, brokers))
}
if *runDetector {
log.Println("starting detector")
go detector.Run(brokers)
grp.Go(detector.Run(ctx, brokers))
}
if *runTranslator {
log.Println("starting translator")
go translator.Run(brokers)
grp.Go(translator.Run(ctx, brokers))
}

// Wait for SIGINT/SIGTERM
waiter := make(chan os.Signal, 1)
signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM)

select {
case signal := <-waiter:
log.Printf("Got interrupted by %v", signal)
case <-waiter:
case <-ctx.Done():
}
cancel()
if err := grp.Wait(); err != nil {
log.Println(err)
}
log.Println("done")
}
21 changes: 12 additions & 9 deletions examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"encoding/json"

"github.com/lovoo/goka"
Expand Down Expand Up @@ -41,14 +42,16 @@ func collect(ctx goka.Context, msg interface{}) {
ctx.SetValue(ml)
}

func Run(brokers []string) {
g := goka.DefineGroup(group,
goka.Input(messaging.ReceivedStream, new(messaging.MessageCodec), collect),
goka.Persist(new(MessageListCodec)),
)
if p, err := goka.NewProcessor(brokers, g); err != nil {
panic(err)
} else if err = p.Start(); err != nil {
panic(err)
func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
goka.Input(messaging.ReceivedStream, new(messaging.MessageCodec), collect),
goka.Persist(new(MessageListCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
return err
}
return p.Run(ctx)
}
}
64 changes: 34 additions & 30 deletions examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package detector

import (
"context"
"encoding/json"

"github.com/lovoo/goka"
Expand Down Expand Up @@ -48,38 +49,41 @@ func detectSpammer(ctx goka.Context, c *Counters) bool {
return total >= minMessages && rate >= maxRate
}

func Run(brokers []string) {
g := goka.DefineGroup(group,
goka.Input(messaging.SentStream, new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) {
c := getValue(ctx)
c.Sent++
ctx.SetValue(c)
func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
goka.Input(messaging.SentStream, new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) {
c := getValue(ctx)
c.Sent++
ctx.SetValue(c)

// check if sender is a spammer
if detectSpammer(ctx, c) {
ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent))
}
// check if sender is a spammer
if detectSpammer(ctx, c) {
ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent))
}

// Loop to receiver
m := msg.(*messaging.Message)
ctx.Loopback(m.To, m)
}),
goka.Loop(new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) {
c := getValue(ctx)
c.Received++
ctx.SetValue(c)
// Loop to receiver
m := msg.(*messaging.Message)
ctx.Loopback(m.To, m)
}),
goka.Loop(new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) {
c := getValue(ctx)
c.Received++
ctx.SetValue(c)

// check if receiver is a spammer
if detectSpammer(ctx, c) {
ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent))
}
}),
goka.Output(blocker.Stream, new(blocker.BlockEventCodec)),
goka.Persist(new(CountersCodec)),
)
if p, err := goka.NewProcessor(brokers, g); err != nil {
panic(err)
} else if err = p.Start(); err != nil {
panic(err)
// check if receiver is a spammer
if detectSpammer(ctx, c) {
ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent))
}
}),
goka.Output(blocker.Stream, new(blocker.BlockEventCodec)),
goka.Persist(new(CountersCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
return err
}

return p.Run(ctx)
}
}
Loading

0 comments on commit 4037dcd

Please sign in to comment.