diff --git a/mocks.go b/mocks.go index 35306142..bd33978f 100644 --- a/mocks.go +++ b/mocks.go @@ -2,11 +2,43 @@ package goka import ( "fmt" + "hash" "log" "github.com/Shopify/sarama" ) +// constHasher implements a hasher that will always return the specified +// partition. Doesn't properly implement the Hash32 interface, use only in +// tests. +type constHasher struct { + partition uint32 +} + +func (ch *constHasher) Sum(b []byte) []byte { + return nil +} + +func (ch *constHasher) Sum32() uint32 { + return ch.partition +} + +func (ch *constHasher) BlockSize() int { + return 0 +} + +func (ch *constHasher) Reset() {} + +func (ch *constHasher) Size() int { return 4 } + +func (ch *constHasher) Write(p []byte) (n int, err error) { + return len(p), nil +} + +func NewConstHasher(part uint32) hash.Hash32 { + return &constHasher{partition: part} +} + type clientMock struct { topics []string partitions []int32 diff --git a/view.go b/view.go index abd39ea1..2cb87597 100644 --- a/view.go +++ b/view.go @@ -265,6 +265,17 @@ func (v *View) Iterator() (storage.Iterator, error) { return storage.NewMultiIterator(iters), nil } +// Evict removes the given key only from the local cache. In order to delete a +// key from Kafka and other Views, context.Delete should be used on a Processor. +func (v *View) Evict(key string) error { + s, err := v.find(key) + if err != nil { + return err + } + + return s.Delete(key) +} + func (v *View) run() { defer close(v.done) v.opts.log.Printf("View: started") diff --git a/view_test.go b/view_test.go index 04ecb50e..2677630c 100644 --- a/view_test.go +++ b/view_test.go @@ -2,6 +2,7 @@ package goka import ( "errors" + "hash" "testing" "time" @@ -283,6 +284,37 @@ func TestNewView(t *testing.T) { ensure.True(t, len(v.partitions) == 3) } +func TestView_Evict(t *testing.T) { + key := "some-key" + val := "some-val" + + st := storage.NewMemory(new(codec.String)) + err := st.Set(key, val) + ensure.Nil(t, err) + + v := &View{ + partitions: []*partition{ + {st: &storageProxy{partition: 0, Storage: st}}, + }, + opts: &voptions{ + hasher: func() hash.Hash32 { + return NewConstHasher(0) + }, + }, + } + + vinf, err := v.Get(key) + ensure.Nil(t, err) + ensure.DeepEqual(t, vinf, val) + + err = v.Evict(key) + ensure.Nil(t, err) + + vinf, err = v.Get(key) + ensure.Nil(t, err) + ensure.Nil(t, vinf) +} + func doTimed(t *testing.T, do func()) error { ch := make(chan bool) go func() {