Skip to content

Commit

Permalink
Update collectd and graphite UDP listeners with perf enhancements
Browse files Browse the repository at this point in the history
closes #4678
  • Loading branch information
sparrc committed Nov 5, 2015
1 parent 780df57 commit e0bcb57
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 15 deletions.
16 changes: 14 additions & 2 deletions services/collectd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,29 @@ import (
)

const (
// DefaultBindAddress is the default port to bind to
DefaultBindAddress = ":25826"

// DefaultDatabase is the default DB to write to
DefaultDatabase = "collectd"

// DefaultRetentionPolicy is the default retention policy of the writes
DefaultRetentionPolicy = ""

DefaultBatchSize = 1000
// DefaultBatchSize is the default write batch size.
DefaultBatchSize = 5000

DefaultBatchPending = 5
// DefaultBatchPending is the default number of pending write batches.
DefaultBatchPending = 10

// DefaultBatchTimeout is the default batch timeout.
DefaultBatchDuration = toml.Duration(10 * time.Second)

DefaultTypesDB = "/usr/share/collectd/types.db"

// DefaultUDPReadBuffer is the default UDP read buffer
// increasing this increases the number of UDP packets that can be handled.
DefaultReadBuffer = 8 * 1024 * 1024
)

// Config represents a configuration for the collectd service.
Expand All @@ -31,6 +41,7 @@ type Config struct {
BatchSize int `toml:"batch-size"`
BatchPending int `toml:"batch-pending"`
BatchDuration toml.Duration `toml:"batch-timeout"`
ReadBuffer int `toml:"read-buffer"`
TypesDB string `toml:"typesdb"`
}

Expand All @@ -40,6 +51,7 @@ func NewConfig() Config {
BindAddress: DefaultBindAddress,
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
ReadBuffer: DefaultReadBuffer,
BatchSize: DefaultBatchSize,
BatchPending: DefaultBatchPending,
BatchDuration: DefaultBatchDuration,
Expand Down
19 changes: 10 additions & 9 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Service struct {
wg sync.WaitGroup
err chan error
stop chan struct{}
ln *net.UDPConn
conn *net.UDPConn
batcher *tsdb.PointBatcher
typesdb gollectd.Types
addr net.Addr
Expand Down Expand Up @@ -118,13 +118,14 @@ func (s *Service) Open() error {
s.addr = addr

// Start listening
ln, err := net.ListenUDP("udp", addr)
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return fmt.Errorf("unable to listen on UDP: %s", err)
}
s.ln = ln
conn.SetReadBuffer(s.Config.ReadBuffer)
s.conn = conn

s.Logger.Println("Listening on UDP: ", ln.LocalAddr().String())
s.Logger.Println("Listening on UDP: ", conn.LocalAddr().String())

// Start the points batcher.
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
Expand All @@ -147,8 +148,8 @@ func (s *Service) Close() error {
if s.stop != nil {
close(s.stop)
}
if s.ln != nil {
s.ln.Close()
if s.conn != nil {
s.conn.Close()
}
if s.batcher != nil {
s.batcher.Stop()
Expand All @@ -157,7 +158,7 @@ func (s *Service) Close() error {

// Release all remaining resources.
s.stop = nil
s.ln = nil
s.conn = nil
s.batcher = nil
s.Logger.Println("collectd UDP closed")
return nil
Expand All @@ -179,7 +180,7 @@ func (s *Service) Err() chan error { return s.err }

// Addr returns the listener's address. Returns nil if listener is closed.
func (s *Service) Addr() net.Addr {
return s.ln.LocalAddr()
return s.conn.LocalAddr()
}

func (s *Service) serve() {
Expand All @@ -204,7 +205,7 @@ func (s *Service) serve() {
// Keep processing.
}

n, _, err := s.ln.ReadFromUDP(buffer)
n, _, err := s.conn.ReadFromUDP(buffer)
if err != nil {
s.statMap.Add(statReadFail, 1)
s.Logger.Printf("collectd ReadFromUDP error: %s", err)
Expand Down
16 changes: 12 additions & 4 deletions services/graphite/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ const (
// measurment parts in a template.
DefaultSeparator = "."

// DefaultBatchSize is the default Graphite batch size.
DefaultBatchSize = 1000
// DefaultBatchSize is the default write batch size.
DefaultBatchSize = 5000

// DefaultBatchPending is the default number of pending Graphite batches.
DefaultBatchPending = 5
// DefaultBatchPending is the default number of pending write batches.
DefaultBatchPending = 10

// DefaultBatchTimeout is the default Graphite batch timeout.
DefaultBatchTimeout = time.Second

// DefaultUDPReadBuffer is the default UDP read buffer
// increasing this increases the number of UDP packets that can be handled.
DefaultUDPReadBuffer = 8 * 1024 * 1024
)

// Config represents the configuration for Graphite endpoints.
Expand All @@ -49,6 +53,7 @@ type Config struct {
Templates []string `toml:"templates"`
Tags []string `toml:"tags"`
Separator string `toml:"separator"`
UDPReadBuffer int `toml:"udp-read-buffer"`
}

// WithDefaults takes the given config and returns a new config with any required
Expand Down Expand Up @@ -79,6 +84,9 @@ func (c *Config) WithDefaults() *Config {
if d.Separator == "" {
d.Separator = DefaultSeparator
}
if d.UDPReadBuffer == 0 {
d.UDPReadBuffer = DefaultUDPReadBuffer
}
return &d
}

Expand Down
3 changes: 3 additions & 0 deletions services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Service struct {
batchPending int
batchTimeout time.Duration
consistencyLevel cluster.ConsistencyLevel
udpReadBuffer int

batcher *tsdb.PointBatcher
parser *Parser
Expand Down Expand Up @@ -95,6 +96,7 @@ func NewService(c Config) (*Service, error) {
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
udpReadBuffer: d.UDPReadBuffer,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
tcpConnections: make(map[string]*tcpConnection),
Expand Down Expand Up @@ -293,6 +295,7 @@ func (s *Service) openUDPServer() (net.Addr, error) {
if err != nil {
return nil, err
}
s.udpConn.SetReadBuffer(s.udpReadBuffer)

buf := make([]byte, udpBufferSize)
s.wg.Add(1)
Expand Down

0 comments on commit e0bcb57

Please sign in to comment.