Skip to content

Commit

Permalink
Merge pull request #38 from lovoo/context-delete
Browse files Browse the repository at this point in the history
deletion semantics
  • Loading branch information
db7 committed Sep 29, 2017
2 parents eea4c59 + 6bf0ab9 commit f475664
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 7 deletions.
40 changes: 37 additions & 3 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type Context interface {
// SetValue updates the value of the key in the group table.
SetValue(value interface{})

// Delete deletes a value from the group table. IMPORTANT: this deletes the
// value associated with the key from both the local cache and the persisted
// table in Kafka.
Delete()

// Timestamp returns the timestamp of the input message. If the timestamp is
// invalid, a zero time will be returned.
Timestamp() time.Time
Expand Down Expand Up @@ -90,9 +95,14 @@ func (ctx *context) Emit(topic Stream, key string, value interface{}) {
if c == nil {
ctx.Fail(fmt.Errorf("no codec for topic %s", topic))
}
data, err := c.Encode(value)
if err != nil {
ctx.Fail(fmt.Errorf("error encoding message for topic %s: %v", topic, err))

var data []byte
if value != nil {
var err error
data, err = c.Encode(value)
if err != nil {
ctx.Fail(fmt.Errorf("error encoding message for topic %s: %v", topic, err))
}
}

ctx.emit(string(topic), key, data)
Expand Down Expand Up @@ -123,6 +133,12 @@ func (ctx *context) emit(topic string, key string, value []byte) {
})
}

func (ctx *context) Delete() {
if err := ctx.deleteKey(ctx.Key()); err != nil {
ctx.Fail(err)
}
}

// Value returns the value of the key in the group table.
func (ctx *context) Value() interface{} {
val, err := ctx.valueForKey(string(ctx.msg.Key))
Expand Down Expand Up @@ -204,6 +220,24 @@ func (ctx *context) valueForKey(key string) (interface{}, error) {
return ctx.storage.Get(key)
}

func (ctx *context) deleteKey(key string) error {
if ctx.graph.GroupTable() == nil {
return fmt.Errorf("Cannot access state in stateless processor")
}

ctx.counters.stores++
if err := ctx.storage.Delete(key); err != nil {
return fmt.Errorf("error deleting key (%s) from storage: %v", key, err)
}

ctx.counters.emits++
ctx.emitter(ctx.graph.GroupTable().Topic(), key, nil).Then(func(err error) {
ctx.emitDone(err)
})

return nil
}

// setValueForKey sets a value for a key in the processor state.
func (ctx *context) setValueForKey(key string, value interface{}) error {
if ctx.graph.GroupTable() == nil {
Expand Down
79 changes: 79 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goka
import (
"errors"
"fmt"
"regexp"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -171,6 +172,84 @@ func TestContext_GetSetStateless(t *testing.T) {
}()
}

func TestContext_Delete(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
storage := mock.NewMockStorage(ctrl)

offset := int64(123)
ack := 0
key := "key"

ctx := &context{
graph: DefineGroup(group, Persist(new(codec.String))),
storage: storage,
wg: new(sync.WaitGroup),
commit: func() { ack++ },
msg: &message{Offset: offset},
}

gomock.InOrder(
storage.EXPECT().Delete(key),
)
ctx.emitter = newEmitter(nil, nil)

ctx.start()
err := ctx.deleteKey(key)
ensure.Nil(t, err)
ctx.finish()

ctx.wg.Wait()

ensure.DeepEqual(t, ctx.counters, struct {
emits int
dones int
stores int
}{1, 1, 1})
}

func TestContext_DeleteStateless(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

offset := int64(123)
key := "key"

ctx := &context{
graph: DefineGroup(group),
wg: new(sync.WaitGroup),
msg: &message{Offset: offset},
}
ctx.emitter = newEmitter(nil, nil)

err := ctx.deleteKey(key)
ensure.Err(t, err, regexp.MustCompile("^Cannot access state in stateless processor$"))
}

func TestContext_DeleteStorageError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
storage := mock.NewMockStorage(ctrl)

offset := int64(123)
key := "key"

ctx := &context{
graph: DefineGroup(group, Persist(new(codec.String))),
storage: storage,
wg: new(sync.WaitGroup),
msg: &message{Offset: offset},
}

gomock.InOrder(
storage.EXPECT().Delete(key).Return(fmt.Errorf("storage error")),
)
ctx.emitter = newEmitter(nil, nil)

err := ctx.deleteKey(key)
ensure.Err(t, err, regexp.MustCompile("^error deleting key \\(key\\) from storage: storage error$"))
}

func TestContext_Set(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
11 changes: 10 additions & 1 deletion examples/testing/context_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
package main

import (
time "time"

gomock "github.com/golang/mock/gomock"
goka "github.com/lovoo/goka"
time "time"
)

// Mock of Context interface
Expand All @@ -30,6 +31,14 @@ func (_m *MockContext) EXPECT() *_MockContextRecorder {
return _m.recorder
}

func (_m *MockContext) Delete() {
_m.ctrl.Call(_m, "Delete")
}

func (_mr *_MockContextRecorder) Delete() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Delete")
}

func (_m *MockContext) Emit(_param0 goka.Stream, _param1 string, _param2 interface{}) {
_m.ctrl.Call(_m, "Emit", _param0, _param1, _param2)
}
Expand Down
4 changes: 4 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func DefaultViewStoragePath() string {
// DefaultUpdate can be used in the function passed to WithUpdateCallback and
// WithViewCallback.
func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error {
if value == nil {
s.Delete(key)
}

return s.SetEncoded(key, value)
}

Expand Down
12 changes: 9 additions & 3 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
final = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessor(t, ctrl, consumer, 3, sb)
value = []byte("value")
)

// -- expectations --
Expand All @@ -586,7 +587,7 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
consumer.EXPECT().AddPartition(tableName(group), int32(2), int64(123))
// 3. message
gomock.InOrder(
st.EXPECT().SetEncoded("key", nil).Return(nil),
st.EXPECT().SetEncoded("key", value).Return(nil),
st.EXPECT().SetOffset(int64(1)),
st.EXPECT().MarkRecovered(),
st.EXPECT().Sync(),
Expand Down Expand Up @@ -621,6 +622,7 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
Partition: 1,
Offset: 1,
Key: "key",
Value: value,
}
err = syncWith(t, ch, 1) // with partition
ensure.Nil(t, err)
Expand Down Expand Up @@ -650,6 +652,7 @@ func TestProcessor_Start(t *testing.T) {
final = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessor(t, ctrl, consumer, 3, sb)
value = []byte("value")
)

// -- expectations --
Expand All @@ -663,7 +666,7 @@ func TestProcessor_Start(t *testing.T) {
consumer.EXPECT().AddPartition(tableName(group), int32(1), int64(123))
consumer.EXPECT().AddPartition(tableName(group), int32(2), int64(123))
// 3. load message partition 1
st.EXPECT().SetEncoded("key", nil).Return(nil)
st.EXPECT().SetEncoded("key", value).Return(nil)
st.EXPECT().SetOffset(int64(1))
st.EXPECT().Sync()
st.EXPECT().MarkRecovered()
Expand Down Expand Up @@ -708,6 +711,7 @@ func TestProcessor_Start(t *testing.T) {
Partition: 1,
Offset: 1,
Key: "key",
Value: value,
}
err = syncWith(t, ch, 1) // with partition 1
ensure.Nil(t, err)
Expand Down Expand Up @@ -802,6 +806,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
ch = make(chan kafka.Event)
p = createProcessorWithTable(ctrl, consumer, 3, sb)
producer = p.producer.(*mock.MockProducer)
value = []byte("value")
)

// -- expectations --
Expand All @@ -818,7 +823,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
consumer.EXPECT().AddPartition(table, int32(1), int64(123))
consumer.EXPECT().AddPartition(table, int32(2), int64(123))
// 3. message to group table
st.EXPECT().SetEncoded("key", nil).Return(nil)
st.EXPECT().SetEncoded("key", value).Return(nil)
st.EXPECT().SetOffset(int64(1))
st.EXPECT().MarkRecovered()
st.EXPECT().Sync()
Expand Down Expand Up @@ -867,6 +872,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
Partition: 1,
Offset: 1,
Key: "key",
Value: value,
}
err = syncWith(t, ch, 1)
ensure.Nil(t, err)
Expand Down
96 changes: 96 additions & 0 deletions storage/append.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package storage

import (
"fmt"
"io"
"os"
"path/filepath"
)

type file struct {
file io.WriteCloser
codec Codec
recovered bool

bytesWritten int64
}

func NewFile(path string, part int32, codec Codec) (Storage, error) {
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, fmt.Errorf("error creating storage directory: %v", err)
}

f, err := os.OpenFile(filepath.Join(path, fmt.Sprintf("part-%d", part)), os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModePerm)
if err != nil {
return nil, err
}

return &file{file: f}, nil
}

func (f *file) Recovered() bool {
return f.recovered
}

func (f *file) MarkRecovered() error {
f.recovered = true
return nil
}

func (f *file) Has(key string) (bool, error) {
return false, nil
}

func (f *file) Get(key string) (interface{}, error) {
return nil, nil
}

func (f *file) Set(key string, val interface{}) error {
data, err := f.codec.Encode(val)
if err != nil {
return err
}

return f.SetEncoded(key, data)
}

func (f *file) SetEncoded(key string, val []byte) error {
num, err := f.file.Write(val)
if err != nil {
return err
}

f.bytesWritten += int64(num)

if _, err := f.file.Write([]byte("\n")); err != nil {
return err
}

return nil
}

func (f *file) Delete(string) error {
return nil
}

func (f *file) GetOffset(def int64) (int64, error) {
return def, nil
}

func (f *file) SetOffset(val int64) error {
return nil
}

func (f *file) Iterator() (Iterator, error) {
return new(NullIter), nil
}

func (f *file) Open() error {
return nil
}

func (f *file) Close() error {
return f.file.Close()
}

func (f *file) Sync() {}

0 comments on commit f475664

Please sign in to comment.