From 61a456671345c598cc998afce55ad8bf609cfccf Mon Sep 17 00:00:00 2001 From: JP Phillips Date: Mon, 13 Nov 2017 13:46:28 -0600 Subject: [PATCH] fix growing allocation of pendingOffsets when no offsetManager is in use (#436) * fix growing allocation of pendingOffsets when no offsetManager is in use * dont close writeResult channel, avoids potential panic if used as a lib --- pipeline/node.go | 8 +++++--- pipeline/node_bench_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 pipeline/node_bench_test.go diff --git a/pipeline/node.go b/pipeline/node.go index 60041d669..3d5320518 100644 --- a/pipeline/node.go +++ b/pipeline/node.go @@ -544,9 +544,11 @@ func (n *Node) write(msg message.Msg, off offset.Offset) (message.Msg, error) { return nil, err } - n.offsetLock.Lock() - n.pendingOffsets = append(n.pendingOffsets, off) - n.offsetLock.Unlock() + if n.om != nil { + n.offsetLock.Lock() + n.pendingOffsets = append(n.pendingOffsets, off) + n.offsetLock.Unlock() + } ctx, cancel := context.WithTimeout(context.Background(), n.writeTimeout) defer cancel() c := make(chan writeResult) diff --git a/pipeline/node_bench_test.go b/pipeline/node_bench_test.go new file mode 100644 index 000000000..b18d59e8b --- /dev/null +++ b/pipeline/node_bench_test.go @@ -0,0 +1,25 @@ +package pipeline + +import ( + "testing" + + "github.com/compose/transporter/offset" + + "github.com/compose/transporter/message" +) + +func BenchmarkNodeWrite(b *testing.B) { + node, err := NewNodeWithOptions("benchwriter", "test", ".*", + WithWriteTimeout("500ms"), + ) + if err != nil { + b.Error(err) + } + + msg := &message.Base{} + off := offset.Offset{} + b.ReportAllocs() + for i := 0; i < b.N; i++ { + node.write(msg, off) + } +}