Feature: TCP Input #6266

Closed
wants to merge 11 commits into from
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
@@ -49,6 +49,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
the system test. {pull}6121[6121]
- Add IIS module to parse access log and error log. {pull}6127[6127]
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
- Refactor the usage of prospector to input in the YAML reference {pull}6121[6121]
- Add the TCP input. {pull}6266[6266]

*Heartbeat*

14 changes: 14 additions & 0 deletions filebeat/_meta/common.reference.p2.yml
@@ -240,6 +240,20 @@ filebeat.inputs:
# Maximum size of the message received over UDP
#max_message_size: 10240

#------------------------------ TCP prospector --------------------------------
# Experimental: Config options for the TCP input
#- type: tcp
  #enabled: false

  # The host and port to receive the new event
  #host: "localhost:9000"

  # Character used to split new message
  #line_delimiter: "\n"

  # Maximum size in bytes of the message received over TCP
  #max_message_size: 20971520

#========================== Filebeat autodiscover ==============================

# Autodiscover allows you to detect changes in the system and spawn new modules
5 changes: 4 additions & 1 deletion filebeat/docs/filebeat-options.asciidoc
@@ -14,7 +14,7 @@ and configuring modules.
To configure {beatname_uc} manually (instead of using
<<{beatname_lc}-modules-overview,modules>>), you specify a list of inputs in the
+{beatname_lc}.inputs+ section of the +{beatname_lc}.yml+. Inputs specify how
{beatname_uc} locates and processes input data.
{beatname_uc} locates and processes input data.

The list is a http://yaml.org/[YAML] array, so each input begins with
a dash (`-`). You can specify multiple inputs, and you can specify the same
@@ -47,6 +47,7 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-redis>>
* <<{beatname_lc}-input-udp>>
* <<{beatname_lc}-input-docker>>
* <<{beatname_lc}-input-tcp>>



@@ -59,3 +60,5 @@ include::inputs/input-redis.asciidoc[]
include::inputs/input-udp.asciidoc[]

include::inputs/input-docker.asciidoc[]

include::inputs/input-tcp.asciidoc[]
55 changes: 55 additions & 0 deletions filebeat/docs/inputs/input-tcp.asciidoc
@@ -0,0 +1,55 @@
:type: tcp

[id="{beatname_lc}-input-{type}"]
=== TCP input

++++
<titleabbrev>TCP</titleabbrev>
++++

Use the `TCP` input to read events over TCP.

Example configuration:

["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: tcp
  max_message_size: 10240
  host: "localhost:9000"
----
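
To try a configuration like the one above, a client only needs to open a TCP connection and write delimiter-terminated messages. The sketch below is not part of the PR: `sendEvents` and `roundTrip` are hypothetical names, and a throwaway local listener stands in for Filebeat so the example runs on its own.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// sendEvents (hypothetical helper) dials addr and writes every event
// followed by the delimiter, which is what the input splits on.
func sendEvents(addr, delimiter string, events []string) error {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return err
	}
	defer conn.Close()
	for _, e := range events {
		if _, err := fmt.Fprint(conn, e+delimiter); err != nil {
			return err
		}
	}
	return nil
}

// roundTrip starts a local listener on a free port (a stand-in for Filebeat),
// sends the events to it and returns the lines the listener received.
func roundTrip(events []string) ([]string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	defer ln.Close()

	received := make(chan []string, 1)
	go func() {
		var lines []string
		conn, err := ln.Accept()
		if err == nil {
			defer conn.Close()
			scanner := bufio.NewScanner(conn)
			for scanner.Scan() {
				lines = append(lines, scanner.Text())
			}
		}
		received <- lines
	}()

	if err := sendEvents(ln.Addr().String(), "\n", events); err != nil {
		return nil, err
	}
	return <-received, nil
}

func main() {
	lines, err := roundTrip([]string{"first event", "second event"})
	fmt.Println(lines, err)
}
```

Against a real Filebeat instance, `sendEvents("localhost:9000", "\n", events)` would be the only call needed, assuming the `host` value from the example configuration.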


==== Configuration options

The `tcp` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

[float]
[id="{beatname_lc}-input-{type}-max-message-size"]
==== `max_message_size`

The maximum size, in bytes, of a message received over TCP. The default is `20971520` (20 MiB).

[float]
[id="{beatname_lc}-input-{type}-host"]
==== `host`

The host and TCP port to listen on for event streams.

[float]
[id="{beatname_lc}-input-{type}-line-delimiter"]
==== `line_delimiter`

Specifies the characters used to split incoming events. The default is `\n`.
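
The client code in this PR takes a `bufio.SplitFunc`, but the function that builds it from `line_delimiter` is not shown in this part of the diff. A minimal sketch of how such a split function could look, assuming a non-empty delimiter (`delimiterSplit` and `splitEvents` are hypothetical names):

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// delimiterSplit is a hypothetical helper, not taken from the PR. It returns
// a bufio.SplitFunc that cuts the stream on an arbitrary, non-empty delimiter.
func delimiterSplit(delimiter []byte) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		if atEOF && len(data) == 0 {
			return 0, nil, nil
		}
		if i := bytes.Index(data, delimiter); i >= 0 {
			// A full event is buffered: consume it together with the delimiter.
			return i + len(delimiter), data[:i], nil
		}
		if atEOF {
			// Last event without a trailing delimiter.
			return len(data), data, nil
		}
		// Not enough data yet, ask the scanner to read more.
		return 0, nil, nil
	}
}

// splitEvents applies the split function to an in-memory stream.
func splitEvents(input, delimiter string) []string {
	scanner := bufio.NewScanner(strings.NewReader(input))
	scanner.Split(delimiterSplit([]byte(delimiter)))
	var events []string
	for scanner.Scan() {
		events = append(events, scanner.Text())
	}
	return events
}

func main() {
	fmt.Println(splitEvents("a;b;c", ";"))
}
```

With the default `\n` delimiter this behaves much like `bufio.ScanLines`, except that it does not strip a trailing `\r`.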

[float]
[id="{beatname_lc}-input-{type}-timeout"]
==== `timeout`

The number of seconds of inactivity before a remote connection is closed. The default is `300`.

[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]

:type!:
14 changes: 14 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -535,6 +535,20 @@ filebeat.inputs:
# Maximum size of the message received over UDP
#max_message_size: 10240

#------------------------------ TCP prospector --------------------------------
# Experimental: Config options for the TCP input
#- type: tcp
  #enabled: false

  # The host and port to receive the new event
  #host: "localhost:9000"

  # Character used to split new message
  #line_delimiter: "\n"

  # Maximum size in bytes of the message received over TCP
  #max_message_size: 20971520

#========================== Filebeat autodiscover ==============================

# Autodiscover allows you to detect changes in the system and spawn new modules
1 change: 1 addition & 0 deletions filebeat/include/list.go
@@ -12,5 +12,6 @@ import (
_ "github.com/elastic/beats/filebeat/input/log"
_ "github.com/elastic/beats/filebeat/input/redis"
_ "github.com/elastic/beats/filebeat/input/stdin"
_ "github.com/elastic/beats/filebeat/input/tcp"
_ "github.com/elastic/beats/filebeat/input/udp"
)
113 changes: 113 additions & 0 deletions filebeat/input/tcp/client.go
@@ -0,0 +1,113 @@
package tcp

import (
	"bufio"
	"net"
	"strings"
	"time"

	"github.com/pkg/errors"

	"github.com/elastic/beats/filebeat/harvester"
	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"
)

// Client is a remote client.
type Client struct {
	conn           net.Conn
	forwarder      *harvester.Forwarder

Reviewer

Can we replace `*harvester.Forwarder` with an interface or function type? That would give some more isolation from `harvester.Forwarder` (I hope to remove Forwarder and Outlet in the future).

Contributor Author

Knowing that, it makes sense to change it.

Contributor Author

Similar to #6439? :)
I was planning to do it in the second part, but I can do it right now.

Collaborator

Let's keep the PR as small as possible and do the interface when we have a second implementation.

	done           chan struct{}
	metadata       common.MapStr
	splitFunc      bufio.SplitFunc
	maxReadMessage uint64
	timeout        time.Duration
}

// NewClient returns a new client instance for the remote connection.
func NewClient(
	conn net.Conn,
	forwarder *harvester.Forwarder,
	splitFunc bufio.SplitFunc,
	maxReadMessage uint64,
	timeout time.Duration,
) *Client {
	client := &Client{
		conn:           conn,
		forwarder:      forwarder,
		done:           make(chan struct{}),
		splitFunc:      splitFunc,
		maxReadMessage: maxReadMessage,
		timeout:        timeout,
		metadata: common.MapStr{
			"hostnames":  remoteHosts(conn),
			"ip_address": conn.RemoteAddr().String(),
		},
	}

	return client
}

// Handle reads messages from the specified TCP socket.
func (c *Client) Handle() error {
	r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), c.maxReadMessage)
	buf := bufio.NewReader(r)
	scanner := bufio.NewScanner(buf)
	scanner.Split(c.splitFunc)

	for scanner.Scan() {
		err := scanner.Err()
		if err != nil {
			// We are forcing a close on the socket, so let's ignore any error that could happen.
			select {
			case <-c.done:
				break
			default:
			}
			// This is a user-defined limit and we should notify the user.
			if IsMaxReadBufferErr(err) {
				logp.Err("tcp client error: %s", err)
			}
			return errors.Wrap(err, "tcp client error")
		}
		r.Reset()
		c.forwarder.Send(c.createEvent(scanner.Text()))
	}
	return nil
}
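
A note on the loop in `Handle`: it checks `scanner.Err()` only while `Scan` keeps returning true, and the bare `break` inside `select` leaves only the `select` statement. The pattern documented for `bufio.Scanner` is to check `Err` once the loop has ended, because `Scan` returns false both at EOF and on a read error. A minimal sketch of that pattern follows. It is not the PR's code: `errLimit` stands in for `ErrMaxReadBuffer`, and `failingReader` and `scanWithFailure` are made-up names.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

// errLimit stands in for ErrMaxReadBuffer from conn.go.
var errLimit = errors.New("max read buffer reached")

// failingReader is a made-up reader that serves its payload and then fails
// with errLimit instead of reporting a clean EOF.
type failingReader struct {
	r io.Reader
}

func (f *failingReader) Read(p []byte) (int, error) {
	n, err := f.r.Read(p)
	if err == io.EOF {
		return n, errLimit
	}
	return n, err
}

// scanWithFailure collects every line and then reports the error that
// stopped the scan; Err returns nil when the scan ended at EOF.
func scanWithFailure(input string) ([]string, error) {
	scanner := bufio.NewScanner(&failingReader{r: strings.NewReader(input)})
	var lines []string
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	// Scan returned false: Err tells a read error apart from EOF.
	return lines, scanner.Err()
}

func main() {
	lines, err := scanWithFailure("one\ntwo\n")
	fmt.Println(lines, err)
}
```

Here both lines are delivered and the read error is still visible to the caller after the loop.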

// Close stops reading from the socket and closes the connection.
func (c *Client) Close() {
	close(c.done)
	c.conn.Close()

Collaborator

I was wondering if we could have `c.conn.Close()` in the same method where the connection is opened, to make sure we don't forget about it. But I'm not sure whether this is possible here.

Contributor Author

I think it's necessary to call `c.conn.Close()` here to make sure we don't accept new clients.

Reviewer

Agree on the async Close. Still, double-check by compiling and running with `-race` that the async Close does not trigger race condition warnings.

Contributor Author

I've manually tested a build with `-race` and run the tests with `-race`; I didn't see any race condition warnings.

}

func (c *Client) createEvent(rawString string) *util.Data {
	data := util.NewData()
	data.Event = beat.Event{
		Timestamp: time.Now(),
		Meta:      c.metadata,

@urso, Feb 21, 2018

The metadata field behaves like `@metadata` in Logstash. If one sets `index`, `id`, or `pipeline`, this will configure some parameters in the Elasticsearch output. Is this on purpose?

When indexing into ES, the `@metadata` is dropped. That is, when sending directly to ES, this information can never be retrieved by users (there is no rename processor).

Contributor Author

I did it on purpose; I am not sure how valuable that information is. But I could make it a public field and store it in the mapping. WDYT?

		Fields: common.MapStr{
			"message": rawString,
		},
	}
	return data
}

// remoteHosts takes the IP address of the client and tries to resolve its name. If the lookup
// fails, it falls back to the IP. An IP can resolve to multiple hostnames.
func remoteHosts(conn net.Conn) []string {
	ip := conn.RemoteAddr().String()
	idx := strings.Index(ip, ":")
	if idx == -1 {
		return []string{ip}
	}
	ip = ip[0:idx]
	hosts, err := net.LookupAddr(ip)
	if err != nil {
		hosts = []string{ip}
	}
	return hosts
}
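
`remoteHosts` cuts the remote address at the first colon. That works for an IPv4 address such as `127.0.0.1:9000`, but a bracketed IPv6 address such as `[::1]:9000` would be cut down to `[`. The standard library's `net.SplitHostPort` handles both forms. A minimal sketch, where `hostOnly` is a hypothetical helper and not part of the PR:

```go
package main

import (
	"fmt"
	"net"
)

// hostOnly is a hypothetical helper, not part of the PR. It strips the port
// from an address such as the one returned by conn.RemoteAddr().String().
func hostOnly(addr string) string {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		// No port or an unparsable address: fall back to the raw value.
		return addr
	}
	return host
}

func main() {
	fmt.Println(hostOnly("127.0.0.1:9000"))
	fmt.Println(hostOnly("[::1]:9000"))
}
```

The returned host could then be passed to `net.LookupAddr` as the existing code does.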
24 changes: 24 additions & 0 deletions filebeat/input/tcp/config.go
@@ -0,0 +1,24 @@
package tcp

import (
	"time"

	"github.com/elastic/beats/filebeat/harvester"
)

type config struct {
	harvester.ForwarderConfig `config:",inline"`
	Host                      string        `config:"host" validate:"required"`
	LineDelimiter             string        `config:"line_delimiter" validate:"nonzero"`
	Timeout                   time.Duration `config:"timeout" validate:"nonzero,positive"`
	MaxMessageSize            uint64        `config:"max_message_size" validate:"nonzero,positive"`
}

var defaultConfig = config{
	ForwarderConfig: harvester.ForwarderConfig{
		Type: "tcp",
	},
	LineDelimiter:  "\n",
	Timeout:        time.Minute * 5,
	MaxMessageSize: 20 * 1024 * 1024,
}
72 changes: 72 additions & 0 deletions filebeat/input/tcp/conn.go
@@ -0,0 +1,72 @@
package tcp

Contributor Author

I've kept these methods public because I might move them to common.

Collaborator

We should only move it to common if we also use it in Metricbeat, for example, which could happen.

import (
	"io"
	"net"
	"time"

	"github.com/pkg/errors"
)

// ErrMaxReadBuffer is returned when too many bytes were read from the io.Reader.
var ErrMaxReadBuffer = errors.New("max read buffer reached")

// ResetableLimitedReader is based on LimitedReader but allows resetting the number of bytes read
// and returns a specific error when the limit is reached.
type ResetableLimitedReader struct {
	reader        io.Reader
	maxReadBuffer uint64
	byteRead      uint64
}

// NewResetableLimitedReader returns a new ResetableLimitedReader.
func NewResetableLimitedReader(reader io.Reader, maxReadBuffer uint64) *ResetableLimitedReader {
	return &ResetableLimitedReader{
		reader:        reader,
		maxReadBuffer: maxReadBuffer,
	}
}

// Read reads from the underlying reader and fails with ErrMaxReadBuffer once the limit is reached.
func (m *ResetableLimitedReader) Read(p []byte) (n int, err error) {
	if m.byteRead >= m.maxReadBuffer {
		return 0, ErrMaxReadBuffer
	}
	n, err = m.reader.Read(p)
	m.byteRead += uint64(n)
	return
}

// Reset resets the number of bytes read.
func (m *ResetableLimitedReader) Reset() {
	m.byteRead = 0
}

// IsMaxReadBufferErr returns true when the error is ErrMaxReadBuffer.
func IsMaxReadBufferErr(err error) bool {
	return err == ErrMaxReadBuffer
}

// DeadlineReader allows reads from a net.Conn to time out; the timeout is refreshed on every read.
type DeadlineReader struct {
	conn    net.Conn
	timeout time.Duration
}

// NewDeadlineReader returns a new DeadlineReader.
func NewDeadlineReader(c net.Conn, timeout time.Duration) *DeadlineReader {
	return &DeadlineReader{
		conn:    c,
		timeout: timeout,
	}
}

// Read refreshes the deadline and then reads from the connection.
func (d *DeadlineReader) Read(p []byte) (n int, err error) {
	d.refresh()
	return d.conn.Read(p)
}

func (d *DeadlineReader) refresh() {
	d.conn.SetDeadline(time.Now().Add(d.timeout))
}
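
`DeadlineReader` pushes the deadline forward before each read, so a connection is only dropped after `timeout` of inactivity. Note that `SetDeadline` sets both the read and the write deadline and that `refresh` discards its error. The sketch below is a stand-in written for illustration, not the PR's type: it uses `SetReadDeadline`, returns the error, and runs over an in-memory `net.Pipe` so that no real socket is needed. `deadlineReader` and `idleReadTimesOut` are hypothetical names.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// deadlineReader mirrors the DeadlineReader idea from the diff, but this copy
// is a stand-in for the example and only touches the read deadline.
type deadlineReader struct {
	conn    net.Conn
	timeout time.Duration
}

func (d *deadlineReader) Read(p []byte) (int, error) {
	// Push the deadline forward before every read.
	if err := d.conn.SetReadDeadline(time.Now().Add(d.timeout)); err != nil {
		return 0, err
	}
	return d.conn.Read(p)
}

// idleReadTimesOut reports whether a read on an idle connection ends with a
// timeout error.
func idleReadTimesOut(timeout time.Duration) bool {
	server, client := net.Pipe() // in-memory connection pair
	defer server.Close()
	defer client.Close()

	r := &deadlineReader{conn: server, timeout: timeout}
	_, err := r.Read(make([]byte, 16)) // nobody writes, so the deadline expires
	netErr, ok := err.(net.Error)
	return ok && netErr.Timeout()
}

func main() {
	fmt.Println(idleReadTimesOut(50 * time.Millisecond))
}
```

A caller can use the same `net.Error` check to tell an idle-timeout disconnect apart from other read failures.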
43 changes: 43 additions & 0 deletions filebeat/input/tcp/conn_test.go
@@ -0,0 +1,43 @@
package tcp

import (
	"strings"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestResetableLimitedReader(t *testing.T) {
	maxReadBuffer := 400

	t.Run("WhenMaxReadIsReachedInMultipleRead", func(t *testing.T) {
		r := strings.NewReader(randomString(maxReadBuffer * 2))
		m := NewResetableLimitedReader(r, uint64(maxReadBuffer))
		toRead := make([]byte, maxReadBuffer)
		_, err := m.Read(toRead)
		assert.NoError(t, err)
		toRead = make([]byte, 300)
		_, err = m.Read(toRead)
		assert.Equal(t, ErrMaxReadBuffer, err)
	})

	t.Run("WhenMaxReadIsNotReached", func(t *testing.T) {
		r := strings.NewReader(randomString(maxReadBuffer * 2))
		m := NewResetableLimitedReader(r, uint64(maxReadBuffer))
		toRead := make([]byte, maxReadBuffer)
		_, err := m.Read(toRead)
		assert.NoError(t, err)
	})

	t.Run("WhenResetIsCalled", func(t *testing.T) {
		r := strings.NewReader(randomString(maxReadBuffer * 2))
		m := NewResetableLimitedReader(r, uint64(maxReadBuffer))
		toRead := make([]byte, maxReadBuffer)
		_, err := m.Read(toRead)
		assert.NoError(t, err)
		m.Reset()
		toRead = make([]byte, 300)
		_, err = m.Read(toRead)
		assert.NoError(t, err)
	})
}
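
The tests call `randomString`, which is not defined in the part of the diff shown here. To run them in isolation, a stand-in along the following lines would do, assuming the helper only needs to return a string of exactly `n` bytes. The real helper in the PR may differ.

```go
package main

import (
	"fmt"
	"math/rand"
)

const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

// randomString is a guessed stand-in for the helper the tests call; the real
// one is defined elsewhere in the PR and may differ.
func randomString(n int) string {
	b := make([]byte, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

func main() {
	fmt.Println(len(randomString(800)))
}
```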