Skip to content

Commit

Permalink
add keep-alive support to socket_listener & socket_writer (influxdata…
Browse files Browse the repository at this point in the history
  • Loading branch information
phemmer authored and Vladislav Mugultyanov (Lazada Group) committed May 30, 2017
1 parent 4e6ca5c commit f726448
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ be deprecated eventually.
- [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks
- [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests.
- [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin
- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer

### Bugfixes

Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/socket_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ This is a sample configuration for the plugin.
## Defaults to the OS default.
# read_buffer_size = 65535

## Period between keep alive probes.
## Only applies to TCP sockets.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
# keep_alive_period = "5m"

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
Expand Down
36 changes: 33 additions & 3 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
Expand Down Expand Up @@ -47,6 +48,11 @@ func (ssl *streamSocketListener) listen() {
}
ssl.connections[c.RemoteAddr().String()] = c
ssl.connectionsMtx.Unlock()

if err := ssl.setKeepAlive(c); err != nil {
ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err))
}

go ssl.read(c)
}

Expand All @@ -57,6 +63,23 @@ func (ssl *streamSocketListener) listen() {
ssl.connectionsMtx.Unlock()
}

func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error {
if ssl.KeepAlivePeriod == nil {
return nil
}
tcpc, ok := c.(*net.TCPConn)
if !ok {
return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(ssl.ServiceAddress, "://", 2)[0])
}
if ssl.KeepAlivePeriod.Duration == 0 {
return tcpc.SetKeepAlive(false)
}
if err := tcpc.SetKeepAlive(true); err != nil {
return err
}
return tcpc.SetKeepAlivePeriod(ssl.KeepAlivePeriod.Duration)
}

func (ssl *streamSocketListener) removeConnection(c net.Conn) {
ssl.connectionsMtx.Lock()
delete(ssl.connections, c.RemoteAddr().String())
Expand Down Expand Up @@ -116,9 +139,10 @@ func (psl *packetSocketListener) listen() {
}

type SocketListener struct {
ServiceAddress string
MaxConnections int
ReadBufferSize int
ServiceAddress string
MaxConnections int
ReadBufferSize int
KeepAlivePeriod *internal.Duration

parsers.Parser
telegraf.Accumulator
Expand Down Expand Up @@ -154,6 +178,12 @@ func (sl *SocketListener) SampleConfig() string {
## Defaults to the OS default.
# read_buffer_size = 65535
## Period between keep alive probes.
## Only applies to TCP sockets.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
# keep_alive_period = "5m"
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/socket_writer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ It can output data in any of the [supported output formats](https://github.com/i
# address = "unix:///tmp/telegraf.sock"
# address = "unixgram:///tmp/telegraf.sock"

## Period between keep alive probes.
## Only applies to TCP sockets.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
# keep_alive_period = "5m"

## Data format to generate.
## Each data format has it's own unique set of configuration options, read
## more about them here:
Expand Down
32 changes: 31 additions & 1 deletion plugins/outputs/socket_writer/socket_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package socket_writer

import (
"fmt"
"log"
"net"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

type SocketWriter struct {
Address string
Address string
KeepAlivePeriod *internal.Duration

serializers.Serializer

Expand All @@ -36,6 +39,12 @@ func (sw *SocketWriter) SampleConfig() string {
# address = "unix:///tmp/telegraf.sock"
# address = "unixgram:///tmp/telegraf.sock"
## Period between keep alive probes.
## Only applies to TCP sockets.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
# keep_alive_period = "5m"
## Data format to generate.
## Each data format has it's own unique set of configuration options, read
## more about them here:
Expand All @@ -59,10 +68,31 @@ func (sw *SocketWriter) Connect() error {
return err
}

if err := sw.setKeepAlive(c); err != nil {
log.Printf("unable to configure keep alive (%s): %s", sw.Address, err)
}

sw.Conn = c
return nil
}

func (sw *SocketWriter) setKeepAlive(c net.Conn) error {
if sw.KeepAlivePeriod == nil {
return nil
}
tcpc, ok := c.(*net.TCPConn)
if !ok {
return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(sw.Address, "://", 2)[0])
}
if sw.KeepAlivePeriod.Duration == 0 {
return tcpc.SetKeepAlive(false)
}
if err := tcpc.SetKeepAlive(true); err != nil {
return err
}
return tcpc.SetKeepAlivePeriod(sw.KeepAlivePeriod.Duration)
}

// Write writes the given metrics to the destination.
// If an error is encountered, it is up to the caller to retry the same write again later.
// Not parallel safe.
Expand Down

0 comments on commit f726448

Please sign in to comment.