Skip to content

Commit

Permalink
fix merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Helton Marques committed Mar 14, 2018
1 parent f4c1096 commit be3a119
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 43 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Goka relies on Kafka for message passing, fault-tolerant state storage and workl

* **Views** are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.

* **Local storage** keeps a local copy of the group table partitions to speedup recovery and reduce memory utilization. By default, the local storage uses [LevelDB](https://github.com/syndtr/goleveldb), but in-memory map and [Redis-based storage](tree/master/storage/redis) are also available.


## Get Started

Expand Down
8 changes: 8 additions & 0 deletions examples/redis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,16 @@ It's easy to configures this example.
group: "examples"
stream: "events"
redis: "127.0.0.1:6379"
namespace: "producer"
```

Where:
* **brokers** : slice of kafka brokers hosts.
* **group** : group name of this example belongs.
* **stream**: stream name of this example belongs.
* **redis**: address of redis (`localhost:6379`).
* **namespace**: namespace distinguish applications that write to the same keys on Redis.

3. Fetch the go [redis](gopkg.in/redis.v5) client package:

```console
Expand Down
1 change: 1 addition & 0 deletions examples/redis/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ kafka:
group: "examples"
stream: "events"
redis: "127.0.0.1:6379"
namespace: "producer"
10 changes: 5 additions & 5 deletions examples/redis/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package main

import (
"context"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/storage"
storage "github.com/lovoo/goka/storage/redis"

redis "gopkg.in/redis.v5"
)

Expand All @@ -16,7 +16,7 @@ type Publisher interface {
}

// Consume starts goka events consumer.
func Consume(pub Publisher, brokers []string, group string, stream string, store string) error {
func Consume(pub Publisher, brokers []string, group string, stream string, store string, namespace string) error {
codec := new(Codec)

input := goka.Input(goka.Stream(stream), codec, func(ctx goka.Context, msg interface{}) {
Expand All @@ -30,11 +30,11 @@ func Consume(pub Publisher, brokers []string, group string, stream string, store
opts := []goka.ProcessorOption{}
switch {
case store != "":
var retention time.Duration
client := redis.NewClient(&redis.Options{
Addr: store,
})
opts = append(opts, goka.WithStorageBuilder(storage.RedisBuilder(client, retention)))
opts = append(opts, goka.WithStorageBuilder(storage.RedisBuilder(client, namespace)))
defer client.Close()
}
processor, err := goka.NewProcessor(brokers, graph, opts...)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions examples/redis/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (

type Config struct {
Kafka struct {
Brokers []string `yaml:"brokers"`
Group string `yaml:"group"`
Stream string `yaml:"stream"`
Redis string `yaml:"redis"`
Brokers []string `yaml:"brokers"`
Group string `yaml:"group"`
Stream string `yaml:"stream"`
Redis string `yaml:"redis"`
Namespace string `yaml:"namespace"`
} `yaml:"kafka"`
}

Expand All @@ -36,7 +37,8 @@ func main() {

// consuming
go func() {
err := Consume(new(nopPublisher), conf.Kafka.Brokers, conf.Kafka.Group, conf.Kafka.Stream, conf.Kafka.Redis)
err := Consume(new(nopPublisher), conf.Kafka.Brokers, conf.Kafka.Group,
conf.Kafka.Stream, conf.Kafka.Redis, conf.Kafka.Namespace)
if err != nil {
log.Fatal(err)
}
Expand Down
9 changes: 0 additions & 9 deletions storage/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package storage
import (
"fmt"
"path/filepath"
"time"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
redis "gopkg.in/redis.v5"
)

// Builder creates a local storage (a persistent cache) for a topic
Expand Down Expand Up @@ -46,10 +44,3 @@ func MemoryBuilder() Builder {
return NewMemory(), nil
}
}

// RedisBuilder builds redis storage.
func RedisBuilder(client *redis.Client, retention time.Duration) Builder {
return func(topic string, partition int32) (Storage, error) {
return NewRedis(client, retention)
}
}
20 changes: 20 additions & 0 deletions storage/redis/builders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package redis

import (
"errors"
"fmt"

"github.com/lovoo/goka/storage"

redis "gopkg.in/redis.v5"
)

// RedisBuilder builds redis storage.
func RedisBuilder(client *redis.Client, namespace string) storage.Builder {
return func(topic string, partition int32) (storage.Storage, error) {
if namespace == "" {
return nil, errors.New("missing namespace to redis storage")
}
return New(client, fmt.Sprintf("%s:%s:%d", namespace, topic, partition))
}
}
56 changes: 32 additions & 24 deletions storage/redis.go → storage/redis/redis.go
Original file line number Diff line number Diff line change
@@ -1,49 +1,54 @@
package storage
package redis

import (
"errors"
"fmt"
"strconv"
"time"

"github.com/lovoo/goka/storage"

redis "gopkg.in/redis.v5"
)

const (
offsetKey = "__offset"
)

type redisStorage struct {
client *redis.Client
retention time.Duration
client *redis.Client
hash string
}

// NewRedis creates a new Storage backed by Redis.
func NewRedis(client *redis.Client, retention time.Duration) (Storage, error) {
// New creates a new Storage backed by Redis.
func New(client *redis.Client, hash string) (storage.Storage, error) {
if client == nil {
return nil, errors.New("invalid redis client")
}
if err := client.Ping().Err(); err != nil {
return nil, err
}
return &redisStorage{
client: client,
retention: retention,
client: client,
hash: hash,
}, nil
}

func (s *redisStorage) Has(key string) (bool, error) {
return s.client.Exists(key).Result()
return s.client.HExists(s.hash, key).Result()
}

func (s *redisStorage) Get(key string) ([]byte, error) {
has := s.client.Exists(key)
if err := has.Err(); err != nil {
has, err := s.client.HExists(s.hash, key).Result()
if err != nil {
return nil, fmt.Errorf("error checking for existence in redis (key %s): %v", key, err)
} else if !has.Val() {
} else if !has {
return nil, nil
}
resp := s.client.Get(key)
if err := resp.Err(); err != nil {
value, err := s.client.HGet(s.hash, key).Bytes()
if err != nil {
return nil, fmt.Errorf("error getting from redis (key %s): %v", key, err)
}
return resp.Bytes()
return value, nil
}

func (s *redisStorage) GetOffset(defValue int64) (int64, error) {
Expand All @@ -63,8 +68,8 @@ func (s *redisStorage) GetOffset(defValue int64) (int64, error) {
}

func (s *redisStorage) Set(key string, value []byte) error {
resp := s.client.Set(key, value, s.retention)
if err := resp.Err(); err != nil {
err := s.client.HSet(s.hash, key, value).Err()
if err != nil {
return fmt.Errorf("error setting to redis (key %s): %v", key, err)
}
return nil
Expand All @@ -75,38 +80,40 @@ func (s *redisStorage) SetOffset(offset int64) error {
}

func (s *redisStorage) Delete(key string) error {
return s.client.Del(key).Err()
return s.client.HDel(s.hash, key).Err()
}

func (s *redisStorage) Iterator() (Iterator, error) {
func (s *redisStorage) Iterator() (storage.Iterator, error) {
var current uint64
var keys []string
var err error

keys, current, err = s.client.Scan(current, "", 0).Result()
keys, current, err = s.client.HScan(s.hash, current, "", 0).Result()
if err != nil {
return nil, err
}
return &redisIterator{
current: current,
keys: keys,
client: s.client,
hash: s.hash,
}, nil
}

func (s *redisStorage) IteratorWithRange(start, limit []byte) (Iterator, error) {
func (s *redisStorage) IteratorWithRange(start, limit []byte) (storage.Iterator, error) {
var current uint64
var keys []string
var err error

keys, current, err = s.client.Scan(current, string(start), 0).Result()
keys, current, err = s.client.HScan(s.hash, current, string(start), 0).Result()
if err != nil {
return nil, err
}
return &redisIterator{
current: current,
keys: keys,
client: s.client,
hash: s.hash,
}, nil
}

Expand All @@ -123,13 +130,14 @@ func (s *redisStorage) Open() error {
}

func (s *redisStorage) Close() error {
return s.client.Close()
return nil
}

type redisIterator struct {
current uint64
keys []string
client *redis.Client
hash string
}

func (i *redisIterator) exhausted() bool {
Expand Down Expand Up @@ -157,7 +165,7 @@ func (i *redisIterator) Value() ([]byte, error) {
return nil, nil
}
key := i.keys[i.current]
return i.client.Get(key).Bytes()
return i.client.HGet(i.hash, key).Bytes()
}

func (i *redisIterator) Release() {
Expand Down

0 comments on commit be3a119

Please sign in to comment.