Skip to content

Commit

Permalink
Reduce UDP service per-packet allocation size
Browse files Browse the repository at this point in the history
This will reduce memory pressure and number of GC cycles, my results
sending 100,000 UDP points were:

- udp-payload-size=0: 242 GC cycles
- udp-payload-size=1500: 142 GC cycles
- udp-payload-size=0 (with change): 114 GC cycles
- udp-payload-size=1500 (with change): 112 GC cycles
  • Loading branch information
sparrc committed Apr 7, 2016
1 parent f232c05 commit 253975d
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features

- [#6237](https://github.com/influxdata/influxdb/issues/6237): Enable continuous integration testing on Windows platform via AppVeyor. Thanks @mvadu
- [#6263](https://github.com/influxdata/influxdb/pull/6263): Reduce UDP Service allocation size.

### Bugfixes

Expand Down
25 changes: 3 additions & 22 deletions services/udp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,6 @@ const (
// Linux: sudo sysctl -w net.core.rmem_max=<read-buffer>
// BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf=<read-buffer>
DefaultReadBuffer = 0

// DefaultUDPPayloadSize sets the default value of the incoming UDP packet
// to the spec max, i.e. 65536. That being said, this value should likely
// be tuned lower to match your udp_payload size if using tools like
// telegraf.
//
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
//
// Reading packets from a UDP socket in go actually only pulls
// one packet at a time, requiring a very fast reader to keep up with
// incoming data at scale. Reducing the overhead of the expected packet
// helps allocate memory faster (~10-25µs --> ~150ns with go1.5.2), thereby
// speeding up the processing of data coming in.
//
// NOTE: if you send a payload greater than the UDPPayloadSize, you will
// cause a buffer overflow...tune your application very carefully to match
// udp_payload for your metrics source
DefaultUDPPayloadSize = 65536
)

// Config holds various configuration settings for the UDP listener.
Expand All @@ -72,7 +54,9 @@ type Config struct {
ReadBuffer int `toml:"read-buffer"`
BatchTimeout toml.Duration `toml:"batch-timeout"`
Precision string `toml:"precision"`
UDPPayloadSize int `toml:"udp-payload-size"`

// Deprecated config option
udpPayloadSize int `toml:"udp-payload-size"`
}

// NewConfig returns a new instance of Config with defaults.
Expand Down Expand Up @@ -109,8 +93,5 @@ func (c *Config) WithDefaults() *Config {
if d.ReadBuffer == 0 {
d.ReadBuffer = DefaultReadBuffer
}
if d.UDPPayloadSize == 0 {
d.UDPPayloadSize = DefaultUDPPayloadSize
}
return &d
}
2 changes: 0 additions & 2 deletions services/udp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,5 @@ udp-payload-size = 1500
t.Fatalf("unexpected batch pending: %d", c.BatchPending)
} else if time.Duration(c.BatchTimeout) != (10 * time.Millisecond) {
t.Fatalf("unexpected batch timeout: %v", c.BatchTimeout)
} else if c.UDPPayloadSize != 1500 {
t.Fatalf("unexpected udp-payload-size: %d", c.UDPPayloadSize)
}
}
9 changes: 7 additions & 2 deletions services/udp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
const (
// Arbitrary, testing indicated that this doesn't typically get over 10
parserChanLen = 1000

MAX_UDP_PAYLOAD = 64 * 1024
)

// statistics gathered by the UDP package.
Expand Down Expand Up @@ -144,6 +146,7 @@ func (s *Service) writer() {
func (s *Service) serve() {
defer s.wg.Done()

buf := make([]byte, MAX_UDP_PAYLOAD)
s.batcher.Start()
for {

Expand All @@ -153,15 +156,17 @@ func (s *Service) serve() {
return
default:
// Keep processing.
buf := make([]byte, s.config.UDPPayloadSize)
n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
s.statMap.Add(statReadFail, 1)
s.Logger.Printf("Failed to read UDP message: %s", err)
continue
}
s.statMap.Add(statBytesReceived, int64(n))
s.parserChan <- buf[:n]

bufCopy := make([]byte, n)
copy(bufCopy, buf[:n])
s.parserChan <- bufCopy
}
}
}
Expand Down

0 comments on commit 253975d

Please sign in to comment.