Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update collectd and graphite UDP listeners with perf enhancements #4681

Merged
merged 3 commits into from
Nov 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
- [#4676](https://github.com/influxdb/influxdb/pull/4676): UDP service listener performance enhancements
- [#4659](https://github.com/influxdb/influxdb/pull/4659): Support IF EXISTS for DROP DATABASE. Thanks @ch33hau
- [#4721](https://github.com/influxdb/influxdb/pull/4721): Export tsdb.InterfaceValues
- [#4681](https://github.com/influxdb/influxdb/pull/4681): Increase default buffer size for collectd and graphite listeners
- [#4659](https://github.com/influxdb/influxdb/pull/4659): Support IF EXISTS for DROP DATABASE

### Bugfixes
- [#4715](https://github.com/influxdb/influxdb/pull/4715): Fix panic during Raft-close. Fix [issue #4707](https://github.com/influxdb/influxdb/issues/4707). Thanks @oiooj
Expand Down
3 changes: 3 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ reporting-disabled = false
# batch-size = 1000 # will flush if this many points get buffered
# batch-pending = 5 # number of batches that may be pending in memory
# batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit
# udp-read-buffer = 0 # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.

## "name-schema" configures tag names for parsing the metric name from graphite protocol;
## separated by `name-separator`.
Expand Down Expand Up @@ -252,6 +253,7 @@ reporting-disabled = false
# batch-size = 1000 # will flush if this many points get buffered
# batch-pending = 5 # number of batches that may be pending in memory
# batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit
# read-buffer = 0 # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.

###
### [opentsdb]
Expand Down Expand Up @@ -295,6 +297,7 @@ reporting-disabled = false
# batch-size = 1000 # will flush if this many points get buffered
# batch-pending = 5 # number of batches that may be pending in memory
# batch-timeout = "1s" # will flush at least this often even if we haven't hit buffer limit
# read-buffer = 0 # UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.

###
### [continuous_queries]
Expand Down
20 changes: 20 additions & 0 deletions services/collectd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

The _collectd_ input allows InfluxDB to accept data transmitted in collectd native format. This data is transmitted over UDP.

## A note on UDP/IP OS Buffer sizes

If you're running Linux or FreeBSD, please adjust your OS UDP buffer
size limit, [see here for more details.](../udp/README.md#a-note-on-udpip-os-buffer-sizes)

## Configuration

Each collectd input allows the binding address, target database, and target retention policy to be set. If the database does not exist, it will be created automatically when the input is initialized. If the retention policy is not configured, then the default retention policy for the database is used. However if the retention policy is set, the retention policy must be explicitly created. The input will not automatically create it.
Expand All @@ -13,3 +18,18 @@ The path to the collectd types database file may also be set
## Large UDP packets

Please note that UDP packages larger than the standard size of 1452 are dropped at the time of ingestion, so be sure to set `MaxPacketSize` to 1452 in the collectd configuration.

## Config Example

```
[collectd]
enabled = false
bind-address = ":25826" # the bind address
database = "collectd" # Name of the database that will be written to
retention-policy = ""
batch-size = 5000 # will flush if this many points get buffered
batch-pending = 10 # number of batches that may be pending in memory
batch-timeout = "10s"
read-buffer = 0 # UDP read buffer size, 0 means to use OS default
typesdb = "/usr/share/collectd/types.db"
```
25 changes: 23 additions & 2 deletions services/collectd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,38 @@ import (
)

const (
// DefaultBindAddress is the default port to bind to
DefaultBindAddress = ":25826"

// DefaultDatabase is the default DB to write to
DefaultDatabase = "collectd"

// DefaultRetentionPolicy is the default retention policy of the writes
DefaultRetentionPolicy = ""

DefaultBatchSize = 1000
// DefaultBatchSize is the default write batch size.
DefaultBatchSize = 5000

DefaultBatchPending = 5
// DefaultBatchPending is the default number of pending write batches.
DefaultBatchPending = 10

// DefaultBatchTimeout is the default batch timeout.
DefaultBatchDuration = toml.Duration(10 * time.Second)

DefaultTypesDB = "/usr/share/collectd/types.db"

// DefaultReadBuffer is the default buffer size for the UDP listener.
// Sets the size of the operating system's receive buffer associated with
// the UDP traffic. Keep in mind that the OS must be able
// to handle the number set here or the UDP listener will error and exit.
//
// DefaultReadBuffer = 0 means to use the OS default, which is usually too
// small for high UDP performance.
//
// Increasing OS buffer limits:
// Linux: sudo sysctl -w net.core.rmem_max=<read-buffer>
// BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf=<read-buffer>
DefaultReadBuffer = 0
)

// Config represents a configuration for the collectd service.
Expand All @@ -31,6 +50,7 @@ type Config struct {
BatchSize int `toml:"batch-size"`
BatchPending int `toml:"batch-pending"`
BatchDuration toml.Duration `toml:"batch-timeout"`
ReadBuffer int `toml:"read-buffer"`
TypesDB string `toml:"typesdb"`
}

Expand All @@ -40,6 +60,7 @@ func NewConfig() Config {
BindAddress: DefaultBindAddress,
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
ReadBuffer: DefaultReadBuffer,
BatchSize: DefaultBatchSize,
BatchPending: DefaultBatchPending,
BatchDuration: DefaultBatchDuration,
Expand Down
26 changes: 17 additions & 9 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Service struct {
wg sync.WaitGroup
err chan error
stop chan struct{}
ln *net.UDPConn
conn *net.UDPConn
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed this just for consistency with graphite and udp services

batcher *tsdb.PointBatcher
typesdb gollectd.Types
addr net.Addr
Expand Down Expand Up @@ -118,13 +118,21 @@ func (s *Service) Open() error {
s.addr = addr

// Start listening
ln, err := net.ListenUDP("udp", addr)
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return fmt.Errorf("unable to listen on UDP: %s", err)
}
s.ln = ln

s.Logger.Println("Listening on UDP: ", ln.LocalAddr().String())
if s.Config.ReadBuffer != 0 {
err = conn.SetReadBuffer(s.Config.ReadBuffer)
if err != nil {
return fmt.Errorf("unable to set UDP read buffer to %d: %s",
s.Config.ReadBuffer, err)
}
}
s.conn = conn

s.Logger.Println("Listening on UDP: ", conn.LocalAddr().String())

// Start the points batcher.
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
Expand All @@ -147,8 +155,8 @@ func (s *Service) Close() error {
if s.stop != nil {
close(s.stop)
}
if s.ln != nil {
s.ln.Close()
if s.conn != nil {
s.conn.Close()
}
if s.batcher != nil {
s.batcher.Stop()
Expand All @@ -157,7 +165,7 @@ func (s *Service) Close() error {

// Release all remaining resources.
s.stop = nil
s.ln = nil
s.conn = nil
s.batcher = nil
s.Logger.Println("collectd UDP closed")
return nil
Expand All @@ -179,7 +187,7 @@ func (s *Service) Err() chan error { return s.err }

// Addr returns the listener's address. Returns nil if listener is closed.
func (s *Service) Addr() net.Addr {
return s.ln.LocalAddr()
return s.conn.LocalAddr()
}

func (s *Service) serve() {
Expand All @@ -204,7 +212,7 @@ func (s *Service) serve() {
// Keep processing.
}

n, _, err := s.ln.ReadFromUDP(buffer)
n, _, err := s.conn.ReadFromUDP(buffer)
if err != nil {
s.statMap.Add(statReadFail, 1)
s.Logger.Printf("collectd ReadFromUDP error: %s", err)
Expand Down
31 changes: 28 additions & 3 deletions services/graphite/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
# Configuration
# The graphite Input

## A note on UDP/IP OS Buffer sizes

If you're using UDP input and running Linux or FreeBSD, please adjust your UDP buffer
size limit, [see here for more details.](../udp/README.md#a-note-on-udpip-os-buffer-sizes)

## Configuration

Each Graphite input allows the binding address, target database, and protocol to be set. If the database does not exist, it will be created automatically when the input is initialized. The write-consistency-level can also be set. If any write operations do not meet the configured consistency guarantees, an error will occur and the data will not be indexed. The default consistency-level is `ONE`.

Each Graphite input also performs internal batching of the points it receives, as batched writes to the database are more efficient. The default _batch size_ is 1000, _pending batch_ factor is 5, with a _batch timeout_ of 1 second. This means the input will write batches of maximum size 1000, but if a batch has not reached 1000 points within 1 second of the first point being added to a batch, it will emit that batch regardless of size. The pending batch factor controls how many batches can be in memory at once, allowing the input to transmit a batch, while still building other batches.

# Parsing Metrics
## Parsing Metrics

The graphite plugin allows measurements to be saved using the graphite line protocol. By default, enabling the graphite plugin will allow you to collect metrics and store them using the metric name as the measurement. If you send a metric named `servers.localhost.cpu.loadavg.10`, it will store the full metric name as the measurement with no extracted tags.

Expand Down Expand Up @@ -147,7 +154,7 @@ If you need to add the same set of tags to all metrics, you can define them glob
#]
```

## Customized Config
## Customized Config
```
[[graphite]]
enabled = true
Expand All @@ -167,3 +174,21 @@ If you need to add the same set of tags to all metrics, you can define them glob
".measurement*",
]
```

## Two graphite listener, UDP & TCP, Config

```
[[graphite]]
enabled = true
bind-address = ":2003"
protocol = "tcp"
# consistency-level = "one"

[[graphite]]
enabled = true
bind-address = ":2004" # the bind address
protocol = "udp" # protocol to read via
udp-read-buffer = 8388608 # (8*1024*1024) UDP read buffer size
```


25 changes: 21 additions & 4 deletions services/graphite/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,27 @@ const (
// measurment parts in a template.
DefaultSeparator = "."

// DefaultBatchSize is the default Graphite batch size.
DefaultBatchSize = 1000
// DefaultBatchSize is the default write batch size.
DefaultBatchSize = 5000

// DefaultBatchPending is the default number of pending Graphite batches.
DefaultBatchPending = 5
// DefaultBatchPending is the default number of pending write batches.
DefaultBatchPending = 10

// DefaultBatchTimeout is the default Graphite batch timeout.
DefaultBatchTimeout = time.Second

// DefaultUDPReadBuffer is the default buffer size for the UDP listener.
// Sets the size of the operating system's receive buffer associated with
// the UDP traffic. Keep in mind that the OS must be able
// to handle the number set here or the UDP listener will error and exit.
//
// DefaultReadBuffer = 0 means to use the OS default, which is usually too
// small for high UDP performance.
//
// Increasing OS buffer limits:
// Linux: sudo sysctl -w net.core.rmem_max=<read-buffer>
// BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf=<read-buffer>
DefaultUDPReadBuffer = 0
)

// Config represents the configuration for Graphite endpoints.
Expand All @@ -49,6 +62,7 @@ type Config struct {
Templates []string `toml:"templates"`
Tags []string `toml:"tags"`
Separator string `toml:"separator"`
UDPReadBuffer int `toml:"udp-read-buffer"`
}

func NewConfig() Config {
Expand Down Expand Up @@ -92,6 +106,9 @@ func (c *Config) WithDefaults() *Config {
if d.Separator == "" {
d.Separator = DefaultSeparator
}
if d.UDPReadBuffer == 0 {
d.UDPReadBuffer = DefaultUDPReadBuffer
}
return &d
}

Expand Down
10 changes: 10 additions & 0 deletions services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Service struct {
batchPending int
batchTimeout time.Duration
consistencyLevel cluster.ConsistencyLevel
udpReadBuffer int

batcher *tsdb.PointBatcher
parser *Parser
Expand Down Expand Up @@ -95,6 +96,7 @@ func NewService(c Config) (*Service, error) {
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
udpReadBuffer: d.UDPReadBuffer,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
tcpConnections: make(map[string]*tcpConnection),
Expand Down Expand Up @@ -294,6 +296,14 @@ func (s *Service) openUDPServer() (net.Addr, error) {
return nil, err
}

if s.udpReadBuffer != 0 {
err = s.udpConn.SetReadBuffer(s.udpReadBuffer)
if err != nil {
return nil, fmt.Errorf("unable to set UDP read buffer to %d: %s",
s.udpReadBuffer, err)
}
}

buf := make([]byte, udpBufferSize)
s.wg.Add(1)
go func() {
Expand Down
Loading