diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6a28c321add7..c48096960c81 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -49,6 +49,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di the system test. {pull}6121[6121] - Add IIS module to parse access log and error log. {pull}6127[6127] - Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662] +- Refactor the usage of prospector to input in the YAML reference {pull}6121[6121] +- Addition of the TCP input {pull}6266[6266] *Heartbeat* diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 4e922d48a92d..c3bc1c07d4d6 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -240,6 +240,20 @@ filebeat.inputs: # Maximum size of the message received over UDP #max_message_size: 10240 +#------------------------------ TCP prospector -------------------------------- +# Experimental: Config options for the TCP input +#- type: tcp + #enabled: false + + # The host and port to receive the new event + #host: "localhost:9000" + + # Character used to split new message + #line_delimiter: "\n" + + # Maximum size in bytes of the message received over TCP + #max_message_size: 20971520 + #========================== Filebeat autodiscover ============================== # Autodiscover allows you to detect changes in the system and spawn new modules diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index 6b1ace6c3945..ba2109e7e33b 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -14,7 +14,7 @@ and configuring modules. To configure {beatname_uc} manually (instead of using <<{beatname_lc}-modules-overview,modules>>), you specify a list of inputs in the +{beatname_lc}.inputs+ section of the +{beatname_lc}.yml+. Inputs specify how -{beatname_uc} locates and processes input data. +{beatname_uc} locates and processes input data. The list is a http://yaml.org/[YAML] array, so each input begins with a dash (`-`). You can specify multiple inputs, and you can specify the same @@ -47,6 +47,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-redis>> * <<{beatname_lc}-input-udp>> * <<{beatname_lc}-input-docker>> +* <<{beatname_lc}-input-tcp>> @@ -59,3 +60,5 @@ include::inputs/input-redis.asciidoc[] include::inputs/input-udp.asciidoc[] include::inputs/input-docker.asciidoc[] + +include::inputs/input-tcp.asciidoc[] diff --git a/filebeat/docs/inputs/input-tcp.asciidoc b/filebeat/docs/inputs/input-tcp.asciidoc new file mode 100644 index 000000000000..25e2b328f929 --- /dev/null +++ b/filebeat/docs/inputs/input-tcp.asciidoc @@ -0,0 +1,55 @@ +:type: tcp + +[id="{beatname_lc}-input-{type}"] +=== TCP input + +++++ +TCP +++++ + +Use the `TCP` input to read events over TCP. + +Example configuration: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: tcp + max_message_size: 10240 + host: "localhost:9000" +---- + + +==== Configuration options + +The `tcp` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +[id="{beatname_lc}-input-{type}-max-message-size"] +==== `max_message_size` + +The maximum size of the message received over TCP. The default is `20971520`. + +[float] +[id="{beatname_lc}-input-{type}-host"] +==== `host` + +The host and TCP port to listen on for event streams. + +[float] +[id="{beatname_lc}-input-{type}-line-delimiter"] +==== `line_delimiter` + +Specify the characters used to split the incoming events. The default is '\n'. + +[float] +[id="{beatname_lc}-input-{type}-timeout"] +==== `timeout` + +The number of seconds of inactivity before a remote connection is closed. The default is `300`. + +[id="{beatname_lc}-input-{type}-common-options"] +include::../inputs/input-common-options.asciidoc[] + +:type!: diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 664dcea51be9..5c0585adde9c 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -535,6 +535,20 @@ filebeat.inputs: # Maximum size of the message received over UDP #max_message_size: 10240 +#------------------------------ TCP prospector -------------------------------- +# Experimental: Config options for the TCP input +#- type: tcp + #enabled: false + + # The host and port to receive the new event + #host: "localhost:9000" + + # Character used to split new message + #line_delimiter: "\n" + + # Maximum size in bytes of the message received over TCP + #max_message_size: 20971520 + #========================== Filebeat autodiscover ============================== # Autodiscover allows you to detect changes in the system and spawn new modules diff --git a/filebeat/include/list.go b/filebeat/include/list.go index e1aae4e763ac..374be396eced 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -12,5 +12,6 @@ import ( _ "github.com/elastic/beats/filebeat/input/log" _ "github.com/elastic/beats/filebeat/input/redis" _ "github.com/elastic/beats/filebeat/input/stdin" + _ "github.com/elastic/beats/filebeat/input/tcp" _ "github.com/elastic/beats/filebeat/input/udp" ) diff --git a/filebeat/input/tcp/client.go b/filebeat/input/tcp/client.go new file mode 100644 index 000000000000..4cc62c44c44d --- /dev/null +++ b/filebeat/input/tcp/client.go @@ -0,0 +1,113 @@ +package tcp + +import ( + "bufio" + "net" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +// Client is a remote client. +type Client struct { + conn net.Conn + forwarder *harvester.Forwarder + done chan struct{} + metadata common.MapStr + splitFunc bufio.SplitFunc + maxReadMessage uint64 + timeout time.Duration +} + +// NewClient returns a new client instance for the remote connection. +func NewClient( + conn net.Conn, + forwarder *harvester.Forwarder, + splitFunc bufio.SplitFunc, + maxReadMessage uint64, + timeout time.Duration, +) *Client { + client := &Client{ + conn: conn, + forwarder: forwarder, + done: make(chan struct{}), + splitFunc: splitFunc, + maxReadMessage: maxReadMessage, + timeout: timeout, + metadata: common.MapStr{ + "hostnames": remoteHosts(conn), + "ip_address": conn.RemoteAddr().String(), + }, + } + + return client +} + +// Handle is reading message from the specified TCP socket. +func (c *Client) Handle() error { + r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), c.maxReadMessage) + buf := bufio.NewReader(r) + scanner := bufio.NewScanner(buf) + scanner.Split(c.splitFunc) + + for scanner.Scan() { + err := scanner.Err() + if err != nil { + // we are forcing a close on the socket, lets ignore any error that could happen. + select { + case <-c.done: + break + default: + } + // This is a user defined limit and we should notify the user. + if IsMaxReadBufferErr(err) { + logp.Err("tcp client error: %s", err) + } + return errors.Wrap(err, "tcp client error") + } + r.Reset() + c.forwarder.Send(c.createEvent(scanner.Text())) + } + return nil +} + +// Close stops reading from the socket and close the connection. +func (c *Client) Close() { + close(c.done) + c.conn.Close() +} + +func (c *Client) createEvent(rawString string) *util.Data { + data := util.NewData() + data.Event = beat.Event{ + Timestamp: time.Now(), + Meta: c.metadata, + Fields: common.MapStr{ + "message": rawString, + }, + } + return data +} + +// GetRemoteHosts take the IP address of the client and try to resolve the name, if it fails we +// fallback to the IP, IP can resolve to multiple hostname. +func remoteHosts(conn net.Conn) []string { + ip := conn.RemoteAddr().String() + idx := strings.Index(ip, ":") + if idx == -1 { + return []string{ip} + } + ip = ip[0:idx] + hosts, err := net.LookupAddr(ip) + if err != nil { + hosts = []string{ip} + } + return hosts +} diff --git a/filebeat/input/tcp/config.go b/filebeat/input/tcp/config.go new file mode 100644 index 000000000000..598ac9c8830d --- /dev/null +++ b/filebeat/input/tcp/config.go @@ -0,0 +1,24 @@ +package tcp + +import ( + "time" + + "github.com/elastic/beats/filebeat/harvester" +) + +type config struct { + harvester.ForwarderConfig `config:",inline"` + Host string `config:"host" validate:"required"` + LineDelimiter string `config:"line_delimiter" validate:"nonzero"` + Timeout time.Duration `config:"timeout" validate:"nonzero,positive"` + MaxMessageSize uint64 `config:"max_message_size" validate:"nonzero,positive"` +} + +var defaultConfig = config{ + ForwarderConfig: harvester.ForwarderConfig{ + Type: "tcp", + }, + LineDelimiter: "\n", + Timeout: time.Minute * 5, + MaxMessageSize: 20 * 1024 * 1024, +} diff --git a/filebeat/input/tcp/conn.go b/filebeat/input/tcp/conn.go new file mode 100644 index 000000000000..a2a40bb02f0e --- /dev/null +++ b/filebeat/input/tcp/conn.go @@ -0,0 +1,72 @@ +package tcp + +import ( + "io" + "net" + "time" + + "github.com/pkg/errors" +) + +// ErrMaxReadBuffer returns when too many bytes was read on the io.Reader +var ErrMaxReadBuffer = errors.New("max read buffer reached") + +// ResetableLimitedReader is based on LimitedReader but allow to reset the byte read and return a specific +// error when we reach the limit. +type ResetableLimitedReader struct { + reader io.Reader + maxReadBuffer uint64 + byteRead uint64 +} + +// NewResetableLimitedReader returns a new ResetableLimitedReader +func NewResetableLimitedReader(reader io.Reader, maxReadBuffer uint64) *ResetableLimitedReader { + return &ResetableLimitedReader{ + reader: reader, + maxReadBuffer: maxReadBuffer, + } +} + +// Read reads the specified amount of byte +func (m *ResetableLimitedReader) Read(p []byte) (n int, err error) { + if m.byteRead >= m.maxReadBuffer { + return 0, ErrMaxReadBuffer + } + n, err = m.reader.Read(p) + m.byteRead += uint64(n) + return +} + +// Reset resets the number of byte read +func (m *ResetableLimitedReader) Reset() { + m.byteRead = 0 +} + +// IsMaxReadBufferErr returns true when the error is ErrMaxReadBuffer +func IsMaxReadBufferErr(err error) bool { + return err == ErrMaxReadBuffer +} + +// DeadlineReader allow read to a io.Reader to timeout, the timeout is refreshed on every read. +type DeadlineReader struct { + conn net.Conn + timeout time.Duration +} + +// NewDeadlineReader returns a new DeadlineReader +func NewDeadlineReader(c net.Conn, timeout time.Duration) *DeadlineReader { + return &DeadlineReader{ + conn: c, + timeout: timeout, + } +} + +// Read reads the number of bytes from the reader +func (d *DeadlineReader) Read(p []byte) (n int, err error) { + d.refresh() + return d.conn.Read(p) +} + +func (d *DeadlineReader) refresh() { + d.conn.SetDeadline(time.Now().Add(d.timeout)) +} diff --git a/filebeat/input/tcp/conn_test.go b/filebeat/input/tcp/conn_test.go new file mode 100644 index 000000000000..aaf2bea6a16a --- /dev/null +++ b/filebeat/input/tcp/conn_test.go @@ -0,0 +1,43 @@ +package tcp + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResetableLimitedReader(t *testing.T) { + maxReadBuffer := 400 + + t.Run("WhenMaxReadIsReachedInMultipleRead", func(t *testing.T) { + r := strings.NewReader(randomString(maxReadBuffer * 2)) + m := NewResetableLimitedReader(r, uint64(maxReadBuffer)) + toRead := make([]byte, maxReadBuffer) + _, err := m.Read(toRead) + assert.NoError(t, err) + toRead = make([]byte, 300) + _, err = m.Read(toRead) + assert.Equal(t, ErrMaxReadBuffer, err) + }) + + t.Run("WhenMaxReadIsNotReached", func(t *testing.T) { + r := strings.NewReader(randomString(maxReadBuffer * 2)) + m := NewResetableLimitedReader(r, uint64(maxReadBuffer)) + toRead := make([]byte, maxReadBuffer) + _, err := m.Read(toRead) + assert.NoError(t, err) + }) + + t.Run("WhenResetIsCalled", func(t *testing.T) { + r := strings.NewReader(randomString(maxReadBuffer * 2)) + m := NewResetableLimitedReader(r, uint64(maxReadBuffer)) + toRead := make([]byte, maxReadBuffer) + _, err := m.Read(toRead) + assert.NoError(t, err) + m.Reset() + toRead = make([]byte, 300) + _, err = m.Read(toRead) + assert.NoError(t, err) + }) +} diff --git a/filebeat/input/tcp/harvester.go b/filebeat/input/tcp/harvester.go new file mode 100644 index 000000000000..d189ef3303bc --- /dev/null +++ b/filebeat/input/tcp/harvester.go @@ -0,0 +1,154 @@ +package tcp + +import ( + "bufio" + "bytes" + "net" + "sync" + + "github.com/elastic/beats/libbeat/logp" + + "github.com/elastic/beats/filebeat/harvester" +) + +// Harvester represent a TCP server +type Harvester struct { + sync.RWMutex + forwarder *harvester.Forwarder + config *config + server net.Listener + clients map[*Client]struct{} + wg sync.WaitGroup + done chan struct{} + splitFunc bufio.SplitFunc +} + +// NewHarvester creates a new harvester that will forward events +func NewHarvester( + forwarder *harvester.Forwarder, + config *config, +) (*Harvester, error) { + + server, err := net.Listen("tcp", config.Host) + if err != nil { + return nil, err + } + + sf := splitFunc([]byte(config.LineDelimiter)) + return &Harvester{ + config: config, + forwarder: forwarder, + clients: make(map[*Client]struct{}, 0), + done: make(chan struct{}), + server: server, + splitFunc: sf, + }, nil +} + +// Run start and run a new TCP listener to receive new data +func (h *Harvester) Run() error { + logp.Info("Started listening for incoming TCP connection on: %s", h.config.Host) + for { + conn, err := h.server.Accept() + if err != nil { + select { + case <-h.done: + return nil + default: + logp.Debug("tcp", "Can not accept the connection: %s", err) + continue + } + } + + client := NewClient( + conn, + h.forwarder, + h.splitFunc, + h.config.MaxMessageSize, + h.config.Timeout, + ) + logp.Debug( + "tcp", + "New client, remote: %s (total clients: %d)", + conn.RemoteAddr(), + h.clientsCount(), + ) + h.wg.Add(1) + go func() { + defer logp.Recover("recovering from a tcp client crash") + + defer h.wg.Done() + defer conn.Close() + + h.registerClient(client) + defer h.unregisterClient(client) + + err := client.Handle() + if err != nil { + logp.Debug("tcp", "Client error: %s", err) + } + + logp.Debug( + "tcp", + "Client disconnected, remote: %s (total clients: %d)", + conn.RemoteAddr(), + h.clientsCount(), + ) + }() + } +} + +// Stop stops accepting new incoming TCP connection and close any active clients +func (h *Harvester) Stop() { + logp.Info("Stopping TCP harvester") + close(h.done) + h.server.Close() + + logp.Debug("tcp", "Closing remote connections") + for _, client := range h.allClients() { + client.Close() + } + h.wg.Wait() + logp.Debug("tcp", "Remote connections closed") +} + +func (h *Harvester) registerClient(client *Client) { + h.Lock() + defer h.Unlock() + h.clients[client] = struct{}{} +} + +func (h *Harvester) unregisterClient(client *Client) { + h.Lock() + defer h.Unlock() + delete(h.clients, client) +} + +func (h *Harvester) allClients() []*Client { + h.RLock() + defer h.RUnlock() + currentClients := make([]*Client, len(h.clients)) + idx := 0 + for client := range h.clients { + currentClients[idx] = client + idx++ + } + return currentClients +} + +func (h *Harvester) clientsCount() int { + h.RLock() + defer h.RUnlock() + return len(h.clients) +} + +func splitFunc(lineDelimiter []byte) bufio.SplitFunc { + ld := []byte(lineDelimiter) + if bytes.Equal(ld, []byte("\n")) { + // This will work for most usecases and will also strip \r if present. + // CustomDelimiter, need to match completely and the delimiter will be completely removed from the + // returned byte slice + return bufio.ScanLines + } + return scanDelimiter(ld) +} diff --git a/filebeat/input/tcp/harvester_test.go b/filebeat/input/tcp/harvester_test.go new file mode 100644 index 000000000000..d376b95ca453 --- /dev/null +++ b/filebeat/input/tcp/harvester_test.go @@ -0,0 +1,290 @@ +package tcp + +import ( + "fmt" + "math/rand" + "net" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/common" +) + +func TestErrorOnEmptyLineDelimiter(t *testing.T) { + cfg := map[string]interface{}{ + "line_delimiter": "", + } + + c, _ := common.NewConfigFrom(cfg) + config := defaultConfig + err := c.Unpack(&config) + assert.Error(t, err) +} + +func TestOverrideHostAndPort(t *testing.T) { + host := "127.0.0.1:10000" + cfg := map[string]interface{}{ + "host": host, + } + c, _ := common.NewConfigFrom(cfg) + config := defaultConfig + err := c.Unpack(&config) + if !assert.NoError(t, err) { + return + } + forwarder := harvester.NewForwarder(nil) + harvester, err := NewHarvester(forwarder, &config) + if !assert.NoError(t, err) { + return + } + defer harvester.Stop() + go func() { + err := harvester.Run() + assert.NoError(t, err) + }() + conn, err := net.Dial("tcp", host) + defer conn.Close() + assert.NoError(t, err) +} + +func TestReceiveEventsAndMetadata(t *testing.T) { + expectedMessages := generateMessages(5, 100) + largeMessages := generateMessages(10, 4096) + + tests := []struct { + name string + cfg map[string]interface{} + expectedMessages []string + messageSent string + }{ + { + name: "NewLine", + cfg: map[string]interface{}{}, + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, "\n"), + }, + { + name: "NewLineWithCR", + cfg: map[string]interface{}{}, + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, "\r\n"), + }, + { + name: "CustomDelimiter", + cfg: map[string]interface{}{ + "line_delimiter": ";", + }, + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, ";"), + }, + { + name: "MultipleCharsCustomDelimiter", + cfg: map[string]interface{}{ + "line_delimiter": "", + }, + expectedMessages: expectedMessages, + messageSent: strings.Join(expectedMessages, ""), + }, + { + name: "SingleCharCustomDelimiterMessageWithoutBoudaries", + cfg: map[string]interface{}{ + "line_delimiter": ";", + }, + expectedMessages: []string{"hello"}, + messageSent: "hello", + }, + { + name: "MultipleCharCustomDelimiterMessageWithoutBoundaries", + cfg: map[string]interface{}{ + "line_delimiter": "", + }, + expectedMessages: []string{"hello"}, + messageSent: "hello", + }, + { + name: "NewLineMessageWithoutBoundaries", + cfg: map[string]interface{}{ + "line_delimiter": "\n", + }, + expectedMessages: []string{"hello"}, + messageSent: "hello", + }, + { + name: "NewLineLargeMessagePayload", + cfg: map[string]interface{}{ + "line_delimiter": "\n", + }, + expectedMessages: largeMessages, + messageSent: strings.Join(largeMessages, "\n"), + }, + { + name: "CustomLargeMessagePayload", + cfg: map[string]interface{}{ + "line_delimiter": ";", + }, + expectedMessages: largeMessages, + messageSent: strings.Join(largeMessages, ";"), + }, + { + name: "MaxReadBufferReached", + cfg: map[string]interface{}{}, + expectedMessages: []string{}, + messageSent: randomString(900000), + }, + { + name: "MaxReadBufferReachedUserConfigured", + cfg: map[string]interface{}{ + "max_read_message": 50000, + }, + expectedMessages: []string{}, + messageSent: randomString(600000), + }, + } + + port := 9000 + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ch := make(chan *util.Data, len(test.expectedMessages)) + defer close(ch) + to := newTestingOutlet(ch) + forwarder := harvester.NewForwarder(to) + test.cfg["host"] = fmt.Sprintf("localhost:%d", port) + cfg, _ := common.NewConfigFrom(test.cfg) + config := defaultConfig + err := cfg.Unpack(&config) + if !assert.NoError(t, err) { + return + } + harvester, err := NewHarvester(forwarder, &config) + if !assert.NoError(t, err) { + return + } + defer func() { + harvester.Stop() + }() + go func() { + err := harvester.Run() + assert.NoError(t, err) + }() + + conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) + assert.NoError(t, err) + fmt.Fprint(conn, test.messageSent) + conn.Close() + + var events []*util.Data + + for len(events) < len(test.expectedMessages) { + select { + case event := <-ch: + events = append(events, event) + case <-time.After(time.Second * 10): + t.Fatal("could not drain all the elements") + return + } + } + + for idx, e := range events { + event := e.GetEvent() + message, err := event.GetValue("message") + assert.NoError(t, err) + assert.Equal(t, test.expectedMessages[idx], message) + meta := e.GetMetadata() + _, ok := meta["hostnames"] + assert.True(t, ok) + _, ok = meta["ip_address"] + assert.True(t, ok) + } + }) + port++ + } +} + +func TestReceiveNewEventsConcurrently(t *testing.T) { + workers := 4 + eventsCount := 100 + ch := make(chan *util.Data, eventsCount*workers) + defer close(ch) + to := newTestingOutlet(ch) + forwarder := harvester.NewForwarder(to) + cfg, err := common.NewConfigFrom(map[string]interface{}{"host": "localhost:9000"}) + if !assert.NoError(t, err) { + return + } + config := defaultConfig + err = cfg.Unpack(&config) + if !assert.NoError(t, err) { + return + } + harvester, err := NewHarvester(forwarder, &config) + if !assert.NoError(t, err) { + return + } + defer harvester.Stop() + if !assert.NoError(t, err) { + return + } + go func() { + err := harvester.Run() + assert.NoError(t, err) + }() + + samples := generateMessages(eventsCount, 1024) + for w := 0; w < workers; w++ { + go func() { + conn, err := net.Dial("tcp", "localhost:9000") + defer conn.Close() + assert.NoError(t, err) + for _, sample := range samples { + fmt.Fprintln(conn, sample) + } + }() + } + + var events []*util.Data + for len(events) < eventsCount*workers { + select { + case event := <-ch: + events = append(events, event) + case <-time.After(time.Second * 10): + t.Fatal("timeout when waiting on channel") + return + } + } +} + +type testingOutlet struct { + ch chan *util.Data +} + +func (o *testingOutlet) OnEvent(data *util.Data) bool { + o.ch <- data + return true +} + +func newTestingOutlet(ch chan *util.Data) *testingOutlet { + return &testingOutlet{ch: ch} +} + +// This could be extracted into testing utils and we could add some unicode chars to the charsets. +func randomString(l int) string { + charsets := []byte("abcdefghijklmnopqrstuvwzyzABCDEFGHIJKLMNOPQRSTUVWZYZ0123456789") + message := make([]byte, l) + for i := range message { + message[i] = charsets[rand.Intn(len(charsets))] + } + return string(message) +} + +func generateMessages(c int, l int) []string { + messages := make([]string, c) + for i := range messages { + messages[i] = randomString(l) + } + return messages +} diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go new file mode 100644 index 000000000000..478a1a3fd3de --- /dev/null +++ b/filebeat/input/tcp/input.go @@ -0,0 +1,86 @@ +package tcp + +import ( + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" +) + +func init() { + err := input.Register("tcp", NewInput) + if err != nil { + panic(err) + } +} + +// Input for TCP connection +type Input struct { + harvester *Harvester + started bool + outlet channel.Outleter + config *config +} + +// NewInput creates a new TCP input +func NewInput( + cfg *common.Config, + outlet channel.Factory, + context input.Context, +) (input.Input, error) { + cfgwarn.Experimental("TCP input type is used") + + out, err := outlet(cfg, context.DynamicFields) + if err != nil { + return nil, err + } + + forwarder := harvester.NewForwarder(out) + + config := defaultConfig + err = cfg.Unpack(&config) + if err != nil { + return nil, err + } + + harvester, err := NewHarvester(forwarder, &config) + if err != nil { + return nil, err + } + + return &Input{ + harvester: harvester, + started: false, + outlet: out, + config: &config, + }, nil +} + +// Run start a TCP input +func (p *Input) Run() { + if !p.started { + logp.Info("Starting TCP input on: %s", p.config.Host) + p.started = true + + go func() { + defer p.outlet.Close() + err := p.harvester.Run() + if err != nil { + logp.Err("Error running TCP harvester, error: %s", err) + } + }() + } +} + +// Stop stops TCP server +func (p *Input) Stop() { + logp.Info("Stopping TCP input on: %s", p.config.Host) + p.harvester.Stop() +} + +// Wait stop the current harvester +func (p *Input) Wait() { + p.Stop() +} diff --git a/filebeat/input/tcp/scan.go b/filebeat/input/tcp/scan.go new file mode 100644 index 000000000000..a1e8c42f7617 --- /dev/null +++ b/filebeat/input/tcp/scan.go @@ -0,0 +1,31 @@ +package tcp + +import ( + "bufio" + "bytes" +) + +// ScanDelimiter return a function to split line using a custom delimiter, the delimiter +// is stripped from the returned value. +func scanDelimiter(delimiter []byte) bufio.SplitFunc { + return func(data []byte, eof bool) (int, []byte, error) { + if eof && len(data) == 0 { + return 0, nil, nil + } + if i := bytes.Index(data, delimiter); i >= 0 { + return i + len(delimiter), dropDelimiter(data[0:i], delimiter), nil + } + if eof { + return len(data), dropDelimiter(data, delimiter), nil + } + return 0, nil, nil + } +} + +func dropDelimiter(data []byte, delimiter []byte) []byte { + if len(data) > len(delimiter) && + bytes.Equal(data[len(data)-len(delimiter):len(data)], delimiter) { + return data[0 : len(data)-len(delimiter)] + } + return data +} diff --git a/filebeat/input/tcp/scan_test.go b/filebeat/input/tcp/scan_test.go new file mode 100644 index 000000000000..937ffb43b1e2 --- /dev/null +++ b/filebeat/input/tcp/scan_test.go @@ -0,0 +1,91 @@ +package tcp + +import ( + "bufio" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCustomDelimiter(t *testing.T) { + tests := []struct { + name string + text string + expected []string + delimiter []byte + }{ + { + name: "Multiple chars delimiter", + text: "hellobonjourholahey", + expected: []string{ + "hello", + "bonjour", + "hola", + "hey", + }, + delimiter: []byte(""), + }, + { + name: "Multiple chars delimiter with half starting delimiter", + text: "hellobonjourhey", + expected: []string{ + "hello", + "bonjour"), + }, + { + name: "Multiple chars delimiter with half ending delimiter", + text: "helloEND>holahey", + expected: []string{ + "hello", + "END>hola", + "hey", + }, + delimiter: []byte(""), + }, + { + name: "Delimiter end of string", + text: "hellobonjourholahey", + expected: []string{ + "hello", + "bonjour", + "hola", + "hey", + }, + delimiter: []byte(""), + }, + { + name: "Single char delimiter", + text: "hello;bonjour;hola;hey", + expected: []string{ + "hello", + "bonjour", + "hola", + "hey", + }, + delimiter: []byte(";"), + }, + { + name: "Empty string", + text: "", + expected: []string(nil), + delimiter: []byte(";"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + buf := strings.NewReader(test.text) + scanner := bufio.NewScanner(buf) + scanner.Split(scanDelimiter(test.delimiter)) + var elements []string + for scanner.Scan() { + elements = append(elements, scanner.Text()) + } + assert.EqualValues(t, test.expected, elements) + }) + } +} diff --git a/filebeat/tests/system/test_tcp.py b/filebeat/tests/system/test_tcp.py new file mode 100644 index 000000000000..dffecfe4f698 --- /dev/null +++ b/filebeat/tests/system/test_tcp.py @@ -0,0 +1,68 @@ +from filebeat import BaseTest +import socket + + +class Test(BaseTest): + """ + Test filebeat TCP input + """ + + def test_tcp_with_newline_delimiter(self): + """ + Test TCP input with a new line delimiter + """ + self.send_events_with_delimiter("\n") + + def test_tcp_with_custom_char_delimiter(self): + """ + Test TCP input with a custom single char delimiter + """ + self.send_events_with_delimiter(";") + + def test_tcp_with_custom_word_delimiter(self): + """ + Test TCP input with a custom single char delimiter + """ + self.send_events_with_delimiter("") + + def send_events_with_delimiter(self, delimiter): + host = "127.0.0.1" + port = 8080 + input_raw = """ +- type: tcp + host: "{}:{}" + enabled: true +""" + + # Use default of \n and stripping \r + if delimiter is not "": + input_raw += "\n line_delimiter: {}".format(delimiter) + + input_raw = input_raw.format(host, port) + + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Started listening for incoming TCP connection")) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP + sock.connect((host, port)) + + for n in range(0, 2): + sock.send("Hello World: " + str(n) + delimiter) + + self.wait_until(lambda: self.output_count(lambda x: x >= 2)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 2 + assert output[0]["prospector.type"] == "tcp" + assert output[0]["input.type"] == "tcp" + + sock.close()