diff --git a/README.md b/README.md index dd7d6ec1..d36fc98d 100644 --- a/README.md +++ b/README.md @@ -28,8 +28,9 @@ Goka extends the concept of Kafka consumer groups by binding a state table to th ## Documentation This README provides a brief, high level overview of the ideas behind Goka. +A more detailed introduction of the project can be found in this [blog post](https://tech.lovoo.com/2017/05/23/goka/). -Package API documentation is available at [GoDoc]. +Package API documentation is available at [GoDoc] and the [Wiki](https://github.com/lovoo/goka/wiki/Tips#configuring-log-compaction-for-table-topics) provides several tips for configuring, extending, and deploying Goka applications. ## Installation @@ -45,9 +46,9 @@ Goka relies on Kafka for message passing, fault-tolerant state storage and workl * **Processor** is a set of callback functions that consume and perform state transformations upon delivery of these emitted messages. *Processor groups* are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka. -* **Group tables** are partitioned key-value tables stored in Kafka that belong to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka. +* **Group table** is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka. -* **Views** are local caches of a processor group's complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface. +* **Views** are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface. ## Get Started @@ -55,7 +56,7 @@ Goka relies on Kafka for message passing, fault-tolerant state storage and workl An example Goka application could look like the following. An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic. A processor processes the "example-stream" topic counting the number of messages delivered for "some-key". -The counter is persisted in the "example-group-state" topic. +The counter is persisted in the "example-group-table" topic. To locally start a dockerized Zookeeper and Kafka instances, execute `make start` with the `Makefile` in the [examples] folder. ```go @@ -111,7 +112,7 @@ func runProcessor() { } // Define a new processor group. The group defines all inputs, outputs, and - // serialization formats. The group-table topic is "example-group-state". + // serialization formats. The group-table topic is "example-group-table". g := goka.DefineGroup(group, goka.Input(topic, new(codec.String), cb), goka.Persist(new(codec.Int64)), @@ -139,6 +140,9 @@ func main() { } ``` +Note that tables have to be configured in Kafka with log compaction. 
+For details check the [Wiki](https://github.com/lovoo/goka/wiki/Tips#configuring-log-compaction-for-table-topics).
+
 ## How to contribute
 
 Contributions are always welcome.
diff --git a/examples/1-simplest/main.go b/examples/1-simplest/main.go
index 0d4fde9a..840a3227 100644
--- a/examples/1-simplest/main.go
+++ b/examples/1-simplest/main.go
@@ -50,7 +50,7 @@ func runProcessor() {
 	}
 
 	// Define a new processor group. The group defines all inputs, outputs, and
-	// serialization formats. The group-table topic is "example-group-state".
+	// serialization formats. The group-table topic is "example-group-table".
 	g := goka.DefineGroup(group,
 		goka.Input(topic, new(codec.String), cb),
 		goka.Persist(new(codec.Int64)),
diff --git a/examples/2-clicks/README.md b/examples/2-clicks/README.md
index 10dbb7fa..f5cecd11 100644
--- a/examples/2-clicks/README.md
+++ b/examples/2-clicks/README.md
@@ -6,7 +6,7 @@ This example shows how to:
 * Write a processor that consumes data from kafka, counting clicks for a user
 * Write an emitter to push data to kafka
-* Writing a view to query the user state
+* Write a view to query the user table
 
 To get an introduction into goka, see this [blog post](http://tech.lovoo.com/2017/05/23/goka).
@@ -22,7 +22,7 @@ go run main.go
 This should output something like
 ```
-2017/05/23 15:09:20 Table mini-group-state has 10 partitions
+2017/05/23 15:09:20 Table mini-group-table has 10 partitions
 2017/05/23 15:09:20 Processor: started
 View opened at http://localhost:9095/
 2017/05/23 15:09:20 View: started
@@ -95,9 +95,9 @@ The consumer of a topic must use the same codec as the writer, otherwise we'll g
 unmarshalling will simply fail.
 
 * `goka.Persist` makes the processor store its group table persistently using kafka. That means on every
-restart (either the same host or somewhere else), the state will be restored.
+restart (either the same host or somewhere else), the group table will be restored.
 This option also makes the processor cache the group table locally using a key-value store.
-That avoids holding the full state in memory and a long-running recovery on every restart.
+That avoids holding the full group table in memory and a long-running recovery on every restart.
 To persist the group table, again we need a `Codec` which encodes the user for this case. We want to
 store objects of type `*user`, so we have to implement our own codec. In our example,
diff --git a/examples/3-messaging/README.md b/examples/3-messaging/README.md
new file mode 100644
index 00000000..c099bced
--- /dev/null
+++ b/examples/3-messaging/README.md
@@ -0,0 +1,511 @@
+# A long example: Building a messaging service
+
+In this example, we build a fictitious messaging service that employs several features available in Goka.
+Our main goal is to explore all methods available in `Context` and to illustrate how components can be composed with joins and lookups.
+
+The messaging service offers only two endpoints:
+
+1. `localhost:8080/{user}/send`
+2. `localhost:8080/{user}/feed`
+
+The *user* in the URLs refers to the user performing the request.
+The *send* endpoint takes JSON data containing the recipient and the content of a message and emits it to Kafka.
+The *feed* endpoint shows the latest 5 messages received by the user.
+
+We will develop the example in 4 steps, building the pipeline between both endpoints:
+
+1. We start with a simple implementation that collects emitted messages in a table.
+2. Next, we add the capability of blocking annoying users.
+3. We then introduce a word translator to help (or amuse) users.
+4. Finally, we block annoying users automatically with a simplistic spam detector.
+
+## 1. Basic components and features
+
+Goka provides three components to build systems: emitters, processors, and views.
+The following figure depicts our initial design using these three components together with Kafka and the endpoints.
+
+![Architecture](figs/goka-arch-simple.png)
+
+The architecture here follows the approach in [this blog post](https://tech.lovoo.com/2017/05/23/goka/).
+
+### Send endpoint
+
+The main type we will be dealing with is the [`Message`](message.go#L14) type:
+
+```go
+type Message struct {
+	From    string
+	To      string
+	Content string
+}
+```
+
+If Bob wants to send a message to Alice, he would send a request to the send endpoint with the recipient and the content of the message.
+For example:
+
+```sh
+curl -X POST \
+  -d '{"to": "Alice", "content": "Hey, how are you doing?"}' \
+  http://localhost:8080/Bob/send
+```
+
+The send handler parses the message from the request body, completing the `From` field.
+Afterwards, it emits the message into the `ReceivedStream` topic using the recipient as key (`m.To`):
+
+```go
+func send(emitter *goka.Emitter) func(w http.ResponseWriter, r *http.Request) {
+	return func(w http.ResponseWriter, r *http.Request) {
+		m := messaging.Message{From: mux.Vars(r)["user"]}
+		b, _ := ioutil.ReadAll(r.Body)
+		json.Unmarshal(b, &m)
+		emitter.EmitSync(m.To, &m)
+	}
+}
+```
+
+The emitter is configured to only emit into the `ReceivedStream` topic and to use `MessageCodec` to encode the message `m`.
+This is how the emitter is [created](service/service.go#L24):
+```go
+emitter, _ := goka.NewEmitter(
+	brokers,
+	messaging.ReceivedStream,
+	new(messaging.MessageCodec),
+)
+router.HandleFunc("/{user}/send", send(emitter)).Methods("POST")
+```
+
+Note that we are ignoring errors for the sake of readability.
+The complete example in the repository handles them, though.
+
+### Collecting messages with `Context.Value()` and `Context.SetValue()`
+
+We define the *collector table* to contain the latest 5 messages received by each user.
+The *collector processor* keeps the table up-to-date by consuming `ReceivedStream`.
+The collector callback is defined as [follows](collector/collector.go#L29):
+
+```go
+// collect callback is called for every message from ReceivedStream.
+// ctx allows access to the collector table and msg is the input message.
+func collect(ctx goka.Context, msg interface{}) {
+	var ml []messaging.Message
+	if v := ctx.Value(); v != nil {
+		ml = v.([]messaging.Message)
+	}
+
+	m := msg.(*messaging.Message)
+	ml = append(ml, *m)
+
+	if len(ml) > maxMessages {
+		ml = ml[len(ml)-maxMessages:]
+	}
+	ctx.SetValue(ml)
+}
+```
+
+The `ctx` is scoped with the key of the input message -- remember we used the receiver as key in the emitter.
+With `ctx.Value()` we fetch the table value for that key.
+In this processor, the value is a slice of messages.
+We then append the received message and cap the length of the slice with the constant `maxMessages`, which is 5.
+Finally, we store the value back in the table with `ctx.SetValue()`.
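+
+The group table stores a slice of messages per user, so `goka.Persist` needs a codec that can serialize that slice.
+The `MessageListCodec` used below simply marshals the slice to and from JSON; the following sketch mirrors the implementation in [collector.go](collector/collector.go) and relies on `encoding/json` and the `Message` type shown above:
+
+```go
+// MessageListCodec en/decodes the group table values ([]messaging.Message) as JSON.
+type MessageListCodec struct{}
+
+func (c *MessageListCodec) Encode(value interface{}) ([]byte, error) {
+	return json.Marshal(value)
+}
+
+func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
+	var m []messaging.Message
+	err := json.Unmarshal(data, &m)
+	return m, err
+}
+```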
+ + +To create the processor, we need to define the group input stream and table persistency: +```go +g := goka.DefineGroup(goka.Group("collector"), + // the group table ("collector-table") persists message lists + goka.Persist(new(MessageListCodec)), + // input stream is ReceivedStream with MessageCodec and collect callback + goka.Input(messaging.ReceivedStream, new(messaging.MessageCodec), collect), +) +p, _ := goka.NewProcessor(brokers, g) +go p.Start() + +``` + +### Feed endpoint + +When Alice wants to see her 5 latest received messages, she requests that from the feed endpoint. +For example: +``` +$ curl localhost:8080/Alice/feed +Latest messages for Alice +0 Bob: Hey, how are you doing? +1 Charlie: See you later. +``` + +The handler employs a view on `collector.Table` to retrieve the messages for Alice. +It gets the user from the URL and tries to get the value from the view. +If no value is available, the user has received no messages yet. +Otherwise, the handler loops over the messages in the slice and formats the output. + +```go +func feed(view *goka.View) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + user := mux.Vars(r)["user"] + val, _ := view.Get(user) + if val == nil { + fmt.Fprintf(w, "%s not found!", user) + return + } + messages := val.([]messaging.Message) + fmt.Fprintf(w, "Latest messages for %s\n", user) + for i, m := range messages { + fmt.Fprintf(w, "%d %10s: %v\n", i, m.From, m.Content) + } + } +} +``` + +When creating the view, it is configured to watch the `collector.Table` and use `collector.MessageListCodec` to decode table values. + +```go +view, _ := goka.NewView( + brokers, + collector.Table, + new(collector.MessageListCodec), +) +go view.Start() +router.HandleFunc("/{user}/feed", feed(view)).Methods("GET") +``` + +`MessageListCodec` simply encodes and decodes slices of `Message`s into and from JSON (see [implementation](message.go#L31) for details). + +### Running the example + +To make the system actually run, we still have to decide how to combine these components. +One option is to start emitter, processor, and view all in the same Go program, but that would make the system inflexible because we cannot scale the components independently. +We can also have each component running in its own Go program, but that may complicate deployment. +In this example, we put the endpoint handlers and, consequently, emitter and view in the same Go program. +In another Go program, we start the collector processor. +This solution allows us to start, stop, and scale them independently. + +Before starting any Go program, run `make start` in `examples` to start Docker containers for ZooKeeper and Kafka. +Next, to start the service, change directory to `examples/3-messaging` and type: + +```sh +go run cmd/service/main.go # start endpoint handlers, emitter and view +``` + +In another terminal, start the processor: + +```sh +go run cmd/processor/main.go -collector # start collector processor +``` + +After you started both Go programs, you can use `curl` to see the messages sent to Alice: + +```sh +curl localhost:8080/Alice/feed +``` + +or open [http://localhost:8080/Alice/feed](http://localhost:8080/Alice/feed) in the browser. 
+
+You can send messages using `curl`, for example,
+
+```sh
+curl -X POST \
+  -d '{"to": "Alice", "content": "Hey, how are you doing?"}' \
+  http://localhost:8080/Bob/send
+```
+
+To simplify the use of this example, we also have a load generator, which periodically
+generates messages and sends them by calling the send endpoint.
+To start it, type the following in a third terminal:
+
+```sh
+go run cmd/loadgen/main.go
+```
+
+## 2. Blocking users
+
+After running the example with the load generator for a while, one can recognize that Alice and other users receive quite a few messages from Bob.
+Bob is actually a spammer and should be blocked!
+
+### Blocker processor
+
+For that, we create a [blocker processor](blocker/blocker.go#L44), which keeps a table of users that have been blocked.
+The blocker processor consumes from `blocker.Stream` and stores a `BlockValue` in `blocker.Table`:
+
+```go
+func block(ctx goka.Context, msg interface{}) {
+	var s *BlockValue
+	if v := ctx.Value(); v == nil {
+		s = new(BlockValue)
+	} else {
+		s = v.(*BlockValue)
+	}
+
+	if msg.(*BlockEvent).Unblock {
+		s.Blocked = false
+	} else {
+		s.Blocked = true
+	}
+	ctx.SetValue(s)
+}
+```
+
+To add or remove a user from the blocker table, we can use the command line tool `cmd/block-user`:
+
+```sh
+go run cmd/block-user/main.go -user Bob  # use -unblock to unblock the user
+```
+
+### Filter messages from blocked users with `Context.Join()`
+
+Of course, just adding Bob to `blocker.Table` does not yet guarantee that users stop receiving messages from him.
+For that, we need to add a filter between the send endpoint and the collector, which drops messages from blocked users before forwarding them to `ReceivedStream`.
+
+Our block processor already keeps a table with a block flag for blocked keys, i.e., blocked users.
+So, we change the emitter such that it emits into `SentStream`, with the key now being the *sender* instead of the recipient.
+Next, we introduce a filter processor that consumes from `SentStream` and emits the message into `ReceivedStream` only if the key is not blocked.
+The filter is stateless, i.e., it updates no group table.
+See the following figure for the resulting architecture.
+
+![Architecture 2](figs/goka-arch-blocker.png)
+
+The [filter processor](filter/filter.go#L21) fetches the `BlockValue` for the sender by calling `ctx.Join(blocker.Table)`.
+If such a value exists and the flag is set to true, then the sender is blocked and the message has to be dropped.
+Otherwise, the message is forwarded to `ReceivedStream` with the *recipient* as key.
+
+```go
+func filter(ctx goka.Context, msg interface{}) {
+	// messaging.SentStream and blocker.Table are copartitioned;
+	// ctx.Join() gets the value in blocker.Table for the key given by ctx.Key()
+	v := ctx.Join(blocker.Table)
+	if v != nil && v.(*blocker.BlockValue).Blocked {
+		return
+	}
+	m := msg.(*messaging.Message)
+	ctx.Emit(messaging.ReceivedStream, m.To, m)
+}
+```
+
+The group graph of the filter processor has to be created with an edge to `blocker.Table`:
+```go
+g := goka.DefineGroup(goka.Group("filter"),
+	goka.Input(messaging.SentStream, new(messaging.MessageCodec), filter),
+	goka.Output(messaging.ReceivedStream, new(messaging.MessageCodec)),
+	goka.Join(blocker.Table, new(blocker.BlockValueCodec)),
+)
+
+p, _ := goka.NewProcessor(brokers, g)
+_ = p.Start()
+```
+
+Nothing has to be changed in the collector processor or in the feed endpoint.
+
+### Restarting the example
+
+At this point, let's make a short recap.
+So far we have created:
+
+- a [service](service/service.go) with send and feed endpoints;
+- a [collector processor](collector/collector.go) to collect messages sent to users;
+- a [block processor](blocker/blocker.go) to keep a table tracking blocked users;
+- a [filter processor](filter/filter.go) to drop messages from blocked users before they reach the collector processor; and
+- a [block-user tool](cmd/block-user) to add users to the block table.
+
+To enable the blocker and filter processors, stop `cmd/processor` and restart it as follows:
+```sh
+go run cmd/processor/main.go -collector -blocker -filter
+```
+
+Internally, the Go program will start three Goka processors.
+Alternatively, you can run the processors individually by starting the program multiple times with the respective flags.
+
+We still need to inform the send endpoint to emit into `SentStream`.
+For that, restart the process with the `-sent` flag:
+
+```sh
+go run cmd/service/main.go -sent
+```
+
+After blocking Bob with `cmd/block-user`, we can see that Bob quickly disappears from Alice's latest received messages.
+
+## 3. Adding some l33t speak
+
+To make our example a bit more interesting and introduce the concept of lookup tables, we extend the filter processor to additionally rewrite the content of messages with [l33tspeak](https://en.wikipedia.org/wiki/Leet).
+Before extending the filter, though, we create the translator processor, which keeps a mapping from word to translation in a similar fashion to the blocker processor.
+Note that, differently from `blocker.Table`, the `translator.Table` has words as keys instead of users.
+
+The [`cmd/translate-word`](cmd/translate-word/main.go) command allows us to add word translations into the table.
+Here are some l33tspeak examples:
+
+```sh
+go run cmd/translate-word/main.go -word "together" -with "t°9e+her"
+go run cmd/translate-word/main.go -word "lunch" -with "1[_]n<)-("
+go run cmd/translate-word/main.go -word "Hello" -with "H£1|_°"
+```
+
+### Querying non-copartitioned tables with `Context.Lookup()`
+
+The keys in `translator.Table` are words instead of users, so the filter processor cannot join the table with `SentStream` based on the keys.
+Instead, we add a `Lookup()` edge to the group graph when creating the filter processor as follows:
+
+```go
+g := goka.DefineGroup(goka.Group("filter"),
+	goka.Lookup(translator.Table, new(translator.ValueCodec)),
+	...
+)
+```
+
+We extend the filter callback to drop blocked users and translate messages:
+
+```go
+func filter(ctx goka.Context, msg interface{}) {
+	if shouldDrop(ctx) {
+		return
+	}
+	m := translate(ctx, msg.(*messaging.Message))
+	ctx.Emit(messaging.ReceivedStream, m.To, m)
+}
+
+func shouldDrop(ctx goka.Context) bool {
+	// Join() returns the value for ctx.Key() in blocker.Table
+	v := ctx.Join(blocker.Table)
+	return v != nil && v.(*blocker.BlockValue).Blocked
+}
+
+func translate(ctx goka.Context, m *messaging.Message) *messaging.Message {
+	words := strings.Split(m.Content, " ")
+	for i, w := range words {
+		// Lookup() returns the value for key w in translator.Table
+		if tw := ctx.Lookup(translator.Table, w); tw != nil {
+			words[i] = tw.(string)
+		}
+	}
+	return &messaging.Message{
+		From:    m.From,
+		To:      m.To,
+		Content: strings.Join(words, " "),
+	}
+}
+```
+
+The upside of lookup tables is that they can be queried by any key inside the processor callback.
+The downside is that if we spawn multiple processor instances, i.e., if we partition the load among multiple program instances, the complete lookup table has to be kept in each of these instances, because we cannot know beforehand which table keys will be queried in which stream partitions.
+In contrast, joined tables are copartitioned with the input streams and the group table and, consequently, only the partitions served by each processor instance have to be kept up-to-date.
+
+### Running the example
+
+In step three, we have changed and added some components:
+
+- added a [translator processor](translator/translator.go) to keep translations of words to l33tspeak; and
+- changed the [filter processor](filter/filter.go) to not only drop messages from blocked users but also rewrite messages with l33t translations.
+
+Start `cmd/processor` with the `-translator` flag and translate words using `cmd/translate-word`.
+No further changes are necessary.
+
+## 4. Automatic spammer detection
+
+Our final step in this example is to block spammers automatically.
+Let's build a -- rather naive -- spammer detector.
+
+### Detecting spammers
+
+Assume that spammers have the property of sending many more messages than they receive.
+So, if we can detect users that fulfill that property, we can block them.
+
+We want to build a [detector processor](detector/detector.go#L48) that counts how many messages a user sends and receives and issues a `BlockEvent` if the ratio *sent/(sent+received)* exceeds a threshold.
+The detector table should keep the following value for each user:
+```go
+type Counters struct {
+	Sent     int
+	Received int
+}
+```
+
+Whenever the table value is updated, the processor should check whether the user is a spammer.
+If the number of messages sent is higher than `minMessages` and the sent rate is higher than some `maxRate`, we declare the user to be a spammer and issue a `BlockEvent`.
+
+```go
+func detectSpammer(ctx goka.Context, c *Counters) bool {
+	var (
+		total = float64(c.Sent + c.Received)
+		rate  = float64(c.Sent) / total
+	)
+	return total >= minMessages && rate >= maxRate
+}
+```
+
+### Counting sent and received messages in one processor with `Context.Loopback()`
+
+Now that we have defined an approach to detect spammers, we have to keep the values in the group table updated.
+We define the group graph in parts.
+We start with the callback for `SentStream`:
+
+```go
+input := goka.Input(messaging.SentStream, new(messaging.MessageCodec),
+	func(ctx goka.Context, msg interface{}) {
+		c := getValue(ctx)
+		c.Sent++
+		ctx.SetValue(c)
+		if detectSpammer(ctx, c) {
+			ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent))
+		}
+		m := msg.(*messaging.Message)
+		ctx.Loopback(m.To, m)
+	},
+)
+
+func getValue(ctx goka.Context) *Counters {
+	if v := ctx.Value(); v != nil {
+		return v.(*Counters)
+	}
+	return &Counters{}
+}
+```
+
+For every message received from `SentStream`, we first get the value for the key or create a new `Counters` object.
+`SentStream` has the sender as key, so we increment `c.Sent` and store the value back in the group table with `ctx.SetValue()`.
+Next, we call `detectSpammer(ctx, c)`, which checks whether the sent rate exceeds the threshold.
+Finally, we forward the message to the key of the recipient using `ctx.Loopback()`.
+
+`ctx.Loopback()` writes into a special topic which is consumed by the loop callback.
+If we have multiple instances of the detector sharing the load, the loop callback may even be handled in another instance than the one that called `ctx.Loopback()`.
+We define the callback as follows: +```go +loop := goka.Loop(new(messaging.MessageCodec), + func(ctx goka.Context, msg interface{}) { + c := getValue(ctx) + c.Received++ + ctx.SetValue(c) + if detectSpammer(ctx, c) { + ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent)) + } + }, +) + +``` +Here again, we first get the value for the key. +Since the key is now the receiver user, we increment `c.Received` and update that value in the group table. +Next, we check whether the user is a spammer with the following function. + +### Group graph +Finally, we define the complete group as follows: +```go +g := goka.DefineGroup(goka.Group("detector"), + input, + loop, + goka.Output(blocker.Stream, new(blocker.BlockEventCodec)), + goka.Persist(new(CountersCodec)), +) + +p, _ := goka.NewProcessor(brokers, g) +_ = p.Start() +``` + +### Running the example + +In this final step, we added a [spam detector](detector/detector.go) which consumes messages from `SentStream` and emits block events into `blocker.Stream` if the sender or receiver of the message seem to be a spammer. + +To test the detector, start `cmd/processor` with `-detector` flag and unblock Bob. +He should be quickly blocked again. + +Note that in practice detecting spammers is much more complicated than the naive approach taken here. +Watch [this video](https://tech.lovoo.com/2017/06/16/bbuzz-17-anti-spam-and-machine-learning-at-lovoo/) +for details. + +Note that tables have to be configured in Kafka with log compaction. +For details check the [Wiki](https://github.com/lovoo/goka/wiki/Tips#configuring-log-compaction-for-table-topics). diff --git a/examples/3-messaging/blocker/blocker.go b/examples/3-messaging/blocker/blocker.go new file mode 100644 index 00000000..8150165f --- /dev/null +++ b/examples/3-messaging/blocker/blocker.go @@ -0,0 +1,70 @@ +package blocker + +import ( + "encoding/json" + + "github.com/lovoo/goka" +) + +var ( + group goka.Group = "blocker" + Table goka.Table = goka.GroupTable(group) + Stream goka.Stream = "block_user" +) + +type BlockEvent struct { + Unblock bool +} + +type BlockEventCodec struct{} + +func (c *BlockEventCodec) Encode(value interface{}) ([]byte, error) { + return json.Marshal(value) +} + +func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) { + var m BlockEvent + return &m, json.Unmarshal(data, &m) +} + +type BlockValue struct { + Blocked bool +} +type BlockValueCodec struct{} + +func (c *BlockValueCodec) Encode(value interface{}) ([]byte, error) { + return json.Marshal(value) +} + +func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) { + var m BlockValue + return &m, json.Unmarshal(data, &m) +} + +func block(ctx goka.Context, msg interface{}) { + var s *BlockValue + if v := ctx.Value(); v == nil { + s = new(BlockValue) + } else { + s = v.(*BlockValue) + } + + if msg.(*BlockEvent).Unblock { + s.Blocked = false + } else { + s.Blocked = true + } + ctx.SetValue(s) +} + +func Run(brokers []string) { + g := goka.DefineGroup(group, + goka.Input(Stream, new(BlockEventCodec), block), + goka.Persist(new(BlockValueCodec)), + ) + if p, err := goka.NewProcessor(brokers, g); err != nil { + panic(err) + } else if err = p.Start(); err != nil { + panic(err) + } +} diff --git a/examples/3-messaging/cmd/block-user/main.go b/examples/3-messaging/cmd/block-user/main.go new file mode 100644 index 00000000..bf563818 --- /dev/null +++ b/examples/3-messaging/cmd/block-user/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/lovoo/goka" + 
"github.com/lovoo/goka/examples/3-messaging/blocker" +) + +var ( + user = flag.String("user", "", "user to block") + unblock = flag.Bool("unblock", false, "unblock user instead of blocking") + broker = flag.String("broker", "localhost:9092", "boostrap Kafka broker") +) + +func main() { + flag.Parse() + if *user == "" { + fmt.Println("cannot block user ''") + os.Exit(1) + } + emitter, err := goka.NewEmitter([]string{*broker}, blocker.Stream, new(blocker.BlockEventCodec)) + if err != nil { + panic(err) + } + defer emitter.Finish() + + err = emitter.EmitSync(*user, &blocker.BlockEvent{Unblock: *unblock}) + if err != nil { + panic(err) + } +} diff --git a/examples/3-messaging/cmd/loadgen/main.go b/examples/3-messaging/cmd/loadgen/main.go new file mode 100644 index 00000000..0ddf4a6a --- /dev/null +++ b/examples/3-messaging/cmd/loadgen/main.go @@ -0,0 +1,86 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "math/rand" + "net/http" + "time" + + "github.com/lovoo/goka/examples/3-messaging" +) + +const ( + urlTmpl = "http://localhost:8080/%s/send" + spamProb = 0.3 +) + +var ( + users = []string{ + "Alice", + "Bob", + "Charlie", + "Dave", + "Eve", + } + + contents = []string{ + "Hi how are you doing", + "Hello let's have lunch together", + "Bye", + } +) + +func send(from, to, content string) { + m := messaging.Message{ + To: to, + Content: content, + } + + b, err := json.Marshal(&m) + if err != nil { + log.Printf("error encoding message: %v", err) + return + } + + url := fmt.Sprintf(urlTmpl, from) + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(b)) + if err != nil { + log.Printf("error creating request: %v", err) + return + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + log.Printf("error sending request: %v", err) + return + } + defer resp.Body.Close() + //TODO(diogo) check response status code +} + +func main() { + t := time.NewTicker(200 * time.Millisecond) + defer t.Stop() + + for range t.C { + var ( + cnt = "spam!" 
+ from = "Bob" + ) + if rand.Float64() < 1-spamProb { + from = users[rand.Intn(len(users))] + cnt = contents[rand.Intn(len(contents))] + } + to := users[rand.Intn(len(users))] + for to == from { + to = users[rand.Intn(len(users))] + } + send(from, to, cnt) + } +} diff --git a/examples/3-messaging/cmd/processor/main.go b/examples/3-messaging/cmd/processor/main.go new file mode 100644 index 00000000..e78cf491 --- /dev/null +++ b/examples/3-messaging/cmd/processor/main.go @@ -0,0 +1,58 @@ +package main + +import ( + "flag" + "log" + "os" + "os/signal" + "syscall" + + "github.com/lovoo/goka/examples/3-messaging/blocker" + "github.com/lovoo/goka/examples/3-messaging/collector" + "github.com/lovoo/goka/examples/3-messaging/detector" + "github.com/lovoo/goka/examples/3-messaging/filter" + "github.com/lovoo/goka/examples/3-messaging/translator" +) + +var ( + brokers = []string{"localhost:9092"} + runFilter = flag.Bool("filter", false, "run filter processor") + runCollector = flag.Bool("collector", false, "run collector processor") + runTranslator = flag.Bool("translator", false, "run translator processor") + runBlocker = flag.Bool("blocker", false, "run blocker processor") + runDetector = flag.Bool("detector", false, "run detector processor") + broker = flag.String("broker", "localhost:9092", "boostrap Kafka broker") +) + +func main() { + flag.Parse() + if *runCollector { + log.Println("starting collector") + go collector.Run(brokers) + } + if *runFilter { + log.Println("starting filter") + go filter.Run(brokers) + } + if *runBlocker { + log.Println("starting blocker") + go blocker.Run(brokers) + } + if *runDetector { + log.Println("starting detector") + go detector.Run(brokers) + } + if *runTranslator { + log.Println("starting translator") + go translator.Run(brokers) + } + + // Wait for SIGINT/SIGTERM + waiter := make(chan os.Signal, 1) + signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM) + + select { + case signal := <-waiter: + log.Printf("Got interrupted by %v", signal) + } +} diff --git a/examples/3-messaging/cmd/service/main.go b/examples/3-messaging/cmd/service/main.go new file mode 100644 index 00000000..3a19c7ee --- /dev/null +++ b/examples/3-messaging/cmd/service/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "flag" + + "github.com/lovoo/goka/examples/3-messaging" + "github.com/lovoo/goka/examples/3-messaging/service" +) + +var ( + sent = flag.Bool("sent", false, "emit to SentStream") + broker = flag.String("broker", "localhost:9092", "boostrap Kafka broker") +) + +func main() { + flag.Parse() + if *sent { + service.Run([]string{*broker}, messaging.SentStream) + } else { + service.Run([]string{*broker}, messaging.ReceivedStream) + } +} diff --git a/examples/3-messaging/cmd/translate-word/main.go b/examples/3-messaging/cmd/translate-word/main.go new file mode 100644 index 00000000..8b9e4559 --- /dev/null +++ b/examples/3-messaging/cmd/translate-word/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/lovoo/goka" + "github.com/lovoo/goka/codec" + "github.com/lovoo/goka/examples/3-messaging/translator" +) + +var ( + word = flag.String("word", "", "word to translate") + with = flag.String("with", "", "word translation") + broker = flag.String("broker", "localhost:9092", "boostrap Kafka broker") +) + +func main() { + flag.Parse() + if *word == "" { + fmt.Println("cannot translate word ''") + os.Exit(1) + } + emitter, err := goka.NewEmitter([]string{*broker}, translator.Stream, new(codec.String)) + if err != nil { + panic(err) + } + defer 
emitter.Finish() + + err = emitter.EmitSync(*word, *with) + if err != nil { + panic(err) + } +} diff --git a/examples/3-messaging/collector/collector.go b/examples/3-messaging/collector/collector.go new file mode 100644 index 00000000..93c07d47 --- /dev/null +++ b/examples/3-messaging/collector/collector.go @@ -0,0 +1,54 @@ +package collector + +import ( + "encoding/json" + + "github.com/lovoo/goka" + "github.com/lovoo/goka/examples/3-messaging" +) + +const maxMessages = 5 + +var ( + group goka.Group = "collector" + Table goka.Table = goka.GroupTable(group) +) + +type MessageListCodec struct{} + +func (c *MessageListCodec) Encode(value interface{}) ([]byte, error) { + return json.Marshal(value) +} + +func (c *MessageListCodec) Decode(data []byte) (interface{}, error) { + var m []messaging.Message + err := json.Unmarshal(data, &m) + return m, err +} + +func collect(ctx goka.Context, msg interface{}) { + var ml []messaging.Message + if v := ctx.Value(); v != nil { + ml = v.([]messaging.Message) + } + + m := msg.(*messaging.Message) + ml = append(ml, *m) + + if len(ml) > maxMessages { + ml = ml[len(ml)-maxMessages:] + } + ctx.SetValue(ml) +} + +func Run(brokers []string) { + g := goka.DefineGroup(group, + goka.Input(messaging.ReceivedStream, new(messaging.MessageCodec), collect), + goka.Persist(new(MessageListCodec)), + ) + if p, err := goka.NewProcessor(brokers, g); err != nil { + panic(err) + } else if err = p.Start(); err != nil { + panic(err) + } +} diff --git a/examples/3-messaging/detector/detector.go b/examples/3-messaging/detector/detector.go new file mode 100644 index 00000000..0d61c85f --- /dev/null +++ b/examples/3-messaging/detector/detector.go @@ -0,0 +1,85 @@ +package detector + +import ( + "encoding/json" + + "github.com/lovoo/goka" + "github.com/lovoo/goka/examples/3-messaging" + "github.com/lovoo/goka/examples/3-messaging/blocker" +) + +const ( + minMessages = 200 + maxRate = 0.5 +) + +var ( + group goka.Group = "detector" +) + +type Counters struct { + Sent int + Received int +} + +type CountersCodec struct{} + +func (c *CountersCodec) Encode(value interface{}) ([]byte, error) { + return json.Marshal(value) +} + +func (c *CountersCodec) Decode(data []byte) (interface{}, error) { + var m Counters + return &m, json.Unmarshal(data, &m) +} + +func getValue(ctx goka.Context) *Counters { + if v := ctx.Value(); v != nil { + return v.(*Counters) + } + return &Counters{} +} + +func detectSpammer(ctx goka.Context, c *Counters) bool { + var ( + total = float64(c.Sent + c.Received) + rate = float64(c.Sent) / total + ) + return total >= minMessages && rate >= maxRate +} + +func Run(brokers []string) { + g := goka.DefineGroup(group, + goka.Input(messaging.SentStream, new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) { + c := getValue(ctx) + c.Sent++ + ctx.SetValue(c) + + // check if sender is a spammer + if detectSpammer(ctx, c) { + ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent)) + } + + // Loop to receiver + m := msg.(*messaging.Message) + ctx.Loopback(m.To, m) + }), + goka.Loop(new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) { + c := getValue(ctx) + c.Received++ + ctx.SetValue(c) + + // check if receiver is a spammer + if detectSpammer(ctx, c) { + ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent)) + } + }), + goka.Output(blocker.Stream, new(blocker.BlockEventCodec)), + goka.Persist(new(CountersCodec)), + ) + if p, err := goka.NewProcessor(brokers, g); err != nil { + panic(err) + } else if err = p.Start(); err != nil { + 
panic(err) + } +} diff --git a/examples/3-messaging/figs/goka-arch-blocker.png b/examples/3-messaging/figs/goka-arch-blocker.png new file mode 100644 index 00000000..2b7b2155 Binary files /dev/null and b/examples/3-messaging/figs/goka-arch-blocker.png differ diff --git a/examples/3-messaging/figs/goka-arch-simple.png b/examples/3-messaging/figs/goka-arch-simple.png new file mode 100644 index 00000000..2346dc7e Binary files /dev/null and b/examples/3-messaging/figs/goka-arch-simple.png differ diff --git a/examples/3-messaging/filter/filter.go b/examples/3-messaging/filter/filter.go new file mode 100644 index 00000000..73cbd590 --- /dev/null +++ b/examples/3-messaging/filter/filter.go @@ -0,0 +1,55 @@ +package filter + +import ( + "strings" + + "github.com/lovoo/goka" + messaging "github.com/lovoo/goka/examples/3-messaging" + "github.com/lovoo/goka/examples/3-messaging/blocker" + "github.com/lovoo/goka/examples/3-messaging/translator" +) + +var ( + group goka.Group = "message_filter" +) + +func shouldDrop(ctx goka.Context) bool { + v := ctx.Join(blocker.Table) + return v != nil && v.(*blocker.BlockValue).Blocked +} + +func filter(ctx goka.Context, msg interface{}) { + if shouldDrop(ctx) { + return + } + m := translate(ctx, msg.(*messaging.Message)) + ctx.Emit(messaging.ReceivedStream, m.To, m) +} + +func translate(ctx goka.Context, m *messaging.Message) *messaging.Message { + words := strings.Split(m.Content, " ") + for i, w := range words { + if tw := ctx.Lookup(translator.Table, w); tw != nil { + words[i] = tw.(string) + } + } + return &messaging.Message{ + From: m.From, + To: m.To, + Content: strings.Join(words, " "), + } +} + +func Run(brokers []string) { + g := goka.DefineGroup(group, + goka.Input(messaging.SentStream, new(messaging.MessageCodec), filter), + goka.Output(messaging.ReceivedStream, new(messaging.MessageCodec)), + goka.Join(blocker.Table, new(blocker.BlockValueCodec)), + goka.Lookup(translator.Table, new(translator.ValueCodec)), + ) + if p, err := goka.NewProcessor(brokers, g); err != nil { + panic(err) + } else if err = p.Start(); err != nil { + panic(err) + } +} diff --git a/examples/3-messaging/message.go b/examples/3-messaging/message.go new file mode 100644 index 00000000..17c61a5f --- /dev/null +++ b/examples/3-messaging/message.go @@ -0,0 +1,41 @@ +package messaging + +import ( + "encoding/json" + + "github.com/lovoo/goka" +) + +var ( + SentStream goka.Stream = "message_sent" + ReceivedStream goka.Stream = "message_received" +) + +type Message struct { + From string + To string + Content string +} + +type MessageCodec struct{} + +func (c *MessageCodec) Encode(value interface{}) ([]byte, error) { + return json.Marshal(value) +} + +func (c *MessageCodec) Decode(data []byte) (interface{}, error) { + var m Message + return &m, json.Unmarshal(data, &m) +} + +type MessageListCodec struct{} + +func (c *MessageListCodec) Encode(value interface{}) ([]byte, error) { + return json.Marshal(value) +} + +func (c *MessageListCodec) Decode(data []byte) (interface{}, error) { + var m []Message + err := json.Unmarshal(data, &m) + return m, err +} diff --git a/examples/3-messaging/service/service.go b/examples/3-messaging/service/service.go new file mode 100644 index 00000000..3f372ec4 --- /dev/null +++ b/examples/3-messaging/service/service.go @@ -0,0 +1,84 @@ +package service + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + + "github.com/gorilla/mux" + "github.com/lovoo/goka" + "github.com/lovoo/goka/examples/3-messaging" + 
"github.com/lovoo/goka/examples/3-messaging/collector" +) + +func Run(brokers []string, stream goka.Stream) { + view, err := goka.NewView(brokers, collector.Table, new(collector.MessageListCodec)) + if err != nil { + panic(err) + } + go view.Start() + defer view.Stop() + + emitter, err := goka.NewEmitter(brokers, stream, new(messaging.MessageCodec)) + if err != nil { + panic(err) + } + defer emitter.Finish() + + router := mux.NewRouter() + router.HandleFunc("/{user}/send", send(emitter, stream)).Methods("POST") + router.HandleFunc("/{user}/feed", feed(view)).Methods("GET") + + log.Printf("Listen port 8080") + log.Fatal(http.ListenAndServe(":8080", router)) +} + +func send(emitter *goka.Emitter, stream goka.Stream) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + var m messaging.Message + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + fmt.Fprintf(w, "error: %v", err) + return + } + + err = json.Unmarshal(b, &m) + if err != nil { + fmt.Fprintf(w, "error: %v", err) + return + } + + m.From = mux.Vars(r)["user"] + + if stream == messaging.ReceivedStream { + err = emitter.EmitSync(m.To, &m) + } else { + err = emitter.EmitSync(m.From, &m) + } + if err != nil { + fmt.Fprintf(w, "error: %v", err) + return + } + log.Printf("Sent message:\n %v\n", m) + fmt.Fprintf(w, "Sent message:\n %v\n", m) + } +} + +func feed(view *goka.View) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + user := mux.Vars(r)["user"] + val, _ := view.Get(user) + if val == nil { + fmt.Fprintf(w, "%s not found!", user) + return + } + messages := val.([]messaging.Message) + fmt.Fprintf(w, "Latest messages for %s\n", user) + for i, m := range messages { + fmt.Fprintf(w, "%d %10s: %v\n", i, m.From, m.Content) + } + } +} diff --git a/examples/3-messaging/translator/translator.go b/examples/3-messaging/translator/translator.go new file mode 100644 index 00000000..a1f0efad --- /dev/null +++ b/examples/3-messaging/translator/translator.go @@ -0,0 +1,32 @@ +package translator + +import ( + "github.com/lovoo/goka" + "github.com/lovoo/goka/codec" +) + +var ( + group goka.Group = "translator" + Table goka.Table = goka.GroupTable(group) + Stream goka.Stream = "translate-word" +) + +type ValueCodec struct { + codec.String +} + +func translate(ctx goka.Context, msg interface{}) { + ctx.SetValue(msg.(string)) +} + +func Run(brokers []string) { + g := goka.DefineGroup(group, + goka.Input(Stream, new(ValueCodec), translate), + goka.Persist(new(ValueCodec)), + ) + if p, err := goka.NewProcessor(brokers, g); err != nil { + panic(err) + } else if err = p.Start(); err != nil { + panic(err) + } +} diff --git a/examples/create-kafka-commands.sh b/examples/create-kafka-commands.sh new file mode 100644 index 00000000..a58c1dfa --- /dev/null +++ b/examples/create-kafka-commands.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +## lists the scripts inside the Kafka container and creates local scripts to call them with docker. 
+
+set -e
+
+# directory to save the scripts
+TARGET=$1
+mkdir -p $TARGET
+
+# create Kafka scripts
+SCRIPTS=$(docker run --rm -it --entrypoint /bin/bash wurstmeister/kafka -c "ls \$KAFKA_HOME/bin/*.sh")
+for SCRIPT in $SCRIPTS; do
+    SCRIPT=$(echo $SCRIPT | tr -d '\r')
+    FN=$TARGET/$(basename $SCRIPT)
+    echo creating $FN
+    cat <<-EOF > $FN
+	#!/bin/bash
+	CMD="$SCRIPT \$@"
+	docker run --net=host --rm -it --entrypoint /bin/bash wurstmeister/kafka -c "\$CMD"
+EOF
+    chmod +x $FN
+done
+
+# create ZooKeeper client script
+echo creating $TARGET/zkCli.sh
+cat <<-EOF > $TARGET/zkCli.sh
+	#!/bin/bash
+	CMD="bin/zkCli.sh \$@"
+	docker run --net=host --rm -it wurstmeister/zookeeper bash -c "\$CMD"
+EOF
+chmod +x $TARGET/zkCli.sh
diff --git a/kafkamock.go b/kafkamock.go
index 7244b396..1ab35edd 100644
--- a/kafkamock.go
+++ b/kafkamock.go
@@ -379,8 +379,8 @@ func (km *consumerMock) Commit(topic string, partition int32, offset int64) erro
 	return nil
 }
 
-// AddPartition marks the topic as a state topic.
-// The mock has to know the state topic to ignore emit calls (which would never be consumed)
+// AddPartition marks the topic as a table topic.
+// The mock has to know the group table topic to ignore emit calls (which would never be consumed)
 func (km *consumerMock) AddPartition(topic string, partition int32, initialOffset int64) {
 }