Lost lots of messages due to async produce #192

Closed
XiaochenCui opened this issue Jun 20, 2019 · 12 comments

@XiaochenCui

I recently wrote a test script that produces 20 million messages to Kafka with goka, but only about 14 million messages are in the topic after the produce completes.
There is enough disk space, and there are no error messages in the Kafka logs or the client logs.
The script and Kafka run in Docker on the same subnet.

Test script:

func GokaAsyncProduce() {
	emitter, err := goka.NewEmitter(
		viper.GetStringSlice("kafkaConfig.brokerUrls"),
		goka.Stream(viper.GetString("kafkaConfig.topic")),
		new(codec.String),
	)
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}

	startTime = time.Now().UnixNano()

	preTime := time.Now().UnixNano()
	preN := 0

	for n := 0; n < count; n++ {
		bs := getPkg()
		_, err = emitter.Emit("", string(bs))
		if err != nil {
			log.Fatalf("error emitting message: %v", err)
		}

		currTime := time.Now().UnixNano()
		if float64(currTime-preTime) > float64(collectInterval)*math.Pow10(9) {
			currN := n - preN
			currSpeed := currN / collectInterval
			fmt.Printf("produce speed: %v pps\n", currSpeed)

			preTime = currTime
			preN = n

			PrintMemUsage()
			PrintCPUUsage()
		}
	}
	emitter.Finish()

	endTime = time.Now().UnixNano()
}

Count messages using
docker-compose exec kafka kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic test --time -1 --offsets 1

@db7
Collaborator

db7 commented Jun 20, 2019

That sounds weird. What is the retention time of the topic? It's not log compacted, right?

@XiaochenCui
Author

Here is the log retention policy:

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

And log compaction is disabled.

I tested 20 million messages using sarama, and there were exactly 20 million messages after the test finished:

docker-compose exec kafka kafka-run-class.sh kafka.tools.GetOffsetShell  --broker-list kafka:9092 --topic test --time -1 --offsets 1
test:0:20000000

I will test goka again soon and post the result here.

Thank you.
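
For reference, a sarama comparison test along these lines might look roughly like the sketch below. The broker address, topic name, the use of a SyncProducer, and the reuse of the getPkg helper from the goka script above are assumptions, not the exact script that was run:

config := sarama.NewConfig()
config.Producer.Return.Successes = true // required by the SyncProducer
producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, config)
if err != nil {
	log.Fatalf("error creating producer: %v", err)
}
defer producer.Close()

for n := 0; n < 20000000; n++ {
	// getPkg is assumed to be the same message generator as in the goka script.
	_, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.ByteEncoder(getPkg()),
	})
	if err != nil {
		log.Fatalf("error sending message %d: %v", n, err)
	}
}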

@XiaochenCui
Author

Oops, there are only about 11.6 million messages in the topic.

docker-compose exec kafka kafka-run-class.sh kafka.tools.GetOffsetShell  --broker-list kafka:9092 --topic test --time -1 --offsets 1
test:0:11576072

@yanyuzhy

yanyuzhy commented Jun 21, 2019

@XiaochenCui
Could you run the test multiple times and paste the results? That would give us more information about this issue.

@frairon
Contributor

frairon commented Jun 21, 2019

@XiaochenCui one more error check could also help: emitter.Emit(...) returns a Promise which may resolve with an error once the broker responds. Maybe there are errors you aren't seeing because you're not checking for that. Like this:

prom, err := emitter.Emit("", string(bs))
if err != nil {
	log.Fatalf("error emitting message: %v", err)
}
prom.Then(func(err error) {
	if err != nil {
		log.Fatalf("error emitting message (in promise): %v", err)
	}
})

@XiaochenCui
Author

XiaochenCui commented Jun 21, 2019

@frairon

2019/06/21 08:30:50 error emitting message: kafka server: Message was too large, server rejected it to avoid allocation error.

@frairon
Contributor

frairon commented Jun 21, 2019

Yep, nice :)
So at least one message is too large.
Kafka's limit is 1MB per message by default, but I'd recommend using much smaller messages.
If your messages are <1MB, you could try emitter.EmitSync(...), which sends messages one by one. It's much slower than batching, but it would let you narrow the error down to the batching. If your messages are >1MB, you have to split them up. I think the Kafka documentation does not recommend increasing the max-message-size config.
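
A minimal sketch of that synchronous check, assuming the count, getPkg, and emitter from the test script above; EmitSync blocks until the broker acknowledges (or rejects) each message, so a too-large message fails right at the offending iteration:

for n := 0; n < count; n++ {
	bs := getPkg()
	// EmitSync waits for the broker's response, so delivery errors
	// (including "message too large") are returned directly.
	if err := emitter.EmitSync("", string(bs)); err != nil {
		log.Fatalf("error emitting message %d (%d bytes): %v", n, len(bs), err)
	}
}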

@XiaochenCui
Author

@frairon
But the size of each message is only 30-1000 bytes, so what's the root cause?

@frairon
Contributor

frairon commented Jul 1, 2019

Well, if Kafka says the messages are too big, then I guess they really are too big, one way or another. If your single messages really are only 1000 bytes max, then it has to do with the internal batching of the Kafka producer (which I doubt, but I wouldn't know where else to look). So have you tried sending with EmitSync to check if the errors still occur?
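
If batching really were the culprit, the relevant knobs live in sarama's producer config. The following is only a sketch of the sarama side with example values; how such a config would be wired into the goka emitter depends on the goka version, so that part is left out:

config := sarama.NewConfig()
// Client-side limit on message size; sarama's default of 1000000 bytes
// mirrors the broker's default message.max.bytes.
config.Producer.MaxMessageBytes = 1000000
// Bound how much the async producer batches before flushing.
config.Producer.Flush.Bytes = 64 * 1024
config.Producer.Flush.Messages = 100
config.Producer.Flush.Frequency = 100 * time.Millisecond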

@XiaochenCui
Author

@frairon
Not yet; the throughput of EmitSync is only about 10 messages per second, which is far from meeting our expectations.

@frairon
Contributor

frairon commented Jul 7, 2019

@XiaochenCui yes, I understand the performance issue here, but trying it would make sure that it's not sarama's internal batching that produces messages that are too big.
Anyway, we have experienced that error a couple of times, and it was always caused by a message being too big. Always.
So to make sure it's not the size, you could log the message size in the error handler. As long as we can't be 100% sure that the messages aren't too big, it's a waste of time to look elsewhere for the cause of the error.
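
For example, a sketch of that size logging, building on the Promise-based check above; bs and getPkg come from the test script, and the rest is just one way the handler could look:

bs := getPkg()
prom, err := emitter.Emit("", string(bs))
if err != nil {
	log.Fatalf("error emitting message: %v", err)
}
size := len(bs) // capture the size for the asynchronous callback
prom.Then(func(err error) {
	if err != nil {
		log.Printf("emit failed for a %d-byte message: %v", size, err)
	}
})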

@frairon
Contributor

frairon commented Feb 3, 2020

I'll close this for now; feel free to reopen if it's still an issue.

@frairon frairon closed this as completed Feb 3, 2020