cleanup processor rebalance #147

Merged (1 commit, Oct 25, 2018)
partition.go (3 changes: 2 additions & 1 deletion)

```diff
@@ -271,7 +271,8 @@ func (p *partition) load(ctx context.Context, catchup bool) (rerr error) {
 		case *kafka.Message:
 			lastMessage = time.Now()
 			if ev.Topic != p.topic {
-				return fmt.Errorf("load: wrong topic = %s", ev.Topic)
+				p.log.Printf("dropping message from topic = %s while loading", ev.Topic)
+				continue
 			}
 			if err := p.storeEvent(ev); err != nil {
 				return fmt.Errorf("load: error updating storage: %v", err)
```
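This change makes the catch-up load tolerant: a message from an unexpected topic is now logged and skipped instead of aborting the whole load with an error. A minimal, runnable sketch of the drop-and-continue pattern; the `message` type, `loadTopic`, and the channel here are illustrative stand-ins, not goka's actual API:

```go
package main

import "log"

type message struct {
	topic, value string
}

// loadTopic applies only messages for the expected topic, logging and
// skipping strays instead of failing the load (the behavior the
// partition.go change above introduces).
func loadTopic(events <-chan message, want string, apply func(message)) {
	for ev := range events {
		if ev.topic != want {
			log.Printf("dropping message from topic = %s while loading", ev.topic)
			continue
		}
		apply(ev)
	}
}

func main() {
	ch := make(chan message, 3)
	ch <- message{"orders", "a"}
	ch <- message{"other", "b"} // will be dropped
	ch <- message{"orders", "c"}
	close(ch)
	loadTopic(ch, "orders", func(m message) { log.Printf("stored %q", m.value) })
}
```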
processor.go (97 changes: 72 additions & 25 deletions)
Expand Up @@ -32,8 +32,8 @@ type Processor struct {

consumer kafka.Consumer
producer kafka.Producer
asCh chan kafka.Assignment

errg *multierr.ErrGroup
errors *multierr.Errors
cancel func()
}
```diff
@@ -113,6 +113,8 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption)
 		views: views,
 
 		graph: gg,
+
+		asCh: make(chan kafka.Assignment, 1),
 	}
 
 	return processor, nil
```
```diff
@@ -273,7 +275,7 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
 
 	// create errorgroup
 	ctx, g.cancel = context.WithCancel(ctx)
-	g.errg, ctx = multierr.NewErrGroup(ctx)
+	errg, ctx := multierr.NewErrGroup(ctx)
 	defer g.cancel()
 
 	// collect all errors before leaving
```
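Throughout Run, the long-lived g.errg struct field becomes a local errg, so the group's lifetime is tied to a single invocation. A small runnable sketch of that scoping, using golang.org/x/sync/errgroup as a stand-in for goka's internal multierr.ErrGroup (names here are illustrative):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// run scopes an error group to one invocation instead of storing it on a
// struct: each call gets a fresh group and a fresh derived context.
func run(ctx context.Context) error {
	errg, ctx := errgroup.WithContext(ctx)

	errg.Go(func() error {
		<-ctx.Done() // a view or partition goroutine would do real work here
		return nil
	})
	errg.Go(func() error {
		// a goroutine returning a non-nil error cancels ctx for the group
		return errors.New("dispatcher stopped")
	})

	return errg.Wait() // every goroutine of this run has returned here
}

func main() {
	fmt.Println("run returned:", run(context.Background()))
}
```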
```diff
@@ -314,7 +316,7 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
 	// start all views
 	for t, v := range g.views {
 		t, v := t, v
-		g.errg.Go(func() error {
+		errg.Go(func() error {
 			if err := v.Run(ctx); err != nil {
 				return fmt.Errorf("error starting lookup table %s: %v", t, err)
 			}
```
```diff
@@ -333,15 +335,18 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
 	}
 	if err := g.consumer.Subscribe(topics); err != nil {
 		g.cancel()
-		_ = g.errors.Merge(g.errg.Wait())
+		_ = g.errors.Merge(errg.Wait())
 		return fmt.Errorf("error subscribing topics: %v", err)
 	}
 
 	// start processor dispatcher
-	g.errg.Go(func() error { return g.run(ctx) })
+	errg.Go(func() error {
+		g.asCh <- kafka.Assignment{}
+		return g.waitAssignment(ctx)
+	})
 
 	// wait for goroutines to return
-	_ = g.errors.Merge(g.errg.Wait())
+	_ = g.errors.Merge(errg.Wait())
 
 	// remove all partitions first
 	g.opts.log.Printf("Processor: removing partitions")
```
```diff
@@ -380,18 +385,58 @@ func (g *Processor) pushToPartitionView(ctx context.Context, topic string, part
 	return nil
 }
 
-func (g *Processor) run(ctx context.Context) error {
-	g.opts.log.Printf("Processor: started")
-	defer g.opts.log.Printf("Processor: stopped")
+func (g *Processor) waitAssignment(ctx context.Context) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case a := <-g.asCh:
+			if err := g.runAssignment(ctx, a); err != nil {
+				return err
+			}
+		}
+	}
+}
+
+func (g *Processor) runAssignment(ctx context.Context, a kafka.Assignment) error {
+	errs := new(multierr.Errors)
+	ctx, cancel := context.WithCancel(ctx)
+	errg, ctx := multierr.NewErrGroup(ctx)
+	defer cancel()
+
+	// create partitions based on assignment
+	if err := g.rebalance(errg, ctx, a); err.HasErrors() {
+		return errs.Collect(err).NilOrError()
+	}
+
+	// start dispatcher
+	errg.Go(func() error {
+		err := g.dispatcher(ctx)
+		// cancel context even if dispatcher returned nil -- can only be a rebalance
+		cancel()
+		return err
+	})
+
+	// wait until dispatcher or partitions have returned
+	_ = errs.Merge(errg.Wait())
+
+	// all partitions should have returned at this point, so clean up
+	_ = errs.Merge(g.removePartitions())
+
+	return errs.NilOrError()
+}
+
+func (g *Processor) dispatcher(ctx context.Context) error {
+	g.opts.log.Printf("Processor: dispatcher started")
+	defer g.opts.log.Printf("Processor: dispatcher stopped")
 
 	for {
 		select {
 		case ev := <-g.consumer.Events():
 			switch ev := ev.(type) {
 			case *kafka.Assignment:
-				if err := g.rebalance(ctx, *ev); err != nil {
-					return fmt.Errorf("error on rebalance: %v", err)
-				}
+				g.asCh <- *ev
+				return nil
 
 			case *kafka.Message:
 				var err error
```
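This hunk is the core of the cleanup: each assignment runs under its own context and error group (runAssignment), and the dispatcher no longer rebalances in place. On a new assignment it deposits the event on g.asCh and returns nil, which cancels the current run, lets all partition goroutines shut down, and hands control back to waitAssignment for the next round. A condensed, runnable model of that lifecycle, with hypothetical names and golang.org/x/sync/errgroup standing in for multierr.ErrGroup:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// runAssignment runs one assignment under its own context and group; the
// dispatcher queues the next assignment and cancels the current run.
func runAssignment(ctx context.Context, a string, events <-chan string, asCh chan<- string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	errg, ctx := errgroup.WithContext(ctx)

	errg.Go(func() error { // partition goroutines would start here
		<-ctx.Done()
		fmt.Println("partitions of", a, "stopped")
		return nil
	})

	errg.Go(func() error { // the dispatcher
		select {
		case <-ctx.Done():
		case next := <-events: // a new assignment arrived from the consumer
			asCh <- next
			cancel() // returning nil here can only mean a rebalance
		}
		return nil
	})

	return errg.Wait() // partitions are cleaned up after this returns
}

func main() {
	asCh := make(chan string, 1)
	events := make(chan string, 1)
	events <- "assignment-2" // simulate a rebalance event

	if err := runAssignment(context.Background(), "assignment-1", events, asCh); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("queued next:", <-asCh)
}
```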
```diff
@@ -487,7 +532,7 @@ func (g *Processor) newStorage(topic string, id int32, update UpdateCallback) (*
 	}, nil
 }
 
-func (g *Processor) createPartitionViews(ctx context.Context, id int32) error {
+func (g *Processor) createPartitionViews(errg *multierr.ErrGroup, ctx context.Context, id int32) error {
 	g.m.Lock()
 	defer g.m.Unlock()
 
```
```diff
@@ -511,7 +556,7 @@ func (g *Processor) createPartitionViews(ctx context.Context, id int32) error {
 	)
 	g.partitionViews[id][t.Topic()] = p
 
-	g.errg.Go(func() (err error) {
+	errg.Go(func() (err error) {
 		defer func() {
 			if rerr := recover(); rerr != nil {
 				g.opts.log.Printf("partition view %s/%d: panic", p.topic, id)
```
```diff
@@ -533,7 +578,7 @@ func (g *Processor) createPartitionViews(ctx context.Context, id int32) error {
 	return nil
 }
 
-func (g *Processor) createPartition(ctx context.Context, id int32) error {
+func (g *Processor) createPartition(errg *multierr.ErrGroup, ctx context.Context, id int32) error {
 	if _, has := g.partitions[id]; has {
 		return nil
 	}
```
```diff
@@ -565,7 +610,7 @@ func (g *Processor) createPartition(ctx context.Context, id int32) error {
 		g.opts.partitionChannelSize,
 	)
 	par := g.partitions[id]
-	g.errg.Go(func() (err error) {
+	errg.Go(func() (err error) {
 		defer func() {
 			if rerr := recover(); rerr != nil {
 				g.opts.log.Printf("partition %s/%d: panic", par.topic, id)
```
```diff
@@ -586,27 +631,29 @@ func (g *Processor) createPartition(ctx context.Context, id int32) error {
 	return nil
 }
 
-func (g *Processor) rebalance(ctx context.Context, partitions kafka.Assignment) error {
+func (g *Processor) rebalance(errg *multierr.ErrGroup, ctx context.Context, partitions kafka.Assignment) *multierr.Errors {
 	errs := new(multierr.Errors)
 	g.opts.log.Printf("Processor: rebalancing: %+v", partitions)
 
 	for id := range partitions {
 		// create partition views
-		if err := g.createPartitionViews(ctx, id); err != nil {
-			return errs.Collect(err).NilOrError()
+		if err := g.createPartitionViews(errg, ctx, id); err != nil {
+			errs.Collect(err)
 		}
 		// create partition processor
-		if err := g.createPartition(ctx, id); err != nil {
-			return errs.Collect(err).NilOrError()
+		if err := g.createPartition(errg, ctx, id); err != nil {
+			errs.Collect(err)
 		}
 	}
+	return errs
+}
 
+func (g *Processor) removePartitions() *multierr.Errors {
+	errs := new(multierr.Errors)
 	for partition := range g.partitions {
-		if _, has := partitions[partition]; !has {
-			_ = errs.Merge(g.removePartition(partition))
-		}
+		_ = errs.Merge(g.removePartition(partition))
 	}
-	return errs.NilOrError()
+	return errs
 }
 
 func (g *Processor) removePartition(partition int32) *multierr.Errors {
```
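Two semantic changes land in this last hunk: rebalance now returns *multierr.Errors and keeps creating the remaining partitions after a failure instead of returning on the first error, and removePartitions drops the old not-in-assignment filter because runAssignment now tears down every partition at the end of each run. A runnable sketch of the collect-and-continue idea; errList is a simplified stand-in for multierr.Errors:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errList accumulates failures so every partition still gets its chance
// to start before the combined error is reported.
type errList struct{ errs []error }

func (l *errList) Collect(err error) { l.errs = append(l.errs, err) }

func (l *errList) NilOrError() error {
	if len(l.errs) == 0 {
		return nil
	}
	msgs := make([]string, len(l.errs))
	for i, e := range l.errs {
		msgs[i] = e.Error()
	}
	return errors.New(strings.Join(msgs, "; "))
}

func main() {
	errs := &errList{}
	for _, id := range []int{0, 1, 2} {
		if id == 1 { // pretend partition 1 fails to start
			errs.Collect(fmt.Errorf("createPartition(%d): simulated failure", id))
			continue // keep creating the remaining partitions
		}
		fmt.Println("partition", id, "created")
	}
	fmt.Println("result:", errs.NilOrError())
}
```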