From f726448ffd42904dcd94f504d894b7d2e44d046f Mon Sep 17 00:00:00 2001 From: Patrick Hemmer Date: Mon, 24 Apr 2017 16:14:42 -0400 Subject: [PATCH] add keep-alive support to socket_listener & socket_writer (#2697) closes #2635 --- CHANGELOG.md | 1 + plugins/inputs/socket_listener/README.md | 6 ++++ .../inputs/socket_listener/socket_listener.go | 36 +++++++++++++++++-- plugins/outputs/socket_writer/README.md | 6 ++++ .../outputs/socket_writer/socket_writer.go | 32 ++++++++++++++++- 5 files changed, 77 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e932d303db2a6..7921bde8fdc3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,7 @@ be deprecated eventually. - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks - [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests. - [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin +- [#2635](https://github.com/influxdata/telegraf/issues/2635): add tcp keep-alive to socket_listener & socket_writer ### Bugfixes diff --git a/plugins/inputs/socket_listener/README.md b/plugins/inputs/socket_listener/README.md index e73296804b202..604b8e893487f 100644 --- a/plugins/inputs/socket_listener/README.md +++ b/plugins/inputs/socket_listener/README.md @@ -36,6 +36,12 @@ This is a sample configuration for the plugin. ## Defaults to the OS default. # read_buffer_size = 65535 + ## Period between keep alive probes. + ## Only applies to TCP sockets. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + # keep_alive_period = "5m" + ## Data format to consume. ## Each data format has it's own unique set of configuration options, read ## more about them here: diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 4a9a470a75e13..eb5fc23ab48ea 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -11,6 +11,7 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" ) @@ -47,6 +48,11 @@ func (ssl *streamSocketListener) listen() { } ssl.connections[c.RemoteAddr().String()] = c ssl.connectionsMtx.Unlock() + + if err := ssl.setKeepAlive(c); err != nil { + ssl.AddError(fmt.Errorf("unable to configure keep alive (%s): %s", ssl.ServiceAddress, err)) + } + go ssl.read(c) } @@ -57,6 +63,23 @@ func (ssl *streamSocketListener) listen() { ssl.connectionsMtx.Unlock() } +func (ssl *streamSocketListener) setKeepAlive(c net.Conn) error { + if ssl.KeepAlivePeriod == nil { + return nil + } + tcpc, ok := c.(*net.TCPConn) + if !ok { + return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(ssl.ServiceAddress, "://", 2)[0]) + } + if ssl.KeepAlivePeriod.Duration == 0 { + return tcpc.SetKeepAlive(false) + } + if err := tcpc.SetKeepAlive(true); err != nil { + return err + } + return tcpc.SetKeepAlivePeriod(ssl.KeepAlivePeriod.Duration) +} + func (ssl *streamSocketListener) removeConnection(c net.Conn) { ssl.connectionsMtx.Lock() delete(ssl.connections, c.RemoteAddr().String()) @@ -116,9 +139,10 @@ func (psl *packetSocketListener) listen() { } type SocketListener struct { - ServiceAddress string - MaxConnections int - ReadBufferSize int + ServiceAddress string + MaxConnections int + ReadBufferSize int + KeepAlivePeriod *internal.Duration parsers.Parser telegraf.Accumulator @@ -154,6 +178,12 @@ func (sl *SocketListener) SampleConfig() string { ## Defaults to the OS default. # read_buffer_size = 65535 + ## Period between keep alive probes. + ## Only applies to TCP sockets. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + # keep_alive_period = "5m" + ## Data format to consume. ## Each data format has it's own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/socket_writer/README.md b/plugins/outputs/socket_writer/README.md index 441cdf1f708a6..027e92fe82a38 100644 --- a/plugins/outputs/socket_writer/README.md +++ b/plugins/outputs/socket_writer/README.md @@ -19,6 +19,12 @@ It can output data in any of the [supported output formats](https://github.com/i # address = "unix:///tmp/telegraf.sock" # address = "unixgram:///tmp/telegraf.sock" + ## Period between keep alive probes. + ## Only applies to TCP sockets. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + # keep_alive_period = "5m" + ## Data format to generate. ## Each data format has it's own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/socket_writer/socket_writer.go b/plugins/outputs/socket_writer/socket_writer.go index 2c54bb0bb5bfb..dbc3cba05dd34 100644 --- a/plugins/outputs/socket_writer/socket_writer.go +++ b/plugins/outputs/socket_writer/socket_writer.go @@ -2,16 +2,19 @@ package socket_writer import ( "fmt" + "log" "net" "strings" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" ) type SocketWriter struct { - Address string + Address string + KeepAlivePeriod *internal.Duration serializers.Serializer @@ -36,6 +39,12 @@ func (sw *SocketWriter) SampleConfig() string { # address = "unix:///tmp/telegraf.sock" # address = "unixgram:///tmp/telegraf.sock" + ## Period between keep alive probes. + ## Only applies to TCP sockets. + ## 0 disables keep alive probes. + ## Defaults to the OS configuration. + # keep_alive_period = "5m" + ## Data format to generate. ## Each data format has it's own unique set of configuration options, read ## more about them here: @@ -59,10 +68,31 @@ func (sw *SocketWriter) Connect() error { return err } + if err := sw.setKeepAlive(c); err != nil { + log.Printf("unable to configure keep alive (%s): %s", sw.Address, err) + } + sw.Conn = c return nil } +func (sw *SocketWriter) setKeepAlive(c net.Conn) error { + if sw.KeepAlivePeriod == nil { + return nil + } + tcpc, ok := c.(*net.TCPConn) + if !ok { + return fmt.Errorf("cannot set keep alive on a %s socket", strings.SplitN(sw.Address, "://", 2)[0]) + } + if sw.KeepAlivePeriod.Duration == 0 { + return tcpc.SetKeepAlive(false) + } + if err := tcpc.SetKeepAlive(true); err != nil { + return err + } + return tcpc.SetKeepAlivePeriod(sw.KeepAlivePeriod.Duration) +} + // Write writes the given metrics to the destination. // If an error is encountered, it is up to the caller to retry the same write again later. // Not parallel safe.