Skip to content

Commit

Permalink
nsqd: add a benchmark for compression
Browse files Browse the repository at this point in the history
  • Loading branch information
philpearl committed Apr 30, 2024
1 parent 67857ae commit 8e64930
Showing 1 changed file with 88 additions and 0 deletions.
88 changes: 88 additions & 0 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"compress/flate"
"crypto/tls"
_ "embed"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -1931,3 +1932,90 @@ func BenchmarkProtocolV2MultiSub2(b *testing.B) { benchmarkProtocolV2MultiSub(b
func BenchmarkProtocolV2MultiSub4(b *testing.B) { benchmarkProtocolV2MultiSub(b, 4) }
func BenchmarkProtocolV2MultiSub8(b *testing.B) { benchmarkProtocolV2MultiSub(b, 8) }
func BenchmarkProtocolV2MultiSub16(b *testing.B) { benchmarkProtocolV2MultiSub(b, 16) }

//go:embed protocol_v2_test.go
var testData []byte

func BenchmarkCompress(b *testing.B) {
// This benchmark uses the go-nsq library, so the benefits of the
// compression chosen are somewhat limited by the implementation in the
// library. At time of writing the library doesn't properly buffer Snappy,
// and is using a slower flate implemenation than that used by nsqd.
for _, compression := range []string{"none", "snappy", "deflate1", "deflate3", "deflate5", "deflate6", "deflate9"} {
b.Run(compression, func(b *testing.B) {
opts := NewOptions()
defer os.RemoveAll(opts.DataPath)
opts.Logger = test.NilLogger{}

tcpAddr, _, nsqd := mustStartNSQD(opts)
defer nsqd.Exit()

cfg := nsq.NewConfig()

switch compression {
case "none":
case "snappy":
cfg.Snappy = true
case "deflate1":
cfg.Deflate = true
cfg.DeflateLevel = 1
case "deflate3":
cfg.Deflate = true
cfg.DeflateLevel = 3
case "deflate5":
cfg.Deflate = true
cfg.DeflateLevel = 5
case "deflate6":
cfg.Deflate = true
cfg.DeflateLevel = 6
case "deflate9":
cfg.Deflate = true
cfg.DeflateLevel = 9
default:
b.Fatalf("unknown compression: %s", compression)
}

consumer, err := nsq.NewConsumer("test", "ch", cfg)
if err != nil {
b.Fatal(err)
}
defer consumer.Stop()
consumer.SetLogger(test.NilLogger{}, nsq.LogLevelInfo)

var wg sync.WaitGroup
wg.Add(1)

var count int32

consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
if atomic.AddInt32(&count, 1) == int32(b.N) {
wg.Done()
}
return nil
}))

if err := consumer.ConnectToNSQD(tcpAddr.String()); err != nil {
b.Fatal(err)
}

producer, err := nsq.NewProducer(tcpAddr.String(), cfg)
if err != nil {
b.Fatal(err)
}
producer.SetLogger(test.NilLogger{}, nsq.LogLevelInfo)
defer producer.Stop()

b.SetBytes(int64(len(testData)))
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
if err := producer.Publish("test", testData); err != nil {
b.Fatal(err)
}
}

wg.Wait()
})
}
}

0 comments on commit 8e64930

Please sign in to comment.