diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 6a28c321add7..c48096960c81 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -49,6 +49,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
the system test. {pull}6121[6121]
- Add IIS module to parse access log and error log. {pull}6127[6127]
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
+- Refactor the usage of prospector to input in the YAML reference {pull}6121[6121]
+- Addition of the TCP input {pull}6266[6266]
*Heartbeat*
diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml
index 4e922d48a92d..c3bc1c07d4d6 100644
--- a/filebeat/_meta/common.reference.p2.yml
+++ b/filebeat/_meta/common.reference.p2.yml
@@ -240,6 +240,20 @@ filebeat.inputs:
# Maximum size of the message received over UDP
#max_message_size: 10240
+#------------------------------ TCP prospector --------------------------------
+# Experimental: Config options for the TCP input
+#- type: tcp
+ #enabled: false
+
+ # The host and port to receive the new event
+ #host: "localhost:9000"
+
+ # Character used to split new message
+ #line_delimiter: "\n"
+
+ # Maximum size in bytes of the message received over TCP
+ #max_message_size: 20971520
+
#========================== Filebeat autodiscover ==============================
# Autodiscover allows you to detect changes in the system and spawn new modules
diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc
index 6b1ace6c3945..ba2109e7e33b 100644
--- a/filebeat/docs/filebeat-options.asciidoc
+++ b/filebeat/docs/filebeat-options.asciidoc
@@ -14,7 +14,7 @@ and configuring modules.
To configure {beatname_uc} manually (instead of using
<<{beatname_lc}-modules-overview,modules>>), you specify a list of inputs in the
+{beatname_lc}.inputs+ section of the +{beatname_lc}.yml+. Inputs specify how
-{beatname_uc} locates and processes input data.
+{beatname_uc} locates and processes input data.
The list is a http://yaml.org/[YAML] array, so each input begins with
a dash (`-`). You can specify multiple inputs, and you can specify the same
@@ -47,6 +47,7 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-redis>>
* <<{beatname_lc}-input-udp>>
* <<{beatname_lc}-input-docker>>
+* <<{beatname_lc}-input-tcp>>
@@ -59,3 +60,5 @@ include::inputs/input-redis.asciidoc[]
include::inputs/input-udp.asciidoc[]
include::inputs/input-docker.asciidoc[]
+
+include::inputs/input-tcp.asciidoc[]
diff --git a/filebeat/docs/inputs/input-tcp.asciidoc b/filebeat/docs/inputs/input-tcp.asciidoc
new file mode 100644
index 000000000000..25e2b328f929
--- /dev/null
+++ b/filebeat/docs/inputs/input-tcp.asciidoc
@@ -0,0 +1,55 @@
+:type: tcp
+
+[id="{beatname_lc}-input-{type}"]
+=== TCP input
+
+++++
+TCP
+++++
+
+Use the `TCP` input to read events over TCP.
+
+Example configuration:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: tcp
+ max_message_size: 10240
+ host: "localhost:9000"
+----
+
+
+==== Configuration options
+
+The `tcp` input supports the following configuration options plus the
+<<{beatname_lc}-input-{type}-common-options>> described later.
+
+[float]
+[id="{beatname_lc}-input-{type}-max-message-size"]
+==== `max_message_size`
+
+The maximum size of the message received over TCP. The default is `20971520`.
+
+[float]
+[id="{beatname_lc}-input-{type}-host"]
+==== `host`
+
+The host and TCP port to listen on for event streams.
+
+[float]
+[id="{beatname_lc}-input-{type}-line-delimiter"]
+==== `line_delimiter`
+
+Specify the characters used to split the incoming events. The default is '\n'.
+
+[float]
+[id="{beatname_lc}-input-{type}-timeout"]
+==== `timeout`
+
+The number of seconds of inactivity before a remote connection is closed. The default is `300`.
+
+[id="{beatname_lc}-input-{type}-common-options"]
+include::../inputs/input-common-options.asciidoc[]
+
+:type!:
diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index 664dcea51be9..5c0585adde9c 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -535,6 +535,20 @@ filebeat.inputs:
# Maximum size of the message received over UDP
#max_message_size: 10240
+#------------------------------ TCP prospector --------------------------------
+# Experimental: Config options for the TCP input
+#- type: tcp
+ #enabled: false
+
+ # The host and port to receive the new event
+ #host: "localhost:9000"
+
+ # Character used to split new message
+ #line_delimiter: "\n"
+
+ # Maximum size in bytes of the message received over TCP
+ #max_message_size: 20971520
+
#========================== Filebeat autodiscover ==============================
# Autodiscover allows you to detect changes in the system and spawn new modules
diff --git a/filebeat/include/list.go b/filebeat/include/list.go
index e1aae4e763ac..374be396eced 100644
--- a/filebeat/include/list.go
+++ b/filebeat/include/list.go
@@ -12,5 +12,6 @@ import (
_ "github.com/elastic/beats/filebeat/input/log"
_ "github.com/elastic/beats/filebeat/input/redis"
_ "github.com/elastic/beats/filebeat/input/stdin"
+ _ "github.com/elastic/beats/filebeat/input/tcp"
_ "github.com/elastic/beats/filebeat/input/udp"
)
diff --git a/filebeat/input/tcp/client.go b/filebeat/input/tcp/client.go
new file mode 100644
index 000000000000..4cc62c44c44d
--- /dev/null
+++ b/filebeat/input/tcp/client.go
@@ -0,0 +1,113 @@
+package tcp
+
+import (
+ "bufio"
+ "net"
+ "strings"
+ "time"
+
+ "github.com/pkg/errors"
+
+ "github.com/elastic/beats/filebeat/harvester"
+ "github.com/elastic/beats/filebeat/util"
+ "github.com/elastic/beats/libbeat/beat"
+ "github.com/elastic/beats/libbeat/common"
+ "github.com/elastic/beats/libbeat/logp"
+)
+
+// Client is a remote client.
+type Client struct {
+ conn net.Conn
+ forwarder *harvester.Forwarder
+ done chan struct{}
+ metadata common.MapStr
+ splitFunc bufio.SplitFunc
+ maxReadMessage uint64
+ timeout time.Duration
+}
+
+// NewClient returns a new client instance for the remote connection.
+func NewClient(
+ conn net.Conn,
+ forwarder *harvester.Forwarder,
+ splitFunc bufio.SplitFunc,
+ maxReadMessage uint64,
+ timeout time.Duration,
+) *Client {
+ client := &Client{
+ conn: conn,
+ forwarder: forwarder,
+ done: make(chan struct{}),
+ splitFunc: splitFunc,
+ maxReadMessage: maxReadMessage,
+ timeout: timeout,
+ metadata: common.MapStr{
+ "hostnames": remoteHosts(conn),
+ "ip_address": conn.RemoteAddr().String(),
+ },
+ }
+
+ return client
+}
+
+// Handle is reading message from the specified TCP socket.
+func (c *Client) Handle() error {
+ r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), c.maxReadMessage)
+ buf := bufio.NewReader(r)
+ scanner := bufio.NewScanner(buf)
+ scanner.Split(c.splitFunc)
+
+ for scanner.Scan() {
+ err := scanner.Err()
+ if err != nil {
+ // we are forcing a close on the socket, lets ignore any error that could happen.
+ select {
+ case <-c.done:
+ break
+ default:
+ }
+ // This is a user defined limit and we should notify the user.
+ if IsMaxReadBufferErr(err) {
+ logp.Err("tcp client error: %s", err)
+ }
+ return errors.Wrap(err, "tcp client error")
+ }
+ r.Reset()
+ c.forwarder.Send(c.createEvent(scanner.Text()))
+ }
+ return nil
+}
+
+// Close stops reading from the socket and close the connection.
+func (c *Client) Close() {
+ close(c.done)
+ c.conn.Close()
+}
+
+func (c *Client) createEvent(rawString string) *util.Data {
+ data := util.NewData()
+ data.Event = beat.Event{
+ Timestamp: time.Now(),
+ Meta: c.metadata,
+ Fields: common.MapStr{
+ "message": rawString,
+ },
+ }
+ return data
+}
+
+// GetRemoteHosts take the IP address of the client and try to resolve the name, if it fails we
+// fallback to the IP, IP can resolve to multiple hostname.
+func remoteHosts(conn net.Conn) []string {
+ ip := conn.RemoteAddr().String()
+ idx := strings.Index(ip, ":")
+ if idx == -1 {
+ return []string{ip}
+ }
+ ip = ip[0:idx]
+ hosts, err := net.LookupAddr(ip)
+ if err != nil {
+ hosts = []string{ip}
+ }
+ return hosts
+}
diff --git a/filebeat/input/tcp/config.go b/filebeat/input/tcp/config.go
new file mode 100644
index 000000000000..598ac9c8830d
--- /dev/null
+++ b/filebeat/input/tcp/config.go
@@ -0,0 +1,24 @@
+package tcp
+
+import (
+ "time"
+
+ "github.com/elastic/beats/filebeat/harvester"
+)
+
+type config struct {
+ harvester.ForwarderConfig `config:",inline"`
+ Host string `config:"host" validate:"required"`
+ LineDelimiter string `config:"line_delimiter" validate:"nonzero"`
+ Timeout time.Duration `config:"timeout" validate:"nonzero,positive"`
+ MaxMessageSize uint64 `config:"max_message_size" validate:"nonzero,positive"`
+}
+
+var defaultConfig = config{
+ ForwarderConfig: harvester.ForwarderConfig{
+ Type: "tcp",
+ },
+ LineDelimiter: "\n",
+ Timeout: time.Minute * 5,
+ MaxMessageSize: 20 * 1024 * 1024,
+}
diff --git a/filebeat/input/tcp/conn.go b/filebeat/input/tcp/conn.go
new file mode 100644
index 000000000000..a2a40bb02f0e
--- /dev/null
+++ b/filebeat/input/tcp/conn.go
@@ -0,0 +1,72 @@
+package tcp
+
+import (
+ "io"
+ "net"
+ "time"
+
+ "github.com/pkg/errors"
+)
+
+// ErrMaxReadBuffer returns when too many bytes was read on the io.Reader
+var ErrMaxReadBuffer = errors.New("max read buffer reached")
+
+// ResetableLimitedReader is based on LimitedReader but allow to reset the byte read and return a specific
+// error when we reach the limit.
+type ResetableLimitedReader struct {
+ reader io.Reader
+ maxReadBuffer uint64
+ byteRead uint64
+}
+
+// NewResetableLimitedReader returns a new ResetableLimitedReader
+func NewResetableLimitedReader(reader io.Reader, maxReadBuffer uint64) *ResetableLimitedReader {
+ return &ResetableLimitedReader{
+ reader: reader,
+ maxReadBuffer: maxReadBuffer,
+ }
+}
+
+// Read reads the specified amount of byte
+func (m *ResetableLimitedReader) Read(p []byte) (n int, err error) {
+ if m.byteRead >= m.maxReadBuffer {
+ return 0, ErrMaxReadBuffer
+ }
+ n, err = m.reader.Read(p)
+ m.byteRead += uint64(n)
+ return
+}
+
+// Reset resets the number of byte read
+func (m *ResetableLimitedReader) Reset() {
+ m.byteRead = 0
+}
+
+// IsMaxReadBufferErr returns true when the error is ErrMaxReadBuffer
+func IsMaxReadBufferErr(err error) bool {
+ return err == ErrMaxReadBuffer
+}
+
+// DeadlineReader allow read to a io.Reader to timeout, the timeout is refreshed on every read.
+type DeadlineReader struct {
+ conn net.Conn
+ timeout time.Duration
+}
+
+// NewDeadlineReader returns a new DeadlineReader
+func NewDeadlineReader(c net.Conn, timeout time.Duration) *DeadlineReader {
+ return &DeadlineReader{
+ conn: c,
+ timeout: timeout,
+ }
+}
+
+// Read reads the number of bytes from the reader
+func (d *DeadlineReader) Read(p []byte) (n int, err error) {
+ d.refresh()
+ return d.conn.Read(p)
+}
+
+func (d *DeadlineReader) refresh() {
+ d.conn.SetDeadline(time.Now().Add(d.timeout))
+}
diff --git a/filebeat/input/tcp/conn_test.go b/filebeat/input/tcp/conn_test.go
new file mode 100644
index 000000000000..aaf2bea6a16a
--- /dev/null
+++ b/filebeat/input/tcp/conn_test.go
@@ -0,0 +1,43 @@
+package tcp
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestResetableLimitedReader(t *testing.T) {
+ maxReadBuffer := 400
+
+ t.Run("WhenMaxReadIsReachedInMultipleRead", func(t *testing.T) {
+ r := strings.NewReader(randomString(maxReadBuffer * 2))
+ m := NewResetableLimitedReader(r, uint64(maxReadBuffer))
+ toRead := make([]byte, maxReadBuffer)
+ _, err := m.Read(toRead)
+ assert.NoError(t, err)
+ toRead = make([]byte, 300)
+ _, err = m.Read(toRead)
+ assert.Equal(t, ErrMaxReadBuffer, err)
+ })
+
+ t.Run("WhenMaxReadIsNotReached", func(t *testing.T) {
+ r := strings.NewReader(randomString(maxReadBuffer * 2))
+ m := NewResetableLimitedReader(r, uint64(maxReadBuffer))
+ toRead := make([]byte, maxReadBuffer)
+ _, err := m.Read(toRead)
+ assert.NoError(t, err)
+ })
+
+ t.Run("WhenResetIsCalled", func(t *testing.T) {
+ r := strings.NewReader(randomString(maxReadBuffer * 2))
+ m := NewResetableLimitedReader(r, uint64(maxReadBuffer))
+ toRead := make([]byte, maxReadBuffer)
+ _, err := m.Read(toRead)
+ assert.NoError(t, err)
+ m.Reset()
+ toRead = make([]byte, 300)
+ _, err = m.Read(toRead)
+ assert.NoError(t, err)
+ })
+}
diff --git a/filebeat/input/tcp/harvester.go b/filebeat/input/tcp/harvester.go
new file mode 100644
index 000000000000..d189ef3303bc
--- /dev/null
+++ b/filebeat/input/tcp/harvester.go
@@ -0,0 +1,154 @@
+package tcp
+
+import (
+ "bufio"
+ "bytes"
+ "net"
+ "sync"
+
+ "github.com/elastic/beats/libbeat/logp"
+
+ "github.com/elastic/beats/filebeat/harvester"
+)
+
+// Harvester represent a TCP server
+type Harvester struct {
+ sync.RWMutex
+ forwarder *harvester.Forwarder
+ config *config
+ server net.Listener
+ clients map[*Client]struct{}
+ wg sync.WaitGroup
+ done chan struct{}
+ splitFunc bufio.SplitFunc
+}
+
+// NewHarvester creates a new harvester that will forward events
+func NewHarvester(
+ forwarder *harvester.Forwarder,
+ config *config,
+) (*Harvester, error) {
+
+ server, err := net.Listen("tcp", config.Host)
+ if err != nil {
+ return nil, err
+ }
+
+ sf := splitFunc([]byte(config.LineDelimiter))
+ return &Harvester{
+ config: config,
+ forwarder: forwarder,
+ clients: make(map[*Client]struct{}, 0),
+ done: make(chan struct{}),
+ server: server,
+ splitFunc: sf,
+ }, nil
+}
+
+// Run start and run a new TCP listener to receive new data
+func (h *Harvester) Run() error {
+ logp.Info("Started listening for incoming TCP connection on: %s", h.config.Host)
+ for {
+ conn, err := h.server.Accept()
+ if err != nil {
+ select {
+ case <-h.done:
+ return nil
+ default:
+ logp.Debug("tcp", "Can not accept the connection: %s", err)
+ continue
+ }
+ }
+
+ client := NewClient(
+ conn,
+ h.forwarder,
+ h.splitFunc,
+ h.config.MaxMessageSize,
+ h.config.Timeout,
+ )
+ logp.Debug(
+ "tcp",
+ "New client, remote: %s (total clients: %d)",
+ conn.RemoteAddr(),
+ h.clientsCount(),
+ )
+ h.wg.Add(1)
+ go func() {
+ defer logp.Recover("recovering from a tcp client crash")
+
+ defer h.wg.Done()
+ defer conn.Close()
+
+ h.registerClient(client)
+ defer h.unregisterClient(client)
+
+ err := client.Handle()
+ if err != nil {
+ logp.Debug("tcp", "Client error: %s", err)
+ }
+
+ logp.Debug(
+ "tcp",
+ "Client disconnected, remote: %s (total clients: %d)",
+ conn.RemoteAddr(),
+ h.clientsCount(),
+ )
+ }()
+ }
+}
+
+// Stop stops accepting new incoming TCP connection and close any active clients
+func (h *Harvester) Stop() {
+ logp.Info("Stopping TCP harvester")
+ close(h.done)
+ h.server.Close()
+
+ logp.Debug("tcp", "Closing remote connections")
+ for _, client := range h.allClients() {
+ client.Close()
+ }
+ h.wg.Wait()
+ logp.Debug("tcp", "Remote connections closed")
+}
+
+func (h *Harvester) registerClient(client *Client) {
+ h.Lock()
+ defer h.Unlock()
+ h.clients[client] = struct{}{}
+}
+
+func (h *Harvester) unregisterClient(client *Client) {
+ h.Lock()
+ defer h.Unlock()
+ delete(h.clients, client)
+}
+
+func (h *Harvester) allClients() []*Client {
+ h.RLock()
+ defer h.RUnlock()
+ currentClients := make([]*Client, len(h.clients))
+ idx := 0
+ for client := range h.clients {
+ currentClients[idx] = client
+ idx++
+ }
+ return currentClients
+}
+
+func (h *Harvester) clientsCount() int {
+ h.RLock()
+ defer h.RUnlock()
+ return len(h.clients)
+}
+
+func splitFunc(lineDelimiter []byte) bufio.SplitFunc {
+ ld := []byte(lineDelimiter)
+ if bytes.Equal(ld, []byte("\n")) {
+ // This will work for most usecases and will also strip \r if present.
+ // CustomDelimiter, need to match completely and the delimiter will be completely removed from the
+ // returned byte slice
+ return bufio.ScanLines
+ }
+ return scanDelimiter(ld)
+}
diff --git a/filebeat/input/tcp/harvester_test.go b/filebeat/input/tcp/harvester_test.go
new file mode 100644
index 000000000000..d376b95ca453
--- /dev/null
+++ b/filebeat/input/tcp/harvester_test.go
@@ -0,0 +1,290 @@
+package tcp
+
+import (
+ "fmt"
+ "math/rand"
+ "net"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/elastic/beats/filebeat/harvester"
+ "github.com/elastic/beats/filebeat/util"
+ "github.com/elastic/beats/libbeat/common"
+)
+
+func TestErrorOnEmptyLineDelimiter(t *testing.T) {
+ cfg := map[string]interface{}{
+ "line_delimiter": "",
+ }
+
+ c, _ := common.NewConfigFrom(cfg)
+ config := defaultConfig
+ err := c.Unpack(&config)
+ assert.Error(t, err)
+}
+
+func TestOverrideHostAndPort(t *testing.T) {
+ host := "127.0.0.1:10000"
+ cfg := map[string]interface{}{
+ "host": host,
+ }
+ c, _ := common.NewConfigFrom(cfg)
+ config := defaultConfig
+ err := c.Unpack(&config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ forwarder := harvester.NewForwarder(nil)
+ harvester, err := NewHarvester(forwarder, &config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ defer harvester.Stop()
+ go func() {
+ err := harvester.Run()
+ assert.NoError(t, err)
+ }()
+ conn, err := net.Dial("tcp", host)
+ defer conn.Close()
+ assert.NoError(t, err)
+}
+
+func TestReceiveEventsAndMetadata(t *testing.T) {
+ expectedMessages := generateMessages(5, 100)
+ largeMessages := generateMessages(10, 4096)
+
+ tests := []struct {
+ name string
+ cfg map[string]interface{}
+ expectedMessages []string
+ messageSent string
+ }{
+ {
+ name: "NewLine",
+ cfg: map[string]interface{}{},
+ expectedMessages: expectedMessages,
+ messageSent: strings.Join(expectedMessages, "\n"),
+ },
+ {
+ name: "NewLineWithCR",
+ cfg: map[string]interface{}{},
+ expectedMessages: expectedMessages,
+ messageSent: strings.Join(expectedMessages, "\r\n"),
+ },
+ {
+ name: "CustomDelimiter",
+ cfg: map[string]interface{}{
+ "line_delimiter": ";",
+ },
+ expectedMessages: expectedMessages,
+ messageSent: strings.Join(expectedMessages, ";"),
+ },
+ {
+ name: "MultipleCharsCustomDelimiter",
+ cfg: map[string]interface{}{
+ "line_delimiter": "",
+ },
+ expectedMessages: expectedMessages,
+ messageSent: strings.Join(expectedMessages, ""),
+ },
+ {
+ name: "SingleCharCustomDelimiterMessageWithoutBoudaries",
+ cfg: map[string]interface{}{
+ "line_delimiter": ";",
+ },
+ expectedMessages: []string{"hello"},
+ messageSent: "hello",
+ },
+ {
+ name: "MultipleCharCustomDelimiterMessageWithoutBoundaries",
+ cfg: map[string]interface{}{
+ "line_delimiter": "",
+ },
+ expectedMessages: []string{"hello"},
+ messageSent: "hello",
+ },
+ {
+ name: "NewLineMessageWithoutBoundaries",
+ cfg: map[string]interface{}{
+ "line_delimiter": "\n",
+ },
+ expectedMessages: []string{"hello"},
+ messageSent: "hello",
+ },
+ {
+ name: "NewLineLargeMessagePayload",
+ cfg: map[string]interface{}{
+ "line_delimiter": "\n",
+ },
+ expectedMessages: largeMessages,
+ messageSent: strings.Join(largeMessages, "\n"),
+ },
+ {
+ name: "CustomLargeMessagePayload",
+ cfg: map[string]interface{}{
+ "line_delimiter": ";",
+ },
+ expectedMessages: largeMessages,
+ messageSent: strings.Join(largeMessages, ";"),
+ },
+ {
+ name: "MaxReadBufferReached",
+ cfg: map[string]interface{}{},
+ expectedMessages: []string{},
+ messageSent: randomString(900000),
+ },
+ {
+ name: "MaxReadBufferReachedUserConfigured",
+ cfg: map[string]interface{}{
+ "max_read_message": 50000,
+ },
+ expectedMessages: []string{},
+ messageSent: randomString(600000),
+ },
+ }
+
+ port := 9000
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ ch := make(chan *util.Data, len(test.expectedMessages))
+ defer close(ch)
+ to := newTestingOutlet(ch)
+ forwarder := harvester.NewForwarder(to)
+ test.cfg["host"] = fmt.Sprintf("localhost:%d", port)
+ cfg, _ := common.NewConfigFrom(test.cfg)
+ config := defaultConfig
+ err := cfg.Unpack(&config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ harvester, err := NewHarvester(forwarder, &config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ defer func() {
+ harvester.Stop()
+ }()
+ go func() {
+ err := harvester.Run()
+ assert.NoError(t, err)
+ }()
+
+ conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port))
+ assert.NoError(t, err)
+ fmt.Fprint(conn, test.messageSent)
+ conn.Close()
+
+ var events []*util.Data
+
+ for len(events) < len(test.expectedMessages) {
+ select {
+ case event := <-ch:
+ events = append(events, event)
+ case <-time.After(time.Second * 10):
+ t.Fatal("could not drain all the elements")
+ return
+ }
+ }
+
+ for idx, e := range events {
+ event := e.GetEvent()
+ message, err := event.GetValue("message")
+ assert.NoError(t, err)
+ assert.Equal(t, test.expectedMessages[idx], message)
+ meta := e.GetMetadata()
+ _, ok := meta["hostnames"]
+ assert.True(t, ok)
+ _, ok = meta["ip_address"]
+ assert.True(t, ok)
+ }
+ })
+ port++
+ }
+}
+
+func TestReceiveNewEventsConcurrently(t *testing.T) {
+ workers := 4
+ eventsCount := 100
+ ch := make(chan *util.Data, eventsCount*workers)
+ defer close(ch)
+ to := newTestingOutlet(ch)
+ forwarder := harvester.NewForwarder(to)
+ cfg, err := common.NewConfigFrom(map[string]interface{}{"host": "localhost:9000"})
+ if !assert.NoError(t, err) {
+ return
+ }
+ config := defaultConfig
+ err = cfg.Unpack(&config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ harvester, err := NewHarvester(forwarder, &config)
+ if !assert.NoError(t, err) {
+ return
+ }
+ defer harvester.Stop()
+ if !assert.NoError(t, err) {
+ return
+ }
+ go func() {
+ err := harvester.Run()
+ assert.NoError(t, err)
+ }()
+
+ samples := generateMessages(eventsCount, 1024)
+ for w := 0; w < workers; w++ {
+ go func() {
+ conn, err := net.Dial("tcp", "localhost:9000")
+ defer conn.Close()
+ assert.NoError(t, err)
+ for _, sample := range samples {
+ fmt.Fprintln(conn, sample)
+ }
+ }()
+ }
+
+ var events []*util.Data
+ for len(events) < eventsCount*workers {
+ select {
+ case event := <-ch:
+ events = append(events, event)
+ case <-time.After(time.Second * 10):
+ t.Fatal("timeout when waiting on channel")
+ return
+ }
+ }
+}
+
+type testingOutlet struct {
+ ch chan *util.Data
+}
+
+func (o *testingOutlet) OnEvent(data *util.Data) bool {
+ o.ch <- data
+ return true
+}
+
+func newTestingOutlet(ch chan *util.Data) *testingOutlet {
+ return &testingOutlet{ch: ch}
+}
+
+// This could be extracted into testing utils and we could add some unicode chars to the charsets.
+func randomString(l int) string {
+ charsets := []byte("abcdefghijklmnopqrstuvwzyzABCDEFGHIJKLMNOPQRSTUVWZYZ0123456789")
+ message := make([]byte, l)
+ for i := range message {
+ message[i] = charsets[rand.Intn(len(charsets))]
+ }
+ return string(message)
+}
+
+func generateMessages(c int, l int) []string {
+ messages := make([]string, c)
+ for i := range messages {
+ messages[i] = randomString(l)
+ }
+ return messages
+}
diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go
new file mode 100644
index 000000000000..478a1a3fd3de
--- /dev/null
+++ b/filebeat/input/tcp/input.go
@@ -0,0 +1,86 @@
+package tcp
+
+import (
+ "github.com/elastic/beats/filebeat/channel"
+ "github.com/elastic/beats/filebeat/harvester"
+ "github.com/elastic/beats/filebeat/input"
+ "github.com/elastic/beats/libbeat/common"
+ "github.com/elastic/beats/libbeat/common/cfgwarn"
+ "github.com/elastic/beats/libbeat/logp"
+)
+
+func init() {
+ err := input.Register("tcp", NewInput)
+ if err != nil {
+ panic(err)
+ }
+}
+
+// Input for TCP connection
+type Input struct {
+ harvester *Harvester
+ started bool
+ outlet channel.Outleter
+ config *config
+}
+
+// NewInput creates a new TCP input
+func NewInput(
+ cfg *common.Config,
+ outlet channel.Factory,
+ context input.Context,
+) (input.Input, error) {
+ cfgwarn.Experimental("TCP input type is used")
+
+ out, err := outlet(cfg, context.DynamicFields)
+ if err != nil {
+ return nil, err
+ }
+
+ forwarder := harvester.NewForwarder(out)
+
+ config := defaultConfig
+ err = cfg.Unpack(&config)
+ if err != nil {
+ return nil, err
+ }
+
+ harvester, err := NewHarvester(forwarder, &config)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Input{
+ harvester: harvester,
+ started: false,
+ outlet: out,
+ config: &config,
+ }, nil
+}
+
+// Run start a TCP input
+func (p *Input) Run() {
+ if !p.started {
+ logp.Info("Starting TCP input on: %s", p.config.Host)
+ p.started = true
+
+ go func() {
+ defer p.outlet.Close()
+ err := p.harvester.Run()
+ if err != nil {
+ logp.Err("Error running TCP harvester, error: %s", err)
+ }
+ }()
+ }
+}
+
+// Stop stops TCP server
+func (p *Input) Stop() {
+ logp.Info("Stopping TCP input on: %s", p.config.Host)
+ p.harvester.Stop()
+}
+
+// Wait stop the current harvester
+func (p *Input) Wait() {
+ p.Stop()
+}
diff --git a/filebeat/input/tcp/scan.go b/filebeat/input/tcp/scan.go
new file mode 100644
index 000000000000..a1e8c42f7617
--- /dev/null
+++ b/filebeat/input/tcp/scan.go
@@ -0,0 +1,31 @@
+package tcp
+
+import (
+ "bufio"
+ "bytes"
+)
+
+// ScanDelimiter return a function to split line using a custom delimiter, the delimiter
+// is stripped from the returned value.
+func scanDelimiter(delimiter []byte) bufio.SplitFunc {
+ return func(data []byte, eof bool) (int, []byte, error) {
+ if eof && len(data) == 0 {
+ return 0, nil, nil
+ }
+ if i := bytes.Index(data, delimiter); i >= 0 {
+ return i + len(delimiter), dropDelimiter(data[0:i], delimiter), nil
+ }
+ if eof {
+ return len(data), dropDelimiter(data, delimiter), nil
+ }
+ return 0, nil, nil
+ }
+}
+
+func dropDelimiter(data []byte, delimiter []byte) []byte {
+ if len(data) > len(delimiter) &&
+ bytes.Equal(data[len(data)-len(delimiter):len(data)], delimiter) {
+ return data[0 : len(data)-len(delimiter)]
+ }
+ return data
+}
diff --git a/filebeat/input/tcp/scan_test.go b/filebeat/input/tcp/scan_test.go
new file mode 100644
index 000000000000..937ffb43b1e2
--- /dev/null
+++ b/filebeat/input/tcp/scan_test.go
@@ -0,0 +1,91 @@
+package tcp
+
+import (
+ "bufio"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCustomDelimiter(t *testing.T) {
+ tests := []struct {
+ name string
+ text string
+ expected []string
+ delimiter []byte
+ }{
+ {
+ name: "Multiple chars delimiter",
+ text: "hellobonjourholahey",
+ expected: []string{
+ "hello",
+ "bonjour",
+ "hola",
+ "hey",
+ },
+ delimiter: []byte(""),
+ },
+ {
+ name: "Multiple chars delimiter with half starting delimiter",
+ text: "hellobonjourhey",
+ expected: []string{
+ "hello",
+ "bonjour"),
+ },
+ {
+ name: "Multiple chars delimiter with half ending delimiter",
+ text: "helloEND>holahey",
+ expected: []string{
+ "hello",
+ "END>hola",
+ "hey",
+ },
+ delimiter: []byte(""),
+ },
+ {
+ name: "Delimiter end of string",
+ text: "hellobonjourholahey",
+ expected: []string{
+ "hello",
+ "bonjour",
+ "hola",
+ "hey",
+ },
+ delimiter: []byte(""),
+ },
+ {
+ name: "Single char delimiter",
+ text: "hello;bonjour;hola;hey",
+ expected: []string{
+ "hello",
+ "bonjour",
+ "hola",
+ "hey",
+ },
+ delimiter: []byte(";"),
+ },
+ {
+ name: "Empty string",
+ text: "",
+ expected: []string(nil),
+ delimiter: []byte(";"),
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ buf := strings.NewReader(test.text)
+ scanner := bufio.NewScanner(buf)
+ scanner.Split(scanDelimiter(test.delimiter))
+ var elements []string
+ for scanner.Scan() {
+ elements = append(elements, scanner.Text())
+ }
+ assert.EqualValues(t, test.expected, elements)
+ })
+ }
+}
diff --git a/filebeat/tests/system/test_tcp.py b/filebeat/tests/system/test_tcp.py
new file mode 100644
index 000000000000..dffecfe4f698
--- /dev/null
+++ b/filebeat/tests/system/test_tcp.py
@@ -0,0 +1,68 @@
+from filebeat import BaseTest
+import socket
+
+
+class Test(BaseTest):
+ """
+ Test filebeat TCP input
+ """
+
+ def test_tcp_with_newline_delimiter(self):
+ """
+ Test TCP input with a new line delimiter
+ """
+ self.send_events_with_delimiter("\n")
+
+ def test_tcp_with_custom_char_delimiter(self):
+ """
+ Test TCP input with a custom single char delimiter
+ """
+ self.send_events_with_delimiter(";")
+
+ def test_tcp_with_custom_word_delimiter(self):
+ """
+ Test TCP input with a custom single char delimiter
+ """
+ self.send_events_with_delimiter("")
+
+ def send_events_with_delimiter(self, delimiter):
+ host = "127.0.0.1"
+ port = 8080
+ input_raw = """
+- type: tcp
+ host: "{}:{}"
+ enabled: true
+"""
+
+ # Use default of \n and stripping \r
+ if delimiter is not "":
+ input_raw += "\n line_delimiter: {}".format(delimiter)
+
+ input_raw = input_raw.format(host, port)
+
+ self.render_config_template(
+ input_raw=input_raw,
+ inputs=False,
+ )
+
+ filebeat = self.start_beat()
+
+ self.wait_until(lambda: self.log_contains("Started listening for incoming TCP connection"))
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
+ sock.connect((host, port))
+
+ for n in range(0, 2):
+ sock.send("Hello World: " + str(n) + delimiter)
+
+ self.wait_until(lambda: self.output_count(lambda x: x >= 2))
+
+ filebeat.check_kill_and_wait()
+
+ output = self.read_output()
+
+ assert len(output) == 2
+ assert output[0]["prospector.type"] == "tcp"
+ assert output[0]["input.type"] == "tcp"
+
+ sock.close()