Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dropping message while rebalancing #207

Closed
WideLee opened this issue Nov 6, 2019 · 8 comments
Closed

Dropping message while rebalancing #207

WideLee opened this issue Nov 6, 2019 · 8 comments

Comments

@WideLee
Copy link

WideLee commented Nov 6, 2019

I have tested on 2-clicks example by running one producer to emit user's clicks and two consumers to statistics clicks. the message emitted by producer such as follow:

user-0  0-<time>
user-1  1-<time>
user-2  2-<time>
user-3  3-<time>
user-4 4-<time>
user-5  5-<time>
user-6  6-<time>
user-7  7-<time>
user-8  8-<time>
user-9  9-<time>
user-0  10-<time>
user-1  11-<time>
user-2  12-<time>
...
user-8  18-<time>
user-9  19-<time>
user-0  20-<time>

However, after the second consumer started and kafka rebalancing, the clicks count of first consumer is smaller than expected, the missing count equals to the number of the message "dropping message from topic = user-click while loading".

[proc] key: user-0 clicks: 16, msg: 150-2019-11-06 23:32:01.300978 +0800 CST m=+16.488060985
[proc] key: user-1 clicks: 16, msg: 151-2019-11-06 23:32:01.4099 +0800 CST m=+16.596983868
[proc] key: user-2 clicks: 16, msg: 152-2019-11-06 23:32:01.518171 +0800 CST m=+16.705255671
[proc] key: user-3 clicks: 16, msg: 153-2019-11-06 23:32:01.628471 +0800 CST m=+16.815555953
[proc] key: user-4 clicks: 16, msg: 154-2019-11-06 23:32:01.73763 +0800 CST m=+16.924715895
[proc] key: user-5 clicks: 16, msg: 155-2019-11-06 23:32:01.847644 +0800 CST m=+17.034731167
[proc] key: user-6 clicks: 16, msg: 156-2019-11-06 23:32:01.957907 +0800 CST m=+17.144994713
[proc] key: user-7 clicks: 16, msg: 157-2019-11-06 23:32:02.066497 +0800 CST m=+17.253585582
[proc] key: user-8 clicks: 16, msg: 158-2019-11-06 23:32:02.175928 +0800 CST m=+17.363017306
2019/11/06 23:32:02 Processor: dispatcher stopped
2019/11/06 23:32:02 partition mini-group-15-table/0: exit
2019/11/06 23:32:02 partition mini-group-15-table/1: exit
2019/11/06 23:32:02 partition mini-group-15-table/2: exit
2019/11/06 23:32:02 partition mini-group-15-table/3: exit
2019/11/06 23:32:02 Removing partition 1
2019/11/06 23:32:02 Removing partition 2
2019/11/06 23:32:02 Removing partition 3
2019/11/06 23:32:02 Removing partition 0
2019/11/06 23:32:02 Processor: rebalancing: map[0:-1 1:-1]
2019/11/06 23:32:02 finished building storage for topic mini-group-15-table
2019/11/06 23:32:02 finished building storage for topic mini-group-15-table
2019/11/06 23:32:02 Processor: dispatcher started
2019/11/06 23:32:02 dropping message from topic = user-click-15 while loading
2019/11/06 23:32:02 dropping message from topic = user-click-15 while loading
2019/11/06 23:32:03 dropping message from topic = user-click-15 while loading
2019/11/06 23:32:03 dropping message from topic = user-click-15 while loading
2019/11/06 23:32:03 dropping message from topic = user-click-15 while loading
2019/11/06 23:32:03 dropping message from topic = user-click-15 while loading
[proc] key: user-5 clicks: 17, msg: 165-2019-11-06 23:32:02.933283 +0800 CST m=+18.120377988
[proc] key: user-9 clicks: 16, msg: 169-2019-11-06 23:32:03.373335 +0800 CST m=+18.560433160
[proc] key: user-1 clicks: 17, msg: 171-2019-11-06 23:32:03.592268 +0800 CST m=+18.779368149
[proc] key: user-5 clicks: 18, msg: 175-2019-11-06 23:32:04.068551 +0800 CST m=+19.255654438
[proc] key: user-4 clicks: 17, msg: 174-2019-11-06 23:32:03.945563 +0800 CST m=+19.132665611

For example, user-9's click seems to be 17 which the 169 message emit by producer, but it is 16 on processor.

I wonder how to gracefully upgrade consumers on production environment without drop any messages? it seems during upgrade we need to restart each consumer and kafka rebalancing each time, and rebalancing results drop message while loading.

Thanks very much ^_^

@frairon
Copy link
Contributor

frairon commented Nov 8, 2019

Hi WideLee,

thanks for the report.
Goka is designed to never drop/discard/miss any messages. The message "dropping message while loading" occurs only during catchup/recover of a processor-table or view. The reason for this is the internal multiplexing of messages. For example in case of a rebalance, messages from the previous session could still be in the internal pipeline, which must be ignored.
When the recover for a rebalance is finished, the processor will continue at the offset, it (or another instance) stopped before.
If you do experience data loss, it's definitely a bug which we have to address.

The example you started runs an Emitter that emits messages periodically until stopped. This makes it hard to make sure there is no message loss.
To verify corrrect behavior, you could modify the emitter function to stop after a certain number of messages. Then check that all partitions have received the configured number of events.
Let me know if there is really an issue here. If you do this, feel free to fork the repo and share your changes in a branch so we can have a look too.

Thanks a lot,
Franz

@WideLee
Copy link
Author

WideLee commented Nov 8, 2019

This is code of emitter, it emits 1000 message and the last message is user-9 999-<time>

user-0  0-<time>
user-1  1-<time>
...
user-7  997-<time>
user-8  998-<time>
user-9  999-<time>
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		panic(err)
	}
	defer emitter.Finish()

	var i int
	for i = 0; i < 1000; i++ {
		key := fmt.Sprintf("user-%d", i%10)
		value := fmt.Sprintf("%v-%s", i, time.Now())
		emitter.EmitSync(key, value)
		fmt.Printf("emit: %v  %v\n", key, value)
	}
}

Here is the consumer, only added the gracefully quit and specify the samara kafka verison to the example in examples/2-clicks/main.go

func process(ctx goka.Context, msg interface{}) {
	var u *user
	if val := ctx.Value(); val != nil {
		u = val.(*user)
	} else {
		u = new(user)
	}

	u.Clicks++
	ctx.SetValue(u)
	fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg)
}

func runProcessor() {
	ctx, cancel := context.WithCancel(context.Background())

	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), process),
		goka.Persist(new(userCodec)),
	)

	config := kafka.NewConfig()
	config.Version = sarama.V0_11_0_0
	p, err := goka.NewProcessor(brokers, g,
		goka.WithConsumerBuilder(kafka.ConsumerBuilderWithConfig(config)),
		goka.WithProducerBuilder(kafka.ProducerBuilderWithConfig(config)))
	if err != nil {
		panic(err)
	}

	done := make(chan bool)
	go func() {
		defer close(done)
		if err = p.Run(ctx); err != nil {
			log.Fatalf("error running processor: %v", err)
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processor
	<-done
}

Normally, if I run only one consumer, it gets the output as below, each user clicks 100 times,

[proc] key: user-0 clicks: 100, msg: 990-<time>
[proc] key: user-1 clicks: 100, msg: 991-<time>
[proc] key: user-2 clicks: 100, msg: 992-<time>
[proc] key: user-3 clicks: 100, msg: 993-<time>
[proc] key: user-4 clicks: 100, msg: 994-<time>
[proc] key: user-5 clicks: 100, msg: 995-<time>
[proc] key: user-6 clicks: 100, msg: 996-<time>
[proc] key: user-7 clicks: 100, msg: 997-<time>
[proc] key: user-8 clicks: 100, msg: 998-<time>
[proc] key: user-9 clicks: 100, msg: 999-<time>

But I if I run two consumers, after the second one started and rebalancing, the click count seems wired in the first consumer.

Here is the output

[proc] key: user-3 clicks: 12, msg: 113-2019-11-08 18:30:16.805249 +0800 CST m=+12.609494504
[proc] key: user-4 clicks: 12, msg: 114-2019-11-08 18:30:16.924074 +0800 CST m=+12.728320170
[proc] key: user-5 clicks: 12, msg: 115-2019-11-08 18:30:17.037732 +0800 CST m=+12.841978782
[proc] key: user-6 clicks: 12, msg: 116-2019-11-08 18:30:17.14698 +0800 CST m=+12.951227773
[proc] key: user-7 clicks: 12, msg: 117-2019-11-08 18:30:17.253815 +0800 CST m=+13.058063574
[proc] key: user-8 clicks: 12, msg: 118-2019-11-08 18:30:17.364289 +0800 CST m=+13.168538195
[proc] key: user-9 clicks: 12, msg: 119-2019-11-08 18:30:17.475215 +0800 CST m=+13.279464332
[proc] key: user-0 clicks: 13, msg: 120-2019-11-08 18:30:17.584925 +0800 CST m=+13.389175213
[proc] key: user-1 clicks: 13, msg: 121-2019-11-08 18:30:17.694985 +0800 CST m=+13.499235245
2019/11/08 18:30:18 Processor: dispatcher stopped
2019/11/08 18:30:18 partition mini-group-22-table/3: exit
2019/11/08 18:30:18 partition mini-group-22-table/1: exit
2019/11/08 18:30:18 partition mini-group-22-table/2: exit
2019/11/08 18:30:18 partition mini-group-22-table/0: exit
2019/11/08 18:30:18 Removing partition 0
2019/11/08 18:30:18 Removing partition 1
2019/11/08 18:30:18 Removing partition 2
2019/11/08 18:30:18 Removing partition 3
2019/11/08 18:30:18 Processor: rebalancing: map[2:-1 3:-1]
2019/11/08 18:30:18 finished building storage for topic mini-group-22-table
2019/11/08 18:30:18 finished building storage for topic mini-group-22-table
2019/11/08 18:30:18 Processor: dispatcher started
2019/11/08 18:30:18 dropping message from topic = user-click-22 while loading
[proc] key: user-6 clicks: 13, msg: 126-2019-11-08 18:30:18.247725 +0800 CST m=+14.051979156
2019/11/08 18:30:18 dropping message from topic = user-click-22 while loading
2019/11/08 18:30:18 dropping message from topic = user-click-22 while loading
[proc] key: user-2 clicks: 13, msg: 132-2019-11-08 18:30:18.908592 +0800 CST m=+14.712850016
[proc] key: user-3 clicks: 13, msg: 133-2019-11-08 18:30:19.032942 +0800 CST m=+14.837200567
[proc] key: user-6 clicks: 14, msg: 136-2019-11-08 18:30:19.369541 +0800 CST m=+15.173802335
[proc] key: user-7 clicks: 13, msg: 137-2019-11-08 18:30:19.480862 +0800 CST m=+15.285123123
[proc] key: user-2 clicks: 14, msg: 142-2019-11-08 18:30:20.029871 +0800 CST m=+15.834135927
[proc] key: user-3 clicks: 14, msg: 143-2019-11-08 18:30:20.139712 +0800 CST m=+15.943977246
[proc] key: user-6 clicks: 15, msg: 146-2019-11-08 18:30:20.469384 +0800 CST m=+16.273651245
[proc] key: user-7 clicks: 14, msg: 147-2019-11-08 18:30:20.575578 +0800 CST m=+16.379846442
[proc] key: user-2 clicks: 15, msg: 152-2019-11-08 18:30:21.12785 +0800 CST m=+16.932121795

The clicks of user-6 is normal but user-2, user-3 and user-7 expected 14. Finally, after the emitter finished 1000 messages, the total count of user-2, user-3 and user-7 is 99, other users is 100.

I am not sure if IT IS a normal behavior.
It seems that it started to consume messages before the partition storage is ready, and these messages have been dropped without processing. I have checked the total messages number in kafka topic and it is 1000. If I start one new consumer with different group and process these messages from beginning, the users' click count is 100 as expected.

BTW, the version of Kafka is 0.11.0.

@frairon
Copy link
Contributor

frairon commented Nov 8, 2019

Did you start the two consumers on the same machine locally? If so, there can be problems with storage, since they're using the same directory.

@WideLee
Copy link
Author

WideLee commented Nov 8, 2019

Nope, two consumers are in different hosts, one in is Mac host and the other in my Parallel Desktop virtual machine (CentOS). Even the storage is /tmp/goka, but it is still different.

@WideLee
Copy link
Author

WideLee commented Nov 8, 2019

If I run them in the same machine, it will panic with an error, but not in this case.

2019/11/08 19:57:24 error running processor: processor: error creating storage: error opening leveldb: resource temporarily unavailable

@WideLee
Copy link
Author

WideLee commented Nov 8, 2019

I have noticed that when the dropping message is occurred,

ev.Topic is "user-click-22", but p.Topic is "mini-group-22-table"

if ev.Topic != p.topic {
	p.log.Printf("dropping message from topic = %s while loading", ev.Topic)
	continue
}

@WideLee
Copy link
Author

WideLee commented Nov 9, 2019

@frairon I found the reason why click count is less than expected.

After rebalancing and message dispatcher is restarted, the partition message channel received messages as follow:

<Processor: dispatcher started>
message of topic "user-click" from groupConsumer
message of topic "user-click" from groupConsumer
......
EOF message from simpleConsumer
message of topic "user-click" from groupConsumer

And the message loop is in partition.recover() until EOF received. For this reason, the normal message from groupConsumer is dropped without processing, and these messages cannot be processed forever.

However, the message channel of groupConsumer and simpleConsumer is the same, Is it possible to solve this problem and let every messages being processed?

Thanks very much 😄

@frairon
Copy link
Contributor

frairon commented Feb 3, 2020

issue seems to be fixed, thanks @WideLee for the PR

@frairon frairon closed this as completed Feb 3, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants