Skip to content

Commit

Permalink
drain errors when failing
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens authored and db7 committed Dec 8, 2017
1 parent 6f7f0d3 commit e759df5
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,20 @@ func (g *Processor) pushToPartitionView(topic string, part int32, ev kafka.Event
func (g *Processor) run() {
g.opts.log.Printf("Processor: started")
defer g.opts.log.Printf("Processor: stopped")
var failed bool

for ev := range g.consumer.Events() {
if _, ok := ev.(*kafka.Error); failed && !ok {
// if we failed and message is not a further error, drop message
continue
}

switch ev := ev.(type) {
case *kafka.Assignment:
err := g.rebalance(*ev)
if err != nil {
g.fail(fmt.Errorf("error on rebalance: %v", err))
return
failed = true
}

case *kafka.Message:
Expand All @@ -355,7 +361,7 @@ func (g *Processor) run() {
}
if err != nil {
g.fail(fmt.Errorf("error consuming message: %v", err))
return
failed = true
}

case *kafka.BOF:
Expand All @@ -367,7 +373,7 @@ func (g *Processor) run() {
}
if err != nil {
g.fail(fmt.Errorf("error consuming BOF: %v", err))
return
failed = true
}

case *kafka.EOF:
Expand All @@ -379,7 +385,7 @@ func (g *Processor) run() {
}
if err != nil {
g.fail(fmt.Errorf("error consuming EOF: %v", err))
return
failed = true
}

case *kafka.NOP:
Expand All @@ -391,11 +397,11 @@ func (g *Processor) run() {

case *kafka.Error:
g.fail(ev.Err)
return
failed = true

default:
g.fail(fmt.Errorf("processor: cannot handle %T = %v", ev, ev))
return
failed = true
}
}
}
Expand Down

0 comments on commit e759df5

Please sign in to comment.