Skip to content

Commit

Permalink
Merge pull request #23 from lovoo/readme-update
Browse files Browse the repository at this point in the history
Readme update
  • Loading branch information
SamiHiltunen committed May 18, 2017
2 parents 36893b8 + 190ebf8 commit 66a5f7a
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 128 deletions.
205 changes: 77 additions & 128 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Goka relies on Kafka for message passing, fault-tolerant state storage and workl

* **Emitters** deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

* **Processor** is a set of callback functions that consume and perform operations on these emitted messages. *Processor groups* are formed of one or more instances of a processor. Goka distributes a topic's partitions across all the processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.
* **Processor** is a set of callback functions that consume these emitted messages and perform state transformations upon their delivery. *Processor groups* are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

* **Group tables** are partitioned key-value tables stored in Kafka that belong to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

Expand All @@ -52,151 +52,100 @@ Goka relies on Kafka for message passing, fault-tolerant state storage and workl

## Get Started

An example Goka application could look like the following:
An example Goka application could look like the following.
An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic.
A processor processes the "example-stream" topic counting the number of messages delivered for "some-key".
The counter is persisted in the "example-group-state" topic.
To start dockerized Zookeeper and Kafka instances locally, execute `make start` with the `Makefile` in the [examples] folder.

### Emitter
```go
package main

import (
"fmt"
"log"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
brokers = []string{"localhost:9092"}
topic goka.Stream = "mini-input"
brokers = []string{"localhost:9092"}
topic goka.Stream = "example-stream"
group goka.Group = "example-group"
)

func main() {
// create a new emitter which allows you to send
// messages to Kafka
emitter, err := goka.NewEmitter(brokers, topic,
new(codec.String))
if err != nil {
log.Fatalf("error creating emitter: %v", err)
}

// emitter Finish should be called always before
// terminating the application to ensure the emitter
// has delivered all the pending messages to Kafka
defer emitter.Finish()

t := time.NewTicker(5 * time.Second)
defer t.Stop()

// on every timer tick, emit a message to containing
// the current timestamp to Kafka
i := 0
for range t.C {
key := fmt.Sprintf("%d", i%10)
value := fmt.Sprintf("%s", time.Now())
emitter.EmitSync(key, value)
i++
}
// emits a single message and leaves
func runEmitter() {
emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
if err != nil {
log.Fatalf("error creating emitter: %v", err)
}
defer emitter.Finish()
err = emitter.EmitSync("some-key", "some-value")
if err != nil {
log.Fatalf("error emitting message: %v", err)
}
fmt.Println("message emitted")
}
```

### Processor
```go
package main

import (
"log"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
brokers = []string{"localhost:9092"}
topic goka.Stream = "mini-input"
group goka.Group = "mini-group"
)

func main() {
// Define a new processor group. The group defines all
// the inputs, output, serialization formats and the
// topics of the processor
g := goka.DefineGroup(group,
goka.Input(topic, new(codec.String), process),
goka.Persist(new(codec.Int64)),
)
if p, err := goka.NewProcessor(brokers, g); err != nil {
log.Fatalf("error creating processor: %v", err)
} else if err = p.Start(); err != nil {
log.Fatalf("error running processor: %v", err)
}
// process messages until ctrl-c is pressed
func runProcessor() {
// process callback is invoked for each message delivered from
// "example-stream" topic.
cb := func(ctx goka.Context, msg interface{}) {
var counter int64
// ctx.Value() gets from the group table the value that is stored for
// the message's key.
if val := ctx.Value(); val != nil {
counter = val.(int64)
}
counter++
// SetValue stores the incremented counter in the group table for
// the message's key.
ctx.SetValue(counter)
log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
}

// Define a new processor group. The group defines all inputs, outputs, and
// serialization formats. The group-table topic is "example-group-state".
g := goka.DefineGroup(group,
goka.Input(topic, new(codec.String), cb),
goka.Persist(new(codec.Int64)),
)

p, err := goka.NewProcessor(brokers, g)
if err != nil {
log.Fatalf("error creating processor: %v", err)
}
go func() {
if err = p.Start(); err != nil {
log.Fatalf("error running processor: %v", err)
}
}()

wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
<-wait // wait for SIGINT/SIGTERM
p.Stop() // gracefully stop processor
}

// process is the callback the processor will call for
// each message that arrives in the "mini-input" topic.
func process(ctx goka.Context, msg interface{}) {
var counter int64
// ctx.Value gets from the group table the value that
// is stored for the message's key.
if val := ctx.Value(); val != nil {
counter = val.(int64)
}
counter++
// SetValue stores the incremented counter in the
// group table for in the message's key.
ctx.SetValue(counter)

log.Println("[proc] key:", ctx.Key(), "count:", counter, "msg:", msg)
func main() {
runEmitter() // emits one message and stops
runProcessor() // press ctrl-c to stop
}
```

### View
```go
package main

import (
"fmt"
"log"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)
## How to contribute

var (
brokers = []string{"localhost:9092"}
group goka.Group = "mini-group"
)

func main() {
// creates a new view which is provides read-only
// access to the mini-group's group table
view, err := goka.NewView(brokers,
goka.GroupTable(group),
new(codec.Int64),
)
if err != nil {
log.Fatalf("error creating view: %v", err)
}
// starting the view begins receiving updates
// from Kafka
go view.Start()
defer view.Stop()

t := time.NewTicker(10 * time.Second)
defer t.Stop()

// on every timer tick, print out the values
// stored in the group table
for range t.C {
for i := 0; i < 10; i++ {
val, _ := view.Get(fmt.Sprintf("%d", i))
log.Printf("[view] %d: %v\n", i, val)
}
}
}
```
Contributions are always welcome.
Please fork the repo, create a pull request against master, and be sure tests pass.
See the [GitHub Flow] for details.

[Apache Kafka]: https://kafka.apache.org/
[GoDoc]: https://godoc.org/github.com/lovoo/goka
[examples]: https://github.com/lovoo/goka/tree/master/examples
[GitHub Flow]: https://guides.github.com/introduction/flow
45 changes: 45 additions & 0 deletions examples/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Makefile to get Zookeeper and Kafka up and running

# When DOCKER_HOST is set, ask docker-machine for the address to advertise
# and bind to; otherwise 127.0.0.1 is used.
ifdef DOCKER_HOST
DOCKER_IP=$(shell docker-machine ip)
else
DOCKER_IP=127.0.0.1
endif

# None of the targets below produce a file of the same name. Declaring them
# phony keeps them runnable even if a file called e.g. "start" or "stop"
# exists in this directory.
.PHONY: all restart start stop run-zk run-kafka

# print the available targets
all:
	@echo targets: start restart stop

# remove all containers and start them again
restart: stop start
	@echo done

# start all containers
start: run-zk run-kafka
	@echo done

# stop all containers (forces removal of the containers). The leading "-"
# tells make to ignore a failing docker command, so that "stop" and
# "restart" still succeed when a container does not exist.
stop:
	@echo stopping Kafka and Zookeeper
	-@docker rm -f zookeeper kafka

# start the Zookeeper container, publishing its port 2181
run-zk:
	@echo starting Zookeeper...
	@docker run -d -t --name zookeeper \
		-p 2181:2181 \
		wurstmeister/zookeeper

# start the Kafka container linked to the Zookeeper container
run-kafka:
	@echo starting Kafka...
	@docker run -d -t --name kafka \
		-e HOST_IP=kafka \
		-e KAFKA_ADVERTISED_HOST_NAME=${DOCKER_IP} \
		-e KAFKA_ADVERTISED_PORT=9092 \
		-e KAFKA_NUM_PARTITIONS=10 \
		-e KAFKA_DEFAULT_REPLICATION_FACTOR=1 \
		-e KAFKA_REPLICATION_FACTOR=1 \
		-p ${DOCKER_IP}:9092:9092 \
		-p ${DOCKER_IP}:9997:9997 \
		-e KAFKA_BROKER_ID=1 \
		-e ZK=zk -p 9092 \
		--link zookeeper:zk \
		wurstmeister/kafka:0.10.1.0
78 changes: 78 additions & 0 deletions examples/simplest/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
// brokers lists the Kafka broker addresses handed to both the emitter and
// the processor.
brokers = []string{"localhost:9092"}
// topic is the stream the emitter writes to and the processor consumes.
topic goka.Stream = "example-stream"
// group names the processor group defined in runProcessor.
group goka.Group = "example-group"
)

// runEmitter emits a single message with key "some-key" and value
// "some-value" into the stream topic and then returns. Any failure is logged
// and terminates the program.
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}

	err = emitter.EmitSync("some-key", "some-value")
	// Finish is called explicitly instead of being deferred: log.Fatalf exits
	// the process via os.Exit, which skips deferred calls, so a deferred
	// Finish would never run on the error path below.
	emitter.Finish()
	if err != nil {
		log.Fatalf("error emitting message: %v", err)
	}
	fmt.Println("message emitted")
}

// runProcessor starts a processor that counts, per key, the messages
// delivered from the stream topic. It blocks until SIGINT or SIGTERM is
// received and then stops the processor.
func runProcessor() {
	// cb is the process callback; it is invoked for each message delivered
	// from the "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() gets from the group table the value that is stored for
		// the message's key. A nil value leaves the counter at zero.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table for
		// the message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-state".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	go func() {
		// err is declared locally so the goroutine does not write to the
		// variable owned by the enclosing function.
		if err := p.Start(); err != nil {
			log.Fatalf("error running processor: %v", err)
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	p.Stop() // gracefully stop processor
}

// main emits one message into the stream topic and then runs the processor
// until it is interrupted with SIGINT or SIGTERM.
func main() {
runEmitter() // emits one message and stops
runProcessor() // press ctrl-c to stop
}

0 comments on commit 66a5f7a

Please sign in to comment.