-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: TCP Input #6266
Feature: TCP Input #6266
Changes from all commits
80133fb
34acbd3
70041bb
91ec711
3882f6d
8624bdf
4c25ecd
15d472e
afcf291
d9a4ab5
98d521e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
:type: tcp | ||
|
||
[id="{beatname_lc}-input-{type}"] | ||
=== TCP input | ||
|
||
++++ | ||
<titleabbrev>TCP</titleabbrev> | ||
++++ | ||
|
||
Use the `TCP` input to read events over TCP. | ||
|
||
Example configuration: | ||
|
||
["source","yaml",subs="attributes"] | ||
---- | ||
{beatname_lc}.inputs: | ||
- type: tcp | ||
max_message_size: 10240 | ||
host: "localhost:9000" | ||
---- | ||
|
||
|
||
==== Configuration options | ||
|
||
The `tcp` input supports the following configuration options plus the | ||
<<{beatname_lc}-input-{type}-common-options>> described later. | ||
|
||
[float] | ||
[id="{beatname_lc}-input-{type}-max-message-size"] | ||
==== `max_message_size` | ||
|
||
The maximum size of the message received over TCP. The default is `20971520`. | ||
|
||
[float] | ||
[id="{beatname_lc}-input-{type}-host"] | ||
==== `host` | ||
|
||
The host and TCP port to listen on for event streams. | ||
|
||
[float] | ||
[id="{beatname_lc}-input-{type}-line-delimiter"] | ||
==== `line_delimiter` | ||
|
||
Specify the characters used to split the incoming events. The default is '\n'. | ||
|
||
[float] | ||
[id="{beatname_lc}-input-{type}-timeout"] | ||
==== `timeout` | ||
|
||
The number of seconds of inactivity before a remote connection is closed. The default is `300`. | ||
|
||
[id="{beatname_lc}-input-{type}-common-options"] | ||
include::../inputs/input-common-options.asciidoc[] | ||
|
||
:type!: |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
package tcp | ||
|
||
import ( | ||
"bufio" | ||
"net" | ||
"strings" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
|
||
"github.com/elastic/beats/filebeat/harvester" | ||
"github.com/elastic/beats/filebeat/util" | ||
"github.com/elastic/beats/libbeat/beat" | ||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/logp" | ||
) | ||
|
||
// Client is a remote client. | ||
type Client struct { | ||
conn net.Conn | ||
forwarder *harvester.Forwarder | ||
done chan struct{} | ||
metadata common.MapStr | ||
splitFunc bufio.SplitFunc | ||
maxReadMessage uint64 | ||
timeout time.Duration | ||
} | ||
|
||
// NewClient returns a new client instance for the remote connection. | ||
func NewClient( | ||
conn net.Conn, | ||
forwarder *harvester.Forwarder, | ||
splitFunc bufio.SplitFunc, | ||
maxReadMessage uint64, | ||
timeout time.Duration, | ||
) *Client { | ||
client := &Client{ | ||
conn: conn, | ||
forwarder: forwarder, | ||
done: make(chan struct{}), | ||
splitFunc: splitFunc, | ||
maxReadMessage: maxReadMessage, | ||
timeout: timeout, | ||
metadata: common.MapStr{ | ||
"hostnames": remoteHosts(conn), | ||
"ip_address": conn.RemoteAddr().String(), | ||
}, | ||
} | ||
|
||
return client | ||
} | ||
|
||
// Handle is reading message from the specified TCP socket. | ||
func (c *Client) Handle() error { | ||
r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), c.maxReadMessage) | ||
buf := bufio.NewReader(r) | ||
scanner := bufio.NewScanner(buf) | ||
scanner.Split(c.splitFunc) | ||
|
||
for scanner.Scan() { | ||
err := scanner.Err() | ||
if err != nil { | ||
// we are forcing a close on the socket, lets ignore any error that could happen. | ||
select { | ||
case <-c.done: | ||
break | ||
default: | ||
} | ||
// This is a user defined limit and we should notify the user. | ||
if IsMaxReadBufferErr(err) { | ||
logp.Err("tcp client error: %s", err) | ||
} | ||
return errors.Wrap(err, "tcp client error") | ||
} | ||
r.Reset() | ||
c.forwarder.Send(c.createEvent(scanner.Text())) | ||
} | ||
return nil | ||
} | ||
|
||
// Close stops reading from the socket and close the connection. | ||
func (c *Client) Close() { | ||
close(c.done) | ||
c.conn.Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was wondering if we could have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think its necessary to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree on async Close. Still, double check by running/compiling with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've manually tested with a build with |
||
} | ||
|
||
func (c *Client) createEvent(rawString string) *util.Data { | ||
data := util.NewData() | ||
data.Event = beat.Event{ | ||
Timestamp: time.Now(), | ||
Meta: c.metadata, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The When indexing into ES, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did it on purpose, I am not sure how valuable that information it is. but I could make it public field and store in the mapping. WDYT? |
||
Fields: common.MapStr{ | ||
"message": rawString, | ||
}, | ||
} | ||
return data | ||
} | ||
|
||
// GetRemoteHosts take the IP address of the client and try to resolve the name, if it fails we | ||
// fallback to the IP, IP can resolve to multiple hostname. | ||
func remoteHosts(conn net.Conn) []string { | ||
ip := conn.RemoteAddr().String() | ||
idx := strings.Index(ip, ":") | ||
if idx == -1 { | ||
return []string{ip} | ||
} | ||
ip = ip[0:idx] | ||
hosts, err := net.LookupAddr(ip) | ||
if err != nil { | ||
hosts = []string{ip} | ||
} | ||
return hosts | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package tcp | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/elastic/beats/filebeat/harvester" | ||
) | ||
|
||
type config struct { | ||
harvester.ForwarderConfig `config:",inline"` | ||
Host string `config:"host" validate:"required"` | ||
LineDelimiter string `config:"line_delimiter" validate:"nonzero"` | ||
Timeout time.Duration `config:"timeout" validate:"nonzero,positive"` | ||
MaxMessageSize uint64 `config:"max_message_size" validate:"nonzero,positive"` | ||
} | ||
|
||
var defaultConfig = config{ | ||
ForwarderConfig: harvester.ForwarderConfig{ | ||
Type: "tcp", | ||
}, | ||
LineDelimiter: "\n", | ||
Timeout: time.Minute * 5, | ||
MaxMessageSize: 20 * 1024 * 1024, | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package tcp | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've keep theses methods public, because I might move it to common. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should only move it to common if we also use it in |
||
|
||
import ( | ||
"io" | ||
"net" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
) | ||
|
||
// ErrMaxReadBuffer returns when too many bytes was read on the io.Reader | ||
var ErrMaxReadBuffer = errors.New("max read buffer reached") | ||
|
||
// ResetableLimitedReader is based on LimitedReader but allow to reset the byte read and return a specific | ||
// error when we reach the limit. | ||
type ResetableLimitedReader struct { | ||
reader io.Reader | ||
maxReadBuffer uint64 | ||
byteRead uint64 | ||
} | ||
|
||
// NewResetableLimitedReader returns a new ResetableLimitedReader | ||
func NewResetableLimitedReader(reader io.Reader, maxReadBuffer uint64) *ResetableLimitedReader { | ||
return &ResetableLimitedReader{ | ||
reader: reader, | ||
maxReadBuffer: maxReadBuffer, | ||
} | ||
} | ||
|
||
// Read reads the specified amount of byte | ||
func (m *ResetableLimitedReader) Read(p []byte) (n int, err error) { | ||
if m.byteRead >= m.maxReadBuffer { | ||
return 0, ErrMaxReadBuffer | ||
} | ||
n, err = m.reader.Read(p) | ||
m.byteRead += uint64(n) | ||
return | ||
} | ||
|
||
// Reset resets the number of byte read | ||
func (m *ResetableLimitedReader) Reset() { | ||
m.byteRead = 0 | ||
} | ||
|
||
// IsMaxReadBufferErr returns true when the error is ErrMaxReadBuffer | ||
func IsMaxReadBufferErr(err error) bool { | ||
return err == ErrMaxReadBuffer | ||
} | ||
|
||
// DeadlineReader allow read to a io.Reader to timeout, the timeout is refreshed on every read. | ||
type DeadlineReader struct { | ||
conn net.Conn | ||
timeout time.Duration | ||
} | ||
|
||
// NewDeadlineReader returns a new DeadlineReader | ||
func NewDeadlineReader(c net.Conn, timeout time.Duration) *DeadlineReader { | ||
return &DeadlineReader{ | ||
conn: c, | ||
timeout: timeout, | ||
} | ||
} | ||
|
||
// Read reads the number of bytes from the reader | ||
func (d *DeadlineReader) Read(p []byte) (n int, err error) { | ||
d.refresh() | ||
return d.conn.Read(p) | ||
} | ||
|
||
func (d *DeadlineReader) refresh() { | ||
d.conn.SetDeadline(time.Now().Add(d.timeout)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package tcp | ||
|
||
import ( | ||
"strings" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestResetableLimitedReader(t *testing.T) { | ||
maxReadBuffer := 400 | ||
|
||
t.Run("WhenMaxReadIsReachedInMultipleRead", func(t *testing.T) { | ||
r := strings.NewReader(randomString(maxReadBuffer * 2)) | ||
m := NewResetableLimitedReader(r, uint64(maxReadBuffer)) | ||
toRead := make([]byte, maxReadBuffer) | ||
_, err := m.Read(toRead) | ||
assert.NoError(t, err) | ||
toRead = make([]byte, 300) | ||
_, err = m.Read(toRead) | ||
assert.Equal(t, ErrMaxReadBuffer, err) | ||
}) | ||
|
||
t.Run("WhenMaxReadIsNotReached", func(t *testing.T) { | ||
r := strings.NewReader(randomString(maxReadBuffer * 2)) | ||
m := NewResetableLimitedReader(r, uint64(maxReadBuffer)) | ||
toRead := make([]byte, maxReadBuffer) | ||
_, err := m.Read(toRead) | ||
assert.NoError(t, err) | ||
}) | ||
|
||
t.Run("WhenResetIsCalled", func(t *testing.T) { | ||
r := strings.NewReader(randomString(maxReadBuffer * 2)) | ||
m := NewResetableLimitedReader(r, uint64(maxReadBuffer)) | ||
toRead := make([]byte, maxReadBuffer) | ||
_, err := m.Read(toRead) | ||
assert.NoError(t, err) | ||
m.Reset() | ||
toRead = make([]byte, 300) | ||
_, err = m.Read(toRead) | ||
assert.NoError(t, err) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we replace
*harvester.Forwarder
by an interface or function type? Some more isolation from harvester.Forwarder (I hope to remove Forwarder and Outlet in the future).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Knowing that it make sense to change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to #6439 ?:)
I was planning to do it in second part but I can do it right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's keep the PR as small as possible and do the interface when we have a second implementation.