Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snapshot and batching removal, recovery transactions #31

Merged
merged 1 commit into from
Aug 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafkamock.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Tester interface {
// NewKafkaMock returns a new testprocessor mocking every external service
func NewKafkaMock(t Tester, groupName Group) *KafkaMock {
kafkaMock := &KafkaMock{
storage: storage.NewMock(new(codec.Bytes)),
storage: storage.NewMemory(new(codec.Bytes)),
t: t,
incomingEvents: make(chan kafka.Event),
consumerEvents: make(chan kafka.Event),
Expand Down
10 changes: 10 additions & 0 deletions mock/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ func (_mr *_MockStorageRecorder) Iterator() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Iterator")
}

func (_m *MockStorage) MarkRecovered() error {
ret := _m.ctrl.Call(_m, "MarkRecovered")
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockStorageRecorder) MarkRecovered() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "MarkRecovered")
}

func (_m *MockStorage) Open() error {
ret := _m.ctrl.Call(_m, "Open")
ret0, _ := ret[0].(error)
Expand Down
112 changes: 40 additions & 72 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package goka
import (
"fmt"
"path/filepath"
"time"

"github.com/lovoo/goka/kafka"
"github.com/lovoo/goka/logger"
"github.com/lovoo/goka/storage"

metrics "github.com/rcrowley/go-metrics"
"github.com/syndtr/goleveldb/leveldb"
)

// UpdateCallback is invoked upon arrival of a message for a table partition.
Expand All @@ -25,9 +25,22 @@ type StorageBuilder func(topic string, partition int32, codec Codec, reg metrics
///////////////////////////////////////////////////////////////////////////////

const (
defaultBaseStoragePath = "/tmp/goka"

defaultClientID = "goka"
)

// DefaultProcessorStoragePath is the default path where processor state
// will be stored.
func DefaultProcessorStoragePath(group Group) string {
return filepath.Join(defaultBaseStoragePath, "processor", string(group))
}

// DefaultViewStoragePath returns the default path where view state will be stored.
func DefaultViewStoragePath() string {
return filepath.Join(defaultBaseStoragePath, "view")
}

// DefaultUpdate is the default callback used to update the local storage with
// from the table topic in Kafka. It is called for every message received
// during recovery of processors and during the normal operation of views.
Expand All @@ -37,6 +50,20 @@ func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte)
return s.SetEncoded(key, value)
}

// DefaultStorageBuilder builds a LevelDB storage with default configuration.
// The database will be stored in the given path.
func DefaultStorageBuilder(path string) StorageBuilder {
return func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
fp := filepath.Join(path, fmt.Sprintf("%s.%d", topic, partition))
db, err := leveldb.OpenFile(fp, nil)
if err != nil {
return nil, fmt.Errorf("error opening leveldb: %v", err)
}

return storage.New(db, codec)
}
}

type consumerBuilder func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error)
type producerBuilder func(brokers []string, registry metrics.Registry) (kafka.Producer, error)
type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error)
Expand Down Expand Up @@ -67,11 +94,9 @@ type poptions struct {
log logger.Logger
clientID string

updateCallback UpdateCallback
storagePath string
storageSnapshotInterval time.Duration
registry metrics.Registry
partitionChannelSize int
updateCallback UpdateCallback
registry metrics.Registry
partitionChannelSize int

builders struct {
storage StorageBuilder
Expand Down Expand Up @@ -141,13 +166,6 @@ func WithProducer(p kafka.Producer) ProcessorOption {
}
}

// WithStoragePath defines the base path for the local storage on disk
func WithStoragePath(storagePath string) ProcessorOption {
return func(o *poptions) {
o.storagePath = storagePath
}
}

// WithKafkaMetrics sets a go-metrics registry to collect
// kafka metrics.
// The metric-points are https://godoc.org/github.com/Shopify/sarama
Expand All @@ -166,15 +184,6 @@ func WithPartitionChannelSize(size int) ProcessorOption {
}
}

// WithStorageSnapshotInterval sets the interval in which the storage will snapshot to disk (if it is supported by the storage at all)
// Greater interval -> less writes to disk, more memory usage
// Smaller interval -> more writes to disk, less memory usage
func WithStorageSnapshotInterval(interval time.Duration) ProcessorOption {
return func(o *poptions) {
o.storageSnapshotInterval = interval
}
}

// WithLogger sets the logger the processor should use. By default, processors
// use the standard library logger.
func WithLogger(log logger.Logger) ProcessorOption {
Expand All @@ -199,9 +208,9 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
o(opt)
}

// config not set, use default one
// StorageBuilder should always be set as a default option in NewProcessor
if opt.builders.storage == nil {
opt.builders.storage = opt.defaultStorageBuilder
return fmt.Errorf("StorageBuilder not set")
}
if opt.builders.consumer == nil {
opt.builders.consumer = defaultConsumerBuilder
Expand All @@ -212,9 +221,6 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
if opt.builders.topicmgr == nil {
opt.builders.topicmgr = defaultTopicManagerBuilder
}
if opt.storageSnapshotInterval == 0 {
opt.storageSnapshotInterval = storage.DefaultStorageSnapshotInterval
}

// prefix registry
opt.gokaRegistry = metrics.NewPrefixedChildRegistry(opt.registry, fmt.Sprintf("goka.processor-%s.", group))
Expand All @@ -227,14 +233,6 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
return nil
}

func (opt *poptions) storagePathForPartition(topic string, partitionID int32) string {
return filepath.Join(opt.storagePath, "processor", fmt.Sprintf("%s.%d", topic, partitionID))
}

func (opt *poptions) defaultStorageBuilder(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
return storage.New(opt.log, opt.storagePathForPartition(topic, partition), codec, reg, opt.storageSnapshotInterval)
}

///////////////////////////////////////////////////////////////////////////////
// view options
///////////////////////////////////////////////////////////////////////////////
Expand All @@ -243,13 +241,11 @@ func (opt *poptions) defaultStorageBuilder(topic string, partition int32, codec
type ViewOption func(*voptions)

type voptions struct {
log logger.Logger
tableCodec Codec
updateCallback UpdateCallback
storagePath string
storageSnapshotInterval time.Duration
registry metrics.Registry
partitionChannelSize int
log logger.Logger
tableCodec Codec
updateCallback UpdateCallback
registry metrics.Registry
partitionChannelSize int

builders struct {
storage StorageBuilder
Expand Down Expand Up @@ -316,22 +312,6 @@ func WithViewTopicManager(tm kafka.TopicManager) ViewOption {
}
}

// WithViewStoragePath defines the base path for the local storage on disk
func WithViewStoragePath(storagePath string) ViewOption {
return func(o *voptions) {
o.storagePath = storagePath
}
}

// WithViewStorageSnapshotInterval sets the interval in which the storage will snapshot to disk (if it is supported by the storage at all)
// Greater interval -> less writes to disk, more memory usage
// Smaller interval -> more writes to disk, less memory usage
func WithViewStorageSnapshotInterval(interval time.Duration) ViewOption {
return func(o *voptions) {
o.storageSnapshotInterval = interval
}
}

// WithViewKafkaMetrics sets a go-metrics registry to collect
// kafka metrics.
// The metric-points are https://godoc.org/github.com/Shopify/sarama
Expand All @@ -355,9 +335,9 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
o(opt)
}

// config not set, use default one
// StorageBuilder should always be set as a default option in NewView
if opt.builders.storage == nil {
opt.builders.storage = opt.defaultStorageBuilder
return fmt.Errorf("StorageBuilder not set")
}
if opt.builders.consumer == nil {
opt.builders.consumer = defaultConsumerBuilder
Expand All @@ -374,21 +354,9 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
// prefix registry
opt.gokaRegistry = metrics.NewPrefixedChildRegistry(opt.registry, fmt.Sprintf("goka.view-%s.", topic))

if opt.storageSnapshotInterval == 0 {
opt.storageSnapshotInterval = storage.DefaultStorageSnapshotInterval
}

return nil
}

func (opt *voptions) storagePathForPartition(topic string, partitionID int32) string {
return filepath.Join(opt.storagePath, "view", fmt.Sprintf("%s.%d", topic, partitionID))
}

func (opt *voptions) defaultStorageBuilder(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
return storage.New(opt.log, opt.storagePathForPartition(topic, partition), codec, reg, opt.storageSnapshotInterval)
}

///////////////////////////////////////////////////////////////////////////////
// emitter options
///////////////////////////////////////////////////////////////////////////////
Expand Down
14 changes: 5 additions & 9 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goka

import (
"fmt"
"regexp"
"testing"

"github.com/facebookgo/ensure"
Expand All @@ -10,17 +11,12 @@ import (
func newMockOptions(t *testing.T) *poptions {
opts := new(poptions)
err := opts.applyOptions("")
ensure.Err(t, err, regexp.MustCompile("StorageBuilder not set$"))

opts.builders.storage = nullStorageBuilder()
err = opts.applyOptions("")
ensure.Nil(t, err)
opts.storagePath = "/tmp/goka-test"

fmt.Printf("%+v\n", opts)
return opts
}

func TestOptions_storagePathForPartition(t *testing.T) {
topic := "test"
var id int32
opts := newMockOptions(t)
path := opts.storagePathForPartition(topic, id)
ensure.DeepEqual(t, path, fmt.Sprintf("/tmp/goka-test/processor/%s.%d", topic, id))
}
28 changes: 22 additions & 6 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ type partition struct {
readyFlag int32
initialHwm int64

st *storageProxy
proxy kafkaProxy
process processCallback
st *storageProxy
recoveredOnce sync.Once
proxy kafkaProxy
process processCallback

// metrics
registry metrics.Registry
Expand Down Expand Up @@ -89,9 +90,10 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
dying: make(chan bool),
done: make(chan bool),

st: st,
proxy: proxy,
process: cb,
st: st,
recoveredOnce: sync.Once{},
proxy: proxy,
process: cb,

// metrics
registry: reg,
Expand Down Expand Up @@ -289,6 +291,13 @@ func (p *partition) load(catchup bool) error {
p.log.Printf("readyFlag was false when EOF arrived")
p.mxStatus.Update(partitionRecovered)
atomic.StoreInt32(&p.readyFlag, 1)
var err error
p.recoveredOnce.Do(func() {
err = p.st.MarkRecovered()
})
if err != nil {
return fmt.Errorf("error setting recovered: %v", err)
}
}
if catchup {
continue
Expand Down Expand Up @@ -318,6 +327,13 @@ func (p *partition) load(catchup bool) error {
p.mxStatus.Update(partitionRecovering)
} else {
p.mxStatus.Update(partitionRecovered)
var err error
p.recoveredOnce.Do(func() {
err = p.st.MarkRecovered()
})
if err != nil {
return fmt.Errorf("error setting recovered: %v", err)
}
}
case *kafka.NOP:
// don't do anything
Expand Down
6 changes: 5 additions & 1 deletion partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func newStorageProxy(st storage.Storage, id int32, update UpdateCallback) *stora

func newNullStorageProxy(id int32) *storageProxy {
return &storageProxy{
Storage: storage.NewMock(nil),
Storage: storage.NewMemory(nil),
partition: id,
stateless: true,
}
Expand Down Expand Up @@ -344,6 +344,7 @@ func TestPartition_runStatefulWithError(t *testing.T) {
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Sync(),
Expand Down Expand Up @@ -427,6 +428,7 @@ func TestPartition_loadStateful(t *testing.T) {
st.EXPECT().SetEncoded(key, value),
st.EXPECT().SetOffset(int64(offset)).Return(nil),
st.EXPECT().Sync(),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Sync(),
Expand Down Expand Up @@ -629,6 +631,7 @@ func TestPartition_catchupStateful(t *testing.T) {
st.EXPECT().SetEncoded(key, value),
st.EXPECT().SetOffset(offset+1).Return(nil),
st.EXPECT().Sync(),
st.EXPECT().MarkRecovered(),
st.EXPECT().SetEncoded(key, value),
st.EXPECT().SetOffset(offset+2).Return(nil),
st.EXPECT().Sync(),
Expand Down Expand Up @@ -749,6 +752,7 @@ func TestPartition_catchupStatefulWithError(t *testing.T) {
st.EXPECT().SetEncoded(key, value),
st.EXPECT().SetOffset(offset+1).Return(nil),
st.EXPECT().Sync(),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
Expand Down
4 changes: 2 additions & 2 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption)
WithRegistry(metrics.NewRegistry()),
WithLogger(logger.Default()),
WithUpdateCallback(DefaultUpdate),
WithStoragePath("/tmp/goka"),
WithPartitionChannelSize(defaultPartitionChannelSize),
WithStorageBuilder(DefaultStorageBuilder(DefaultProcessorStoragePath(gg.Group()))),
},

// user-defined options (may overwrite default ones)
Expand Down Expand Up @@ -470,7 +470,7 @@ func (g *Processor) newJoinStorage(topic string, id int32, codec Codec, update U
func (g *Processor) newStorage(topic string, id int32, codec Codec, update UpdateCallback, reg metrics.Registry) (*storageProxy, error) {
if g.isStateless() {
return &storageProxy{
Storage: storage.NewMock(codec),
Storage: storage.NewMemory(codec),
partition: id,
stateless: true,
}, nil
Expand Down
Loading