diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 7b340f3c2..de7c0f8c4 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -5,6 +5,7 @@ import ( "bytes" "compress/flate" "crypto/tls" + _ "embed" "encoding/json" "errors" "fmt" @@ -1931,3 +1932,90 @@ func BenchmarkProtocolV2MultiSub2(b *testing.B) { benchmarkProtocolV2MultiSub(b func BenchmarkProtocolV2MultiSub4(b *testing.B) { benchmarkProtocolV2MultiSub(b, 4) } func BenchmarkProtocolV2MultiSub8(b *testing.B) { benchmarkProtocolV2MultiSub(b, 8) } func BenchmarkProtocolV2MultiSub16(b *testing.B) { benchmarkProtocolV2MultiSub(b, 16) } + +//go:embed protocol_v2_test.go +var testData []byte + +func BenchmarkCompress(b *testing.B) { + // This benchmark uses the go-nsq library, so the benefits of the + // compression chosen are somewhat limited by the implementation in the + // library. At time of writing the library doesn't properly buffer Snappy, + // and is using a slower flate implemenation than that used by nsqd. + for _, compression := range []string{"none", "snappy", "deflate1", "deflate3", "deflate5", "deflate6", "deflate9"} { + b.Run(compression, func(b *testing.B) { + opts := NewOptions() + defer os.RemoveAll(opts.DataPath) + opts.Logger = test.NilLogger{} + + tcpAddr, _, nsqd := mustStartNSQD(opts) + defer nsqd.Exit() + + cfg := nsq.NewConfig() + + switch compression { + case "none": + case "snappy": + cfg.Snappy = true + case "deflate1": + cfg.Deflate = true + cfg.DeflateLevel = 1 + case "deflate3": + cfg.Deflate = true + cfg.DeflateLevel = 3 + case "deflate5": + cfg.Deflate = true + cfg.DeflateLevel = 5 + case "deflate6": + cfg.Deflate = true + cfg.DeflateLevel = 6 + case "deflate9": + cfg.Deflate = true + cfg.DeflateLevel = 9 + default: + b.Fatalf("unknown compression: %s", compression) + } + + consumer, err := nsq.NewConsumer("test", "ch", cfg) + if err != nil { + b.Fatal(err) + } + defer consumer.Stop() + consumer.SetLogger(test.NilLogger{}, nsq.LogLevelInfo) + + var wg sync.WaitGroup + wg.Add(1) + + var count int32 + + consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { + if atomic.AddInt32(&count, 1) == int32(b.N) { + wg.Done() + } + return nil + })) + + if err := consumer.ConnectToNSQD(tcpAddr.String()); err != nil { + b.Fatal(err) + } + + producer, err := nsq.NewProducer(tcpAddr.String(), cfg) + if err != nil { + b.Fatal(err) + } + producer.SetLogger(test.NilLogger{}, nsq.LogLevelInfo) + defer producer.Stop() + + b.SetBytes(int64(len(testData))) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := producer.Publish("test", testData); err != nil { + b.Fatal(err) + } + } + + wg.Wait() + }) + } +}