diff --git a/README.md b/README.md index d36fc98d..3b5a5887 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,8 @@ Goka relies on Kafka for message passing, fault-tolerant state storage and workl * **Views** are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface. +* **Local storage** keeps a local copy of the group table partitions to speed up recovery and reduce memory utilization. By default, the local storage uses [LevelDB](https://github.com/syndtr/goleveldb), but in-memory map and [Redis-based storage](tree/master/storage/redis) are also available. + ## Get Started diff --git a/examples/redis/README.md b/examples/redis/README.md new file mode 100644 index 00000000..7060c82c --- /dev/null +++ b/examples/redis/README.md @@ -0,0 +1,69 @@ +# Goka Example (Redis) + +Using [Redis](https://redis.io/) as a storage option. + +This example has an **Emitter** and one **Processor**, where the emitter +generates events with random keys (_user_id_) consumed by Kafka that uses +Redis as storage/cache. + + +## Usage + +It's easy to configure this example. + + 1. Start the Kafka setup defined in the [`docker-compose`](kafka.yml) file with the command below: + + ```console + docker-compose -f kafka.yml -p ci up -d + ``` + + 2. 
Check the status of the docker containers: + + ```console + docker ps + ``` + + Resulting, for example, in: + + ```console + CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES + 38a6efb145ba wurstmeister/kafka "start-kafka.sh" 9 seconds ago Up 4 seconds 0.0.0.0:9092->9092/tcp, 0.0.0.0:9997->9997/tcp kafka + 48df80931a1f confluent/zookeeper "/usr/local/bin/zk-d…" 10 seconds ago Up 5 seconds 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper + 040fbe9dfc13 redis:latest "docker-entrypoint.s…" 10 seconds ago Up 5 seconds 0.0.0.0:6379->6379/tcp redis + ``` + + This same configuration should be present in the `config.yaml` file, with kafka and redis like: + + ```yaml + kafka: + brokers: + - "127.0.0.1:9092" + group: "examples" + stream: "events" + redis: "127.0.0.1:6379" + namespace: "producer" + ``` + + Where: + * **brokers** : slice of kafka brokers hosts. + * **group** : group name to which this example belongs. + * **stream**: stream name to which this example belongs. + * **redis**: address of redis (`localhost:6379`). + * **namespace**: namespace that distinguishes applications writing to the same keys on Redis. + + 3. Fetch the go [redis](gopkg.in/redis.v5) client package: + + ```console + go get -u gopkg.in/redis.v5 + ``` + + 4. Build and run the example: + + ```console + go build . + ./redis -config config.yaml + ``` + + The events are produced and consumed by Kafka with random keys. It's possible to + run several instances of the same binary and check the behaviour of kafka + rebalancing and removing partitions without breaking. diff --git a/examples/redis/codec.go b/examples/redis/codec.go new file mode 100644 index 00000000..d20dab77 --- /dev/null +++ b/examples/redis/codec.go @@ -0,0 +1,18 @@ +package main + +import "encoding/json" + +type Codec struct{} + +// Encode encodes an event struct into an array. +func (c *Codec) Encode(value interface{}) ([]byte, error) { + return json.Marshal(value) +} + +// Decode decodes an event from a byte encoded array. 
+func (c *Codec) Decode(data []byte) (interface{}, error) { + event := new(Event) + + err := json.Unmarshal(data, event) + return event, err +} diff --git a/examples/redis/config.yaml b/examples/redis/config.yaml new file mode 100644 index 00000000..827b6e30 --- /dev/null +++ b/examples/redis/config.yaml @@ -0,0 +1,8 @@ + +kafka: + brokers: + - "127.0.0.1:9092" + group: "examples" + stream: "events" + redis: "127.0.0.1:6379" + namespace: "producer" diff --git a/examples/redis/consumer.go b/examples/redis/consumer.go new file mode 100644 index 00000000..16d3b4c4 --- /dev/null +++ b/examples/redis/consumer.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + + "github.com/lovoo/goka" + storage "github.com/lovoo/goka/storage/redis" + + redis "gopkg.in/redis.v5" +) + +// Publisher defines an interface to Publish the event somewhere. +type Publisher interface { + Publish(ctx context.Context, key string, event *Event) error + Close() error +} + +// Consume starts goka events consumer. +func Consume(pub Publisher, brokers []string, group string, stream string, store string, namespace string) error { + codec := new(Codec) + + input := goka.Input(goka.Stream(stream), codec, func(ctx goka.Context, msg interface{}) { + event, ok := msg.(*Event) + if ok { + pub.Publish(context.Background(), ctx.Key(), event) + } + }) + graph := goka.DefineGroup(goka.Group(group), input, goka.Persist(codec)) + + opts := []goka.ProcessorOption{} + switch { + case store != "": + client := redis.NewClient(&redis.Options{ + Addr: store, + }) + opts = append(opts, goka.WithStorageBuilder(storage.RedisBuilder(client, namespace))) + defer client.Close() + } + processor, err := goka.NewProcessor(brokers, graph, opts...) 
+ if err != nil { + return err + } + defer processor.Stop() + + return processor.Start() +} diff --git a/examples/redis/event.go b/examples/redis/event.go new file mode 100644 index 00000000..6e47d0b1 --- /dev/null +++ b/examples/redis/event.go @@ -0,0 +1,6 @@ +package main + +type Event struct { + UserID string `json:"user_id"` + Timestamp int64 `json:"timestamp"` +} diff --git a/examples/redis/kafka.yml b/examples/redis/kafka.yml new file mode 100644 index 00000000..94738833 --- /dev/null +++ b/examples/redis/kafka.yml @@ -0,0 +1,39 @@ +version: '2' +services: + +# zookeeper ========================== + zookeeper: + container_name: zookeeper + image: confluent/zookeeper + ports: + - "2181:2181" + +# kafka ============================== + kafka: + container_name: kafka + image: wurstmeister/kafka + volumes: + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - zookeeper + - redis + ports: + - "9092:9092" + - "9997:9997" + environment: + KAFKA_PORT: 9092 + KAFKA_ADVERTISED_PORT: 9092 + KAFKA_ADVERTISED_HOST_NAME: kafka + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_NUM_PARTITIONS: 10 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_REPLICATION_FACTOR: 1 + +# redis ======================================= + redis: + container_name: redis + image: redis:latest + ports: + - "6379:6379" + expose: + - "6379" diff --git a/examples/redis/main.go b/examples/redis/main.go new file mode 100644 index 00000000..5a218c55 --- /dev/null +++ b/examples/redis/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "context" + "flag" + "fmt" + "io/ioutil" + "log" + "math/rand" + "strconv" + "time" + + yaml "gopkg.in/yaml.v2" +) + +type Config struct { + Kafka struct { + Brokers []string `yaml:"brokers"` + Group string `yaml:"group"` + Stream string `yaml:"stream"` + Redis string `yaml:"redis"` + Namespace string `yaml:"namespace"` + } `yaml:"kafka"` +} + +var ( + filename = flag.String("config", "config.yaml", "path to config file") +) + +func main() { + flag.Parse() + + conf, 
err := readConfig(*filename) + if err != nil { + log.Fatal(err) + } + + // consuming + go func() { + err := Consume(new(nopPublisher), conf.Kafka.Brokers, conf.Kafka.Group, + conf.Kafka.Stream, conf.Kafka.Redis, conf.Kafka.Namespace) + if err != nil { + log.Fatal(err) + } + }() + + // producing + producer, err := NewProducer(conf.Kafka.Brokers, conf.Kafka.Stream) + for { + event := &Event{ + UserID: strconv.FormatInt(rand.Int63n(255), 10), + Timestamp: time.Now().UnixNano(), + } + fmt.Printf("emit ->key: `%v` ->event: `%v`\n", event.UserID, event) + err = producer.Emit(event.UserID, event) + if err != nil { + log.Fatal(err) + } + time.Sleep(5 * time.Second) + } +} + +func readConfig(filename string) (*Config, error) { + b, err := ioutil.ReadFile(filename) + if err != nil { + return nil, err + } + conf := new(Config) + err = yaml.Unmarshal(b, conf) + return conf, err +} + +type nopPublisher struct{} + +func (p *nopPublisher) Publish(ctx context.Context, key string, event *Event) error { + fmt.Printf("published ->key: `%v` ->event: `%v`\n", key, event) + return nil +} + +func (p *nopPublisher) Close() error { + return nil +} diff --git a/examples/redis/producer.go b/examples/redis/producer.go new file mode 100644 index 00000000..a8fa2dac --- /dev/null +++ b/examples/redis/producer.go @@ -0,0 +1,32 @@ +package main + +import "github.com/lovoo/goka" + +// Producer defines an interface whose events are produced on kafka. +type Producer interface { + Emit(key string, event *Event) error + Close() error +} + +type kafkaProducer struct { + emitter *goka.Emitter +} + +// NewProducer returns a new kafka producer. 
+func NewProducer(brokers []string, stream string) (Producer, error) { + codec := new(Codec) + emitter, err := goka.NewEmitter(brokers, goka.Stream(stream), codec) + if err != nil { + return nil, err + } + return &kafkaProducer{emitter}, nil +} + +func (p *kafkaProducer) Emit(key string, event *Event) error { + return p.emitter.EmitSync(key, event) +} + +func (p *kafkaProducer) Close() error { + p.emitter.Finish() + return nil +} diff --git a/storage/append.go b/storage/append.go index d5ae23a1..f76ffd21 100644 --- a/storage/append.go +++ b/storage/append.go @@ -14,6 +14,7 @@ type file struct { bytesWritten int64 } +// NewFile returns a new on-disk storage. func NewFile(path string, part int32) (Storage, error) { if err := os.MkdirAll(path, os.ModePerm); err != nil { return nil, fmt.Errorf("error creating storage directory: %v", err) diff --git a/storage/redis/builders.go b/storage/redis/builders.go new file mode 100644 index 00000000..c640bc6a --- /dev/null +++ b/storage/redis/builders.go @@ -0,0 +1,20 @@ +package redis + +import ( + "errors" + "fmt" + + "github.com/lovoo/goka/storage" + + redis "gopkg.in/redis.v5" +) + +// RedisBuilder builds redis storage. +func RedisBuilder(client *redis.Client, namespace string) storage.Builder { + return func(topic string, partition int32) (storage.Storage, error) { + if namespace == "" { + return nil, errors.New("missing namespace to redis storage") + } + return New(client, fmt.Sprintf("%s:%s:%d", namespace, topic, partition)) + } +} diff --git a/storage/redis/redis.go b/storage/redis/redis.go new file mode 100644 index 00000000..797e8358 --- /dev/null +++ b/storage/redis/redis.go @@ -0,0 +1,177 @@ +package redis + +import ( + "errors" + "fmt" + "strconv" + + "github.com/lovoo/goka/storage" + + redis "gopkg.in/redis.v5" +) + +const ( + offsetKey = "__offset" +) + +type redisStorage struct { + client *redis.Client + hash string +} + +// New creates a new Storage backed by Redis. 
+func New(client *redis.Client, hash string) (storage.Storage, error) { + if client == nil { + return nil, errors.New("invalid redis client") + } + if err := client.Ping().Err(); err != nil { + return nil, err + } + return &redisStorage{ + client: client, + hash: hash, + }, nil +} + +func (s *redisStorage) Has(key string) (bool, error) { + return s.client.HExists(s.hash, key).Result() +} + +func (s *redisStorage) Get(key string) ([]byte, error) { + has, err := s.client.HExists(s.hash, key).Result() + if err != nil { + return nil, fmt.Errorf("error checking for existence in redis (key %s): %v", key, err) + } else if !has { + return nil, nil + } + value, err := s.client.HGet(s.hash, key).Bytes() + if err != nil { + return nil, fmt.Errorf("error getting from redis (key %s): %v", key, err) + } + return value, nil +} + +func (s *redisStorage) GetOffset(defValue int64) (int64, error) { + data, err := s.Get(offsetKey) + if err != nil { + return 0, err + } + if data == nil { + return defValue, nil + } + + value, err := strconv.ParseInt(string(data), 10, 64) + if err != nil { + return 0, fmt.Errorf("error decoding redis offset (%s): %v", string(data), err) + } + return value, nil +} + +func (s *redisStorage) Set(key string, value []byte) error { + err := s.client.HSet(s.hash, key, value).Err() + if err != nil { + return fmt.Errorf("error setting to redis (key %s): %v", key, err) + } + return nil +} + +func (s *redisStorage) SetOffset(offset int64) error { + return s.Set(offsetKey, []byte(strconv.FormatInt(offset, 10))) +} + +func (s *redisStorage) Delete(key string) error { + return s.client.HDel(s.hash, key).Err() +} + +func (s *redisStorage) Iterator() (storage.Iterator, error) { + var current uint64 + var keys []string + var err error + + keys, current, err = s.client.HScan(s.hash, current, "", 0).Result() + if err != nil { + return nil, err + } + return &redisIterator{ + current: current, + keys: keys, + client: s.client, + hash: s.hash, + }, nil +} + +func (s 
*redisStorage) IteratorWithRange(start, limit []byte) (storage.Iterator, error) { + var current uint64 + var keys []string + var err error + + keys, current, err = s.client.HScan(s.hash, current, string(start), 0).Result() + if err != nil { + return nil, err + } + return &redisIterator{ + current: current, + keys: keys, + client: s.client, + hash: s.hash, + }, nil +} + +func (s *redisStorage) Recovered() bool { + return false +} + +func (s *redisStorage) MarkRecovered() error { + return nil +} + +func (s *redisStorage) Open() error { + return nil +} + +func (s *redisStorage) Close() error { + return nil +} + +type redisIterator struct { + current uint64 + keys []string + client *redis.Client + hash string +} + +func (i *redisIterator) exhausted() bool { + return uint64(len(i.keys)) <= i.current +} + +func (i *redisIterator) Next() bool { + i.current++ + if string(i.Key()) == offsetKey { + i.current++ + } + return !i.exhausted() +} + +func (i *redisIterator) Key() []byte { + if i.exhausted() { + return nil + } + key := i.keys[i.current] + return []byte(key) +} + +func (i *redisIterator) Value() ([]byte, error) { + if i.exhausted() { + return nil, nil + } + key := i.keys[i.current] + return i.client.HGet(i.hash, key).Bytes() +} + +func (i *redisIterator) Release() { + i.current = uint64(len(i.keys)) +} + +func (i *redisIterator) Seek(key []byte) bool { + return !i.exhausted() +}