Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

forward partition consumer errors #64

Merged
merged 2 commits into from
Dec 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 57 additions & 27 deletions kafka/simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ func (c *simpleConsumer) AddPartition(topic string, partition int32, offset int6
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer func() {
if err := recover(); err != nil {
c.events <- &Error{
Err: fmt.Errorf("panic: %v", err),
}
}
}()
c.run(pc, topic, partition, start, hwm)
}()
return nil
Expand Down Expand Up @@ -125,48 +132,71 @@ func (c *simpleConsumer) run(pc sarama.PartitionConsumer, topic string, partitio

count := 0
// wait for messages to arrive
for m := range pc.Messages() {
for {
select {
case c.events <- &Message{
Topic: m.Topic,
Partition: m.Partition,
Offset: m.Offset,
Key: string(m.Key),
Value: m.Value,
Timestamp: m.Timestamp,
}:
case <-c.dying:
return
}

// is this EOF?
// TODO (franz): check how fast the topic has to be until we don't get an EOF for
// *every* message.
if m.Offset == pc.HighWaterMarkOffset()-1 {
case m, ok := <-pc.Messages():
if !ok {
// Closed, this might happen due to an error. Continue the loop
// so we can send the error further down.
continue
}
select {
case c.events <- &EOF{
case c.events <- &Message{
Topic: m.Topic,
Partition: m.Partition,
Hwm: m.Offset + 1,
Offset: m.Offset,
Key: string(m.Key),
Value: m.Value,
Timestamp: m.Timestamp,
}:
case <-c.dying:
return
}

}
// is this EOF?
// TODO (franz): check how fast the topic has to be until we don't get an EOF for
// *every* message.
if m.Offset == pc.HighWaterMarkOffset()-1 {
select {
case c.events <- &EOF{
Topic: m.Topic,
Partition: m.Partition,
Hwm: m.Offset + 1,
}:
case <-c.dying:
return
}
}

count++
if count%1000 == 0 && m.Offset >= hwm { // was this EOF?
count++
if count%1000 == 0 && m.Offset >= hwm { // was this EOF?
select {
case c.events <- &EOF{
Topic: m.Topic,
Partition: m.Partition,
Hwm: pc.HighWaterMarkOffset(),
}:
case <-c.dying:
return
}
}
case err, ok := <-pc.Errors():
if !ok {
// Closed, this might happen if the error is not recoverable and will
// shutdown the partition consumer. Continue the loop and wait for a
// dying message.
continue
}
select {
case c.events <- &EOF{
Topic: m.Topic,
Partition: m.Partition,
Hwm: pc.HighWaterMarkOffset(),
case c.events <- &Error{
Err: err,
}:
case <-c.dying:
return
}

return
case <-c.dying:
return
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions kafka/simple_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func TestSimpleConsumer_AddPartition(t *testing.T) {
hwm int64 = 1237
messages = make(chan *sarama.ConsumerMessage)
ch = make(chan Event)
cherr = make(chan *sarama.ConsumerError)
)

c := &simpleConsumer{
Expand All @@ -121,7 +122,8 @@ func TestSimpleConsumer_AddPartition(t *testing.T) {
client.EXPECT().GetOffset(topic, partition, sarama.OffsetOldest).Return(oldest, nil)
client.EXPECT().GetOffset(topic, partition, sarama.OffsetNewest).Return(hwm, nil)
consumer.EXPECT().ConsumePartition(topic, partition, offset).Return(pc, nil)
pc.EXPECT().Messages().Return(messages)
pc.EXPECT().Messages().Return(messages).AnyTimes()
pc.EXPECT().Errors().Return(cherr).AnyTimes()
err = c.AddPartition(topic, partition, offset)
ensure.Nil(t, err)
m, ok := (<-ch).(*BOF)
Expand Down Expand Up @@ -173,6 +175,7 @@ func TestSimpleConsumer_RemovePartition(t *testing.T) {
hwm int64 = 1237
messages = make(chan *sarama.ConsumerMessage)
ch = make(chan Event)
cherr = make(chan *sarama.ConsumerError)
)

c := &simpleConsumer{
Expand All @@ -188,7 +191,8 @@ func TestSimpleConsumer_RemovePartition(t *testing.T) {
client.EXPECT().GetOffset(topic, partition, sarama.OffsetOldest).Return(oldest, nil)
client.EXPECT().GetOffset(topic, partition, sarama.OffsetNewest).Return(hwm, nil)
consumer.EXPECT().ConsumePartition(topic, partition, offset).Return(pc, nil)
pc.EXPECT().Messages().Return(messages)
pc.EXPECT().Messages().Return(messages).AnyTimes()
pc.EXPECT().Errors().Return(cherr).AnyTimes()
err = c.AddPartition(topic, partition, offset)
ensure.Nil(t, err)

Expand Down Expand Up @@ -242,6 +246,7 @@ func TestSimpleConsumer_ErrorBlocked(t *testing.T) {
hwm int64 = 1237
messages = make(chan *sarama.ConsumerMessage)
ch = make(chan Event)
cherr = make(chan *sarama.ConsumerError)
)

c := &simpleConsumer{
Expand All @@ -265,7 +270,8 @@ func TestSimpleConsumer_ErrorBlocked(t *testing.T) {
client.EXPECT().GetOffset(topic, partition, sarama.OffsetOldest).Return(oldest, nil)
client.EXPECT().GetOffset(topic, partition, sarama.OffsetNewest).Return(hwm, nil)
consumer.EXPECT().ConsumePartition(topic, partition, offset).Return(pc, nil)
pc.EXPECT().Messages().Return(messages)
pc.EXPECT().Messages().Return(messages).AnyTimes()
pc.EXPECT().Errors().Return(cherr).AnyTimes()
err = c.AddPartition(topic, partition, offset)
ensure.Nil(t, err)
m, ok := (<-ch).(*BOF)
Expand Down
18 changes: 12 additions & 6 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,20 @@ func (g *Processor) pushToPartitionView(topic string, part int32, ev kafka.Event
func (g *Processor) run() {
g.opts.log.Printf("Processor: started")
defer g.opts.log.Printf("Processor: stopped")
var failed bool

for ev := range g.consumer.Events() {
if _, ok := ev.(*kafka.Error); failed && !ok {
// if we failed and message is not a further error, drop message
continue
}

switch ev := ev.(type) {
case *kafka.Assignment:
err := g.rebalance(*ev)
if err != nil {
g.fail(fmt.Errorf("error on rebalance: %v", err))
return
failed = true
}

case *kafka.Message:
Expand All @@ -355,7 +361,7 @@ func (g *Processor) run() {
}
if err != nil {
g.fail(fmt.Errorf("error consuming message: %v", err))
return
failed = true
}

case *kafka.BOF:
Expand All @@ -367,7 +373,7 @@ func (g *Processor) run() {
}
if err != nil {
g.fail(fmt.Errorf("error consuming BOF: %v", err))
return
failed = true
}

case *kafka.EOF:
Expand All @@ -379,7 +385,7 @@ func (g *Processor) run() {
}
if err != nil {
g.fail(fmt.Errorf("error consuming EOF: %v", err))
return
failed = true
}

case *kafka.NOP:
Expand All @@ -391,11 +397,11 @@ func (g *Processor) run() {

case *kafka.Error:
g.fail(ev.Err)
return
failed = true

default:
g.fail(fmt.Errorf("processor: cannot handle %T = %v", ev, ev))
return
failed = true
}
}
}
Expand Down