diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index a905c2092f7e0..76dd25054a8c5 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -814,7 +814,6 @@ func (s *Statsd) remember(id string, conn *net.TCPConn) { func (s *Statsd) Stop() { s.Lock() - defer s.Unlock() log.Println("I! Stopping the statsd service") close(s.done) switch s.Protocol { @@ -838,9 +837,14 @@ func (s *Statsd) Stop() { default: s.UDPlistener.Close() } + s.Unlock() + s.wg.Wait() + + s.Lock() close(s.in) log.Println("I! Stopped Statsd listener service on ", s.ServiceAddress) + s.Unlock() } func init() { diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 5c6b2065a4d1f..91af51d67ebf3 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -124,6 +124,36 @@ func TestCloseConcurrentConns(t *testing.T) { listener.Stop() } +// benchmark how long it takes to accept & process 100,000 metrics: +func BenchmarkUDP(b *testing.B) { + listener := Statsd{ + Protocol: "udp", + ServiceAddress: ":8125", + AllowedPendingMessages: 250000, + } + acc := &testutil.Accumulator{Discard: true} + + // send multiple messages to socket + for n := 0; n < b.N; n++ { + err := listener.Start(acc) + if err != nil { + panic(err) + } + + time.Sleep(time.Millisecond * 25) + conn, err := net.Dial("udp", "127.0.0.1:8125") + if err != nil { + panic(err) + } + for i := 0; i < 250000; i++ { + fmt.Fprintf(conn, testMsg) + } + // wait for 250,000 metrics to get added to accumulator + time.Sleep(time.Millisecond) + listener.Stop() + } +} + // benchmark how long it takes to accept & process 100,000 metrics: func BenchmarkTCP(b *testing.B) { listener := Statsd{