Skip to content

Commit

Permalink
Evict method for Views
Browse files Browse the repository at this point in the history
* Adds an Evict method for Views. Evict clears the key from the
  local cache.
  • Loading branch information
SamiHiltunen committed Oct 4, 2017
1 parent 872d4df commit e7e6a0f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
32 changes: 32 additions & 0 deletions mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,43 @@ package goka

import (
"fmt"
"hash"
"log"

"github.com/Shopify/sarama"
)

// constHasher implements a hasher that will always return the specified
// partition. Doesn't properly implement the Hash32 interface, use only in
// tests.
type constHasher struct {
partition int32
}

func (ch *constHasher) Sum(b []byte) []byte {
return nil
}

func (ch *constHasher) Sum32() uint32 {
return 0
}

func (ch *constHasher) BlockSize() int {
return 0
}

func (ch *constHasher) Reset() {}

func (ch *constHasher) Size() int { return 4 }

func (ch *constHasher) Write(p []byte) (n int, err error) {
return len(p), nil
}

func NewConstHasher(part int32) hash.Hash32 {
return &constHasher{partition: part}
}

type clientMock struct {
topics []string
partitions []int32
Expand Down
11 changes: 11 additions & 0 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,17 @@ func (v *View) Iterator() (storage.Iterator, error) {
return storage.NewMultiIterator(iters), nil
}

// Evict removes the given key only from the local cache. In order to delete a
// key from Kafka and other Views, context.Delete should be used on a Processor.
func (v *View) Evict(key string) error {
s, err := v.find(key)
if err != nil {
return err
}

return s.Delete(key)
}

func (v *View) run() {
defer close(v.done)
v.opts.log.Printf("View: started")
Expand Down
32 changes: 32 additions & 0 deletions view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goka

import (
"errors"
"hash"
"testing"
"time"

Expand Down Expand Up @@ -283,6 +284,37 @@ func TestNewView(t *testing.T) {
ensure.True(t, len(v.partitions) == 3)
}

func TestView_Evict(t *testing.T) {
key := "some-key"
val := "some-val"

st := storage.NewMemory(new(codec.String))
err := st.Set(key, val)
ensure.Nil(t, err)

v := &View{
partitions: []*partition{
{st: &storageProxy{partition: 0, Storage: st}},
},
opts: &voptions{
hasher: func() hash.Hash32 {
return NewConstHasher(0)
},
},
}

vinf, err := v.Get(key)
ensure.Nil(t, err)
ensure.DeepEqual(t, vinf, val)

err = v.Evict(key)
ensure.Nil(t, err)

vinf, err = v.Get(key)
ensure.Nil(t, err)
ensure.Nil(t, vinf)
}

func doTimed(t *testing.T, do func()) error {
ch := make(chan bool)
go func() {
Expand Down

0 comments on commit e7e6a0f

Please sign in to comment.