Skip to content

Commit

Permalink
Merge pull request #200 from lovoo/protect_storage_writes
Browse files Browse the repository at this point in the history
protect storage writes
  • Loading branch information
Benjamin Riedel committed Sep 4, 2019
2 parents 55b1948 + c42bfe8 commit 80207ca
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
11 changes: 11 additions & 0 deletions storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"strings"
"sync"

"github.com/syndtr/goleveldb/leveldb/util"
)
Expand Down Expand Up @@ -70,6 +71,8 @@ type memory struct {
storage map[string][]byte
offset *int64
recovered bool
// mutex to protect map reads/writes
rwm sync.RWMutex
}

// NewMemory returns a new in-memory storage.
Expand All @@ -81,25 +84,33 @@ func NewMemory() Storage {
}

func (m *memory) Has(key string) (bool, error) {
m.rwm.RLock()
_, has := m.storage[key]
m.rwm.RUnlock()
return has, nil
}

func (m *memory) Get(key string) ([]byte, error) {
m.rwm.RLock()
value, _ := m.storage[key]
m.rwm.RUnlock()
return value, nil
}

func (m *memory) Set(key string, value []byte) error {
if value == nil {
return fmt.Errorf("cannot write nil value")
}
m.rwm.Lock()
m.storage[key] = value
m.rwm.Unlock()
return nil
}

func (m *memory) Delete(key string) error {
m.rwm.Lock()
delete(m.storage, key)
m.rwm.Unlock()
return nil
}

Expand Down
12 changes: 12 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Tester struct {
codecs map[string]goka.Codec
topicQueues map[string]*queue
mQueues sync.RWMutex
mStorages sync.RWMutex

queuedMessages []*queuedMessage
}
Expand Down Expand Up @@ -231,11 +232,16 @@ func (km *Tester) EmitterProducerBuilder() kafka.ProducerBuilder {
// to a processor
func (km *Tester) StorageBuilder() storage.Builder {
return func(topic string, partition int32) (storage.Storage, error) {
km.mStorages.RLock()
if st, exists := km.storages[topic]; exists {
km.mStorages.RUnlock()
return st, nil
}
km.mStorages.RUnlock()
st := storage.NewMemory()
km.mStorages.Lock()
km.storages[topic] = st
km.mStorages.Unlock()
return st, nil
}
}
Expand Down Expand Up @@ -323,7 +329,9 @@ func (km *Tester) TableValue(table goka.Table, key string) interface{} {
km.waitStartup()

topic := string(table)
km.mStorages.RLock()
st, exists := km.storages[topic]
km.mStorages.RUnlock()
if !exists {
panic(fmt.Errorf("topic %s does not exist", topic))
}
Expand All @@ -348,7 +356,9 @@ func (km *Tester) SetTableValue(table goka.Table, key string, value interface{})
logger.Printf("setting value is not implemented yet.")

topic := string(table)
km.mStorages.RLock()
st, exists := km.storages[topic]
km.mStorages.RUnlock()
if !exists {
panic(fmt.Errorf("storage for topic %s does not exist", topic))
}
Expand All @@ -370,13 +380,15 @@ func (km *Tester) ReplaceEmitHandler(emitter EmitHandler) {

// ClearValues resets all table values
func (km *Tester) ClearValues() {
km.mStorages.Lock()
for topic, st := range km.storages {
logger.Printf("clearing all values from storage for topic %s", topic)
it, _ := st.Iterator()
for it.Next() {
st.Delete(string(it.Key()))
}
}
km.mStorages.Unlock()
}

type topicMgrMock struct {
Expand Down

0 comments on commit 80207ca

Please sign in to comment.