added redis storage option #96

Merged · 2 commits · Mar 15, 2018
2 changes: 2 additions & 0 deletions README.md
@@ -50,6 +50,8 @@ Goka relies on Kafka for message passing, fault-tolerant state storage and workload balancing.

* **Views** are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.

* **Local storage** keeps a local copy of the group table partitions to speed up recovery and reduce memory utilization. By default, the local storage uses [LevelDB](https://github.com/syndtr/goleveldb), but an in-memory map and [Redis-based storage](tree/master/storage/redis) are also available, as sketched below.
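A minimal sketch of wiring a processor to the Redis-based storage added in this PR. The broker list, Redis address, and namespace are placeholders; the builder comes from the `storage/redis` package in this diff:

```go
import (
	"github.com/lovoo/goka"
	redisstorage "github.com/lovoo/goka/storage/redis"

	redis "gopkg.in/redis.v5"
)

// newRedisProcessor builds a processor whose group table lives in Redis
// instead of the default LevelDB.
func newRedisProcessor(brokers []string, graph *goka.GroupGraph) (*goka.Processor, error) {
	// placeholder address and namespace; adjust for your deployment
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	return goka.NewProcessor(brokers, graph,
		goka.WithStorageBuilder(redisstorage.RedisBuilder(client, "my-app")),
	)
}
```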


## Get Started

69 changes: 69 additions & 0 deletions examples/redis/README.md
@@ -0,0 +1,69 @@
# Goka Example (Redis)

An example of using [Redis](https://redis.io/) as the storage option.

This example consists of an **Emitter** and one **Processor**. The emitter
generates events with random keys (_user_id_), which the processor consumes
from Kafka while using Redis as its local storage/cache.


## Usage

Configuring this example is straightforward.

1. Start Kafka from the [`docker-compose`](kafka.yml) file with the command below:

```console
docker-compose -f kafka.yml -p ci up -d
```

2. Check the status of docker containers:

```console
docker ps
```

The output should look like this:

```console
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
38a6efb145ba wurstmeister/kafka "start-kafka.sh" 9 seconds ago Up 4 seconds 0.0.0.0:9092->9092/tcp, 0.0.0.0:9997->9997/tcp kafka
48df80931a1f confluent/zookeeper "/usr/local/bin/zk-d…" 10 seconds ago Up 5 seconds 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper
040fbe9dfc13 redis:latest "docker-entrypoint.s…" 10 seconds ago Up 5 seconds 0.0.0.0:6379->6379/tcp redis
```

The same configuration should be present in the `config.yaml` file, with Kafka and Redis entries like:

```yaml
kafka:
  brokers:
    - "127.0.0.1:9092"
  group: "examples"
  stream: "events"
  redis: "127.0.0.1:6379"
  namespace: "producer"
```

Where:
* **brokers**: list of Kafka broker addresses.
* **group**: name of the group this example belongs to.
* **stream**: name of the stream this example consumes.
* **redis**: address of the Redis server (`localhost:6379`).
* **namespace**: namespace that distinguishes applications writing to the same keys on Redis (see the key layout below).
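For reference, `storage/redis/builders.go` in this PR prefixes every stored entry with `namespace:topic:partition`. Assuming goka's usual `<group>-table` naming for the group table topic, the values above would yield keys such as:

```console
producer:examples-table:0
```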

3. Fetch the Go [redis](gopkg.in/redis.v5) client package:

```console
go get -u gopkg.in/redis.v5
```

4. Build and run the example:

```console
go build .
./redis -config config.yaml
```

Events with random keys are produced to and consumed from Kafka. You can run
several instances of the same binary and watch Kafka rebalance and reassign
partitions without anything breaking, as shown below.
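For example, one way to watch a rebalance (assuming the binary built above and the same `config.yaml`) is simply to start a second instance:

```console
# first shell
./redis -config config.yaml
# second shell
./redis -config config.yaml
```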
18 changes: 18 additions & 0 deletions examples/redis/codec.go
@@ -0,0 +1,18 @@
package main

import "encoding/json"

// Codec encodes and decodes events as JSON.
type Codec struct{}

// Encode encodes an event struct into a JSON byte slice.
func (c *Codec) Encode(value interface{}) ([]byte, error) {
	return json.Marshal(value)
}

// Decode decodes an event from a JSON byte slice.
func (c *Codec) Decode(data []byte) (interface{}, error) {
	event := new(Event)

	err := json.Unmarshal(data, event)
	return event, err
}
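A quick round trip through this codec might look like the following hypothetical snippet (error handling elided, `fmt` imported by the caller):

```go
c := new(Codec)
raw, _ := c.Encode(&Event{UserID: "42", Timestamp: 1521072000})
decoded, _ := c.Decode(raw) // Decode always returns an *Event
fmt.Printf("%+v\n", decoded.(*Event))
// prints: &{UserID:42 Timestamp:1521072000}
```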
8 changes: 8 additions & 0 deletions examples/redis/config.yaml
@@ -0,0 +1,8 @@

kafka:
  brokers:
    - "127.0.0.1:9092"
  group: "examples"
  stream: "events"
  redis: "127.0.0.1:6379"
  namespace: "producer"
46 changes: 46 additions & 0 deletions examples/redis/consumer.go
@@ -0,0 +1,46 @@
package main

import (
	"context"

	"github.com/lovoo/goka"
	storage "github.com/lovoo/goka/storage/redis"

	redis "gopkg.in/redis.v5"
)

// Publisher defines an interface to publish an event somewhere.
type Publisher interface {
	Publish(ctx context.Context, key string, event *Event) error
	Close() error
}

// Consume starts a goka processor consuming the given stream.
func Consume(pub Publisher, brokers []string, group string, stream string, store string, namespace string) error {
	codec := new(Codec)

	input := goka.Input(goka.Stream(stream), codec, func(ctx goka.Context, msg interface{}) {
		if event, ok := msg.(*Event); ok {
			// publish errors are deliberately ignored in this example
			pub.Publish(context.Background(), ctx.Key(), event)
		}
	})
	graph := goka.DefineGroup(goka.Group(group), input, goka.Persist(codec))

	opts := []goka.ProcessorOption{}
	if store != "" {
		// use Redis-backed storage instead of the default LevelDB
		client := redis.NewClient(&redis.Options{
			Addr: store,
		})
		defer client.Close()
		opts = append(opts, goka.WithStorageBuilder(storage.RedisBuilder(client, namespace)))
	}
	processor, err := goka.NewProcessor(brokers, graph, opts...)
	if err != nil {
		return err
	}
	defer processor.Stop()

	return processor.Start()
}
6 changes: 6 additions & 0 deletions examples/redis/event.go
@@ -0,0 +1,6 @@
package main

// Event is the message produced and consumed in this example.
type Event struct {
	UserID    string `json:"user_id"`
	Timestamp int64  `json:"timestamp"`
}
39 changes: 39 additions & 0 deletions examples/redis/kafka.yml
@@ -0,0 +1,39 @@
version: '2'
services:

  # zookeeper ==========================
  zookeeper:
    container_name: zookeeper
    image: confluent/zookeeper
    ports:
      - "2181:2181"

  # kafka ==============================
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
      - redis
    ports:
      - "9092:9092"
      - "9997:9997"
    environment:
      KAFKA_PORT: 9092
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_NUM_PARTITIONS: 10
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_REPLICATION_FACTOR: 1

  # redis =======================================
  redis:
    container_name: redis
    image: redis:latest
    ports:
      - "6379:6379"
    expose:
      - "6379"
82 changes: 82 additions & 0 deletions examples/redis/main.go
@@ -0,0 +1,82 @@
package main

import (
	"context"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"math/rand"
	"strconv"
	"time"

	yaml "gopkg.in/yaml.v2"
)

// Config holds the Kafka and Redis settings read from the YAML config file.
type Config struct {
	Kafka struct {
		Brokers   []string `yaml:"brokers"`
		Group     string   `yaml:"group"`
		Stream    string   `yaml:"stream"`
		Redis     string   `yaml:"redis"`
		Namespace string   `yaml:"namespace"`
	} `yaml:"kafka"`
}

var (
	filename = flag.String("config", "config.yaml", "path to config file")
)

func main() {
	flag.Parse()

	conf, err := readConfig(*filename)
	if err != nil {
		log.Fatal(err)
	}

	// consuming
	go func() {
		err := Consume(new(nopPublisher), conf.Kafka.Brokers, conf.Kafka.Group,
			conf.Kafka.Stream, conf.Kafka.Redis, conf.Kafka.Namespace)
		if err != nil {
			log.Fatal(err)
		}
	}()

	// producing
	producer, err := NewProducer(conf.Kafka.Brokers, conf.Kafka.Stream)
	if err != nil {
		log.Fatal(err)
	}
	for {
		event := &Event{
			UserID:    strconv.FormatInt(rand.Int63n(255), 10),
			Timestamp: time.Now().UnixNano(),
		}
		fmt.Printf("emit ->key: `%v` ->event: `%v`\n", event.UserID, event)
		if err = producer.Emit(event.UserID, event); err != nil {
			log.Fatal(err)
		}
		time.Sleep(5 * time.Second)
	}
}

func readConfig(filename string) (*Config, error) {
	b, err := ioutil.ReadFile(filename)
	if err != nil {
		return nil, err
	}
	conf := new(Config)
	err = yaml.Unmarshal(b, conf)
	return conf, err
}

// nopPublisher prints events instead of forwarding them anywhere.
type nopPublisher struct{}

func (p *nopPublisher) Publish(ctx context.Context, key string, event *Event) error {
	fmt.Printf("published ->key: `%v` ->event: `%v`\n", key, event)
	return nil
}

func (p *nopPublisher) Close() error {
	return nil
}
32 changes: 32 additions & 0 deletions examples/redis/producer.go
@@ -0,0 +1,32 @@
package main

import "github.com/lovoo/goka"

// Producer defines an interface for producing events to Kafka.
type Producer interface {
	Emit(key string, event *Event) error
	Close() error
}

type kafkaProducer struct {
	emitter *goka.Emitter
}

// NewProducer returns a new Kafka producer.
func NewProducer(brokers []string, stream string) (Producer, error) {
	codec := new(Codec)
	emitter, err := goka.NewEmitter(brokers, goka.Stream(stream), codec)
	if err != nil {
		return nil, err
	}
	return &kafkaProducer{emitter}, nil
}

// Emit synchronously emits an event to Kafka, keyed by key.
func (p *kafkaProducer) Emit(key string, event *Event) error {
	return p.emitter.EmitSync(key, event)
}

// Close waits for all pending emits to finish.
func (p *kafkaProducer) Close() error {
	p.emitter.Finish()
	return nil
}
1 change: 1 addition & 0 deletions storage/append.go
@@ -14,6 +14,7 @@ type file struct {
bytesWritten int64
}

// NewFile returns a new on-disk storage.
func NewFile(path string, part int32) (Storage, error) {
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, fmt.Errorf("error creating storage directory: %v", err)
20 changes: 20 additions & 0 deletions storage/redis/builders.go
@@ -0,0 +1,20 @@
package redis

import (
	"errors"
	"fmt"

	"github.com/lovoo/goka/storage"

	redis "gopkg.in/redis.v5"
)

// RedisBuilder builds a Redis-backed storage, keying each partition's
// data as "namespace:topic:partition".
func RedisBuilder(client *redis.Client, namespace string) storage.Builder {
	return func(topic string, partition int32) (storage.Storage, error) {
		if namespace == "" {
			return nil, errors.New("missing namespace for redis storage")
		}
		return New(client, fmt.Sprintf("%s:%s:%d", namespace, topic, partition))
	}
}