
Why is kafka-go very slow compared to Sarama? #417

Closed
aneeskA opened this issue Mar 19, 2020 · 5 comments

aneeskA commented Mar 19, 2020

Describe the bug
I am trying to send 100GB of data into a Kafka topic by breaking the 100GB into batches of 100 lines.

Using kafka-go, I see that it writes 1 message per second, as per: https://www.gitmemory.com/issue/segmentio/kafka-go/326/519375403 .

To overcome this issue, I created a goroutine for each write. This immediately improved the throughput.

But the application was quickly killed by the OOM killer: the data to be written was produced faster than it could be written to Kafka, and the data piling up while waiting on kafka-go exhausted the memory.

When I did the same experiment using Sarama, the 100GB of data was moved to Kafka in 2 hr 10 min. There were no concurrent goroutines doing the writes; they were done one after the other.

Why is this so? Is there an example in kafka-go of moving high-volume data with high throughput?

aneeskA added the bug label on Mar 19, 2020

aneeskA commented Mar 19, 2020

I wrote the following benchmark test to confirm my findings.

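```go
package kafkabench // hypothetical package name; the original "pkg:" line is elided

import (
	"context"
	"strconv"
	"testing"
	"time"

	"github.com/segmentio/kafka-go"
)

func BenchmarkWriter(b *testing.B) {
	b.StopTimer()
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:      []string{"localhost:9092"},
		Topic:        "Topic1",
		Balancer:     &kafka.RoundRobin{},
		BatchTimeout: 10 * time.Millisecond,
		BatchSize:    1000,
	})
	b.StartTimer()
	// One synchronous WriteMessages call per message: each call blocks
	// until its single-message batch has been written.
	for i := 0; i < b.N; i++ {
		err := writer.WriteMessages(context.Background(),
			kafka.Message{
				Key:   []byte("key"),
				Value: []byte("iter:" + strconv.Itoa(i)),
			},
		)
		if err != nil {
			b.Error(err)
		}
	}
	b.StopTimer() // exclude writer shutdown from the measurement
	if err := writer.Close(); err != nil {
		b.Error(err)
	}
}
```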

When the benchmark was run for 10s using `go test -bench=Writer -benchtime 10s`, it completed only 896 iterations:

```
goos: darwin
goarch: amd64
pkg: ...
BenchmarkWriter-12    	     896	  14077251 ns/op
PASS
ok  	...	14.157s
```
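As a quick sanity check, the per-operation time converts directly into time spent in the loop and into throughput (one message per operation):

$$
896 \times 14\,077\,251\ \text{ns} \approx 12.61\ \text{s},
\qquad
\frac{10^{9}\ \text{ns/s}}{14\,077\,251\ \text{ns/op}} \approx 71\ \text{messages/s}
$$

About 10 ms of each roughly 14 ms operation is consistent with every call waiting out the configured 10 ms BatchTimeout before the message is actually produced.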

stevevls added the question label and removed the bug label on Mar 19, 2020

lotos2512 commented Mar 23, 2020

Try changing the writer config to:

```go
writer := kafka.NewWriter(kafka.WriterConfig{
	BatchBytes:   1000000,
	BatchTimeout: 5 * time.Millisecond,
	Brokers:      viper.GetStringSlice("kafka.writerBlocked.brokers"),
	Topic:        viper.GetString("kafka.writerBlocked.topic"),
})
```

I mean BatchBytes and BatchTimeout.

stevevls (Contributor) commented

Hi @aneeskA.

Keep in mind that kafka-go and sarama have very different APIs. The writer.WriteMessages call is working exactly as advertised. WriteMessages is a synchronous function, which means that it will not return until the messages you pass in are produced to Kafka.

The BatchTimeout, BatchSize, and BatchBytes parameters exist so that you can gain efficiency when multiple goroutines are producing in parallel. In your loop, though, they only slow things down. On each iteration, your call to WriteMessages will wait until either 1000 messages are available to publish or 10 milliseconds have passed. Since you only have a single goroutine producing messages, each iteration has to wait for the timeout. So each message takes 10 ms plus the time to produce a single message to Kafka.
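To make that arithmetic concrete, here is a toy model in Go. It is an idealized sketch under stated assumptions, not kafka-go's actual batching code: it assumes a flush happens as soon as BatchSize messages are pending or once BatchTimeout elapses, and the 4 ms produce latency is a hypothetical round-trip time chosen to roughly match the ~14 ms/op measured in the benchmark. The helper name `callLatency` is made up for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// callLatency is an idealized model (an assumption, not kafka-go's actual
// implementation) of how long one synchronous WriteMessages call blocks
// when a batch is flushed as soon as batchSize messages are pending or
// once batchTimeout has elapsed, whichever comes first.
func callLatency(pending, batchSize int, batchTimeout, produce time.Duration) time.Duration {
	if pending >= batchSize {
		// The batch is already full, so only the produce round trip is paid.
		return produce
	}
	// The batch never fills, so the call first waits out the whole timeout.
	return batchTimeout + produce
}

func main() {
	const (
		batchSize    = 1000                  // BatchSize from the benchmark
		batchTimeout = 10 * time.Millisecond // BatchTimeout from the benchmark
		produce      = 4 * time.Millisecond  // hypothetical produce round trip
	)

	// One goroutine, one message per call: every call hits the timeout.
	perMsg := callLatency(1, batchSize, batchTimeout, produce)
	fmt.Println(perMsg)                      // 14ms
	fmt.Println(int64(time.Second / perMsg)) // 71 messages per second

	// All messages handed over in one call: the batch flushes immediately.
	fmt.Println(callLatency(batchSize, batchSize, batchTimeout, produce)) // 4ms
}
```

Under this model the fix is not a faster network but fuller batches: either pass many messages per call or have many goroutines calling at once.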

I think that you'll find that if you change your code to account for the synchronous behavior, it will run much faster:

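```go
// Build all messages first, then hand them to a single synchronous
// WriteMessages call so the batch fills immediately instead of
// waiting for BatchTimeout on every message.
msgs := make([]kafka.Message, 0, b.N)
for i := 0; i < b.N; i++ {
	msgs = append(msgs, kafka.Message{
		Key:   []byte("key"),
		Value: []byte("iter:" + strconv.Itoa(i)),
	})
}
err := writer.WriteMessages(context.Background(), msgs...)
if err != nil {
	b.Error(err)
}
```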
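For the 100GB case in the original question, collecting every message into one slice is not practical. One way to keep the same idea (full batches per synchronous call) with bounded memory is to cut the input stream into fixed-size chunks. This is a sketch, not a kafka-go API: `writeChunked`, `batchWriter`, and `countingWriter` are hypothetical names, and the generic `batchWriter` interface (Go 1.18+) stands in for `*kafka.Writer`, whose `WriteMessages(ctx, msgs ...kafka.Message) error` method has the same shape, so the example runs without the kafka-go dependency.

```go
package main

import (
	"context"
	"fmt"
)

// batchWriter has the same method shape as (*kafka.Writer).WriteMessages
// when M is kafka.Message. It is a hypothetical interface used so this
// sketch compiles without importing kafka-go.
type batchWriter[M any] interface {
	WriteMessages(ctx context.Context, msgs ...M) error
}

// writeChunked drains src and passes messages to w in slices of at most
// chunkSize (assumed > 0), so every synchronous call carries a full batch
// and at most chunkSize messages are buffered here at once.
func writeChunked[M any](ctx context.Context, w batchWriter[M], src <-chan M, chunkSize int) error {
	buf := make([]M, 0, chunkSize)
	for m := range src {
		buf = append(buf, m)
		if len(buf) == chunkSize {
			if err := w.WriteMessages(ctx, buf...); err != nil {
				return err // src is left undrained; the caller should stop its producer
			}
			// Start a fresh slice instead of reusing buf, in case the
			// writer keeps a reference to the previous one.
			buf = make([]M, 0, chunkSize)
		}
	}
	if len(buf) > 0 {
		// Flush the final, partially filled chunk.
		return w.WriteMessages(ctx, buf...)
	}
	return nil
}

// countingWriter is a hypothetical stand-in for *kafka.Writer that only
// records how many messages arrived in each call.
type countingWriter struct{ sizes []int }

func (c *countingWriter) WriteMessages(_ context.Context, msgs ...string) error {
	c.sizes = append(c.sizes, len(msgs))
	return nil
}

func main() {
	src := make(chan string) // unbuffered: the producer waits for the writer
	go func() {
		defer close(src)
		for i := 0; i < 2500; i++ {
			src <- fmt.Sprintf("iter:%d", i)
		}
	}()

	w := &countingWriter{}
	if err := writeChunked[string](context.Background(), w, src, 1000); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println(w.sizes) // [1000 1000 500]
}
```

Because `src` is unbuffered, a producer that reads input faster than Kafka accepts it simply blocks, which avoids the unbounded buildup that led to the OOM kill in the original report.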


aneeskA commented Mar 24, 2020

@stevevls Thanks! This is very helpful. Is there an async version of WriteMessages() available? By the way, I love the simplicity of kafka-go.

stevevls (Contributor) commented

When you create the Writer, you can configure it to be async:

From kafka-go/writer.go, lines 115 to 119 at commit efce7b6:

```go
// Setting this flag to true causes the WriteMessages method to never block.
// It also means that errors are ignored since the caller will not receive
// the returned value. Use this only if you don't care about guarantees of
// whether the messages were written to kafka.
Async bool
```
The only thing to be careful about is that you lose delivery guarantees. Another option is to manage concurrency yourself by spawning goroutines that call WriteMessages.
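A minimal sketch of the "manage concurrency yourself" option, written so that it also avoids the OOM problem from the original report: a fixed pool of worker goroutines shares one bounded channel of batches, so the producer blocks when Kafka falls behind instead of spawning an unbounded number of goroutines. `writeConcurrently`, `batchWriter`, and `countingWriter` are hypothetical names; the generic interface (Go 1.18+) stands in for `*kafka.Writer` so the code runs without kafka-go, and the sketch assumes the writer may be called from several goroutines at once, which is the situation the batching parameters are described as targeting.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// batchWriter mirrors the shape of (*kafka.Writer).WriteMessages when M is
// kafka.Message (hypothetical interface for this sketch).
type batchWriter[M any] interface {
	WriteMessages(ctx context.Context, msgs ...M) error
}

// writeConcurrently runs a fixed pool of workers (assumed > 0), each making
// synchronous WriteMessages calls with batches taken from the shared
// channel. Workers keep draining after an error so the producer never
// blocks forever; the first error is returned once all batches are consumed.
func writeConcurrently[M any](ctx context.Context, w batchWriter[M], batches <-chan []M, workers int) error {
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range batches {
				if err := w.WriteMessages(ctx, batch...); err != nil {
					once.Do(func() { firstErr = err })
				}
			}
		}()
	}
	wg.Wait()
	return firstErr
}

// countingWriter is a hypothetical, concurrency-safe stand-in for
// *kafka.Writer that only counts messages.
type countingWriter struct {
	mu    sync.Mutex
	total int
}

func (c *countingWriter) WriteMessages(_ context.Context, msgs ...string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.total += len(msgs)
	return nil
}

func main() {
	// At most 4 batches wait in the channel; a faster producer blocks here.
	batches := make(chan []string, 4)
	go func() {
		defer close(batches)
		for b := 0; b < 10; b++ {
			batch := make([]string, 100) // 100 lines per batch, as in the issue
			for i := range batch {
				batch[i] = fmt.Sprintf("batch %d line %d", b, i)
			}
			batches <- batch
		}
	}()

	w := &countingWriter{}
	if err := writeConcurrently[string](context.Background(), w, batches, 8); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println(w.total) // 1000
}
```

The channel buffer and the worker count are the two knobs: together they bound how many batches can be held in memory at once (roughly buffer size plus workers), while the workers keep several WriteMessages calls in flight.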

Glad you enjoy using kafka-go! 😄
