Skip to content

Commit

Permalink
Merge pull request #88 from lovoo/example-messaging
Browse files Browse the repository at this point in the history
messaging example
  • Loading branch information
db7 committed Feb 26, 2018
2 parents cea03b6 + 24af59a commit 6a289b6
Show file tree
Hide file tree
Showing 20 changed files with 1,215 additions and 12 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ Goka extends the concept of Kafka consumer groups by binding a state table to th
## Documentation

This README provides a brief, high level overview of the ideas behind Goka.
A more detailed introduction of the project can be found in this [blog post](https://tech.lovoo.com/2017/05/23/goka/).

Package API documentation is available at [GoDoc].
Package API documentation is available at [GoDoc] and the [Wiki](https://github.com/lovoo/goka/wiki/Tips#configuring-log-compaction-for-table-topics) provides several tips for configuring, extending, and deploying Goka applications.

## Installation

Expand All @@ -45,17 +46,17 @@ Goka relies on Kafka for message passing, fault-tolerant state storage and workl

* **Processor** is a set of callback functions that consume and perform state transformations upon delivery of these emitted messages. *Processor groups* are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

* **Group tables** are partitioned key-value tables stored in Kafka that belong to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.
* **Group table** is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

* **Views** are local caches of a processor group's complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.
* **Views** are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.


## Get Started

An example Goka application could look like the following.
An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic.
A processor processes the "example-stream" topic counting the number of messages delivered for "some-key".
The counter is persisted in the "example-group-state" topic.
The counter is persisted in the "example-group-table" topic.
To locally start a dockerized Zookeeper and Kafka instances, execute `make start` with the `Makefile` in the [examples] folder.

```go
Expand Down Expand Up @@ -111,7 +112,7 @@ func runProcessor() {
}

// Define a new processor group. The group defines all inputs, outputs, and
// serialization formats. The group-table topic is "example-group-state".
// serialization formats. The group-table topic is "example-group-table".
g := goka.DefineGroup(group,
goka.Input(topic, new(codec.String), cb),
goka.Persist(new(codec.Int64)),
Expand Down Expand Up @@ -139,6 +140,9 @@ func main() {
}
```

Note that tables have to be configured in Kafka with log compaction.
For details check the [Wiki](https://github.com/lovoo/goka/wiki/Tips#configuring-log-compaction-for-table-topics).

## How to contribute

Contributions are always welcome.
Expand Down
2 changes: 1 addition & 1 deletion examples/1-simplest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func runProcessor() {
}

// Define a new processor group. The group defines all inputs, outputs, and
// serialization formats. The group-table topic is "example-group-state".
// serialization formats. The group-table topic is "example-group-table".
g := goka.DefineGroup(group,
goka.Input(topic, new(codec.String), cb),
goka.Persist(new(codec.Int64)),
Expand Down
8 changes: 4 additions & 4 deletions examples/2-clicks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This example shows how to:

* Write a processor that consumes data from kafka, counting clicks for a user
* Write an emitter to push data to kafka
* Writing a view to query the user state
* Writing a view to query the user table

To get an introduction into goka, see this [blog post](http://tech.lovoo.com/2017/05/23/goka).

Expand All @@ -22,7 +22,7 @@ go run main.go
This should output something like

```
2017/05/23 15:09:20 Table mini-group-state has 10 partitions
2017/05/23 15:09:20 Table mini-group-table has 10 partitions
2017/05/23 15:09:20 Processor: started
View opened at http://localhost:9095/
2017/05/23 15:09:20 View: started
Expand Down Expand Up @@ -95,9 +95,9 @@ The consumer of a topic must use the same codec as the writer, otherwise we'll g
unmarshalling will simply fail.

* `goka.Persist` makes the processor store its group table persistently using kafka. That means on every
restart (either the same host or somewhere else), the state will be restored.
restart (either the same host or somewhere else), the group table will be restored.
This option also makes the processor cache the group table locally using a key-value store.
That avoids holding the full state in memory and a long-running recovery on every restart.
That avoids holding the full group table in memory and a long-running recovery on every restart.

To persist the group table, again we need a `Codec` which encodes the user for this case.
We want to store objects of type `*user`, so we have to implement our own codec. In our example,
Expand Down
Loading

0 comments on commit 6a289b6

Please sign in to comment.