From db83adaa5d8d1a4620c2230ca1b8dfc3e3f5377e Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 6 Feb 2020 10:28:47 +0100 Subject: [PATCH 1/9] Create mosquitto image --- filebeat/docker-compose.yml | 10 ++++++++++ testing/environments/docker/mosquitto/Dockerfile | 2 ++ 2 files changed, 12 insertions(+) create mode 100644 testing/environments/docker/mosquitto/Dockerfile diff --git a/filebeat/docker-compose.yml b/filebeat/docker-compose.yml index 52437a78a8d..a9cbc9d32bf 100644 --- a/filebeat/docker-compose.yml +++ b/filebeat/docker-compose.yml @@ -16,6 +16,8 @@ services: - KAFKA_PORT=9092 - KIBANA_HOST=kibana - KIBANA_PORT=5601 + - MOSQUITTO_HOST=mosquitto + - MOSQUITTO_PORT=1883 working_dir: /go/src/github.com/elastic/beats/filebeat volumes: - ${PWD}/..:/go/src/github.com/elastic/beats/ @@ -31,6 +33,7 @@ services: elasticsearch: { condition: service_healthy } kafka: { condition: service_healthy } kibana: { condition: service_healthy } + mosquitto: { condition: service_healthy } redis: { condition: service_healthy } elasticsearch: @@ -51,5 +54,12 @@ services: file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml service: kibana + mosquitto: + build: ${ES_BEATS}/testing/environments/docker/mosquitto + expose: + - 1883 + ports: + - "127.0.0.1:1883:1883" + redis: build: ${PWD}/input/redis/_meta diff --git a/testing/environments/docker/mosquitto/Dockerfile b/testing/environments/docker/mosquitto/Dockerfile new file mode 100644 index 00000000000..eac5d1e0d6c --- /dev/null +++ b/testing/environments/docker/mosquitto/Dockerfile @@ -0,0 +1,2 @@ +FROM eclipse-mosquitto:1.6.8 +HEALTHCHECK --interval=1s --retries=600 CMD nc -z localhost 1883 From 510c55eab0dd53acc5964d0d8252a16ed1065bb6 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 6 Feb 2020 19:01:55 +0100 Subject: [PATCH 2/9] MQTT input: add integration test --- filebeat/input/mqtt/input.go | 2 +- filebeat/input/mqtt/input_test.go | 9 +- filebeat/input/mqtt/mqtt_integration_test.go | 167 +++++++++++++++++++ 3 files changed, 175 insertions(+), 3 deletions(-) create mode 100644 filebeat/input/mqtt/mqtt_integration_test.go diff --git a/filebeat/input/mqtt/input.go b/filebeat/input/mqtt/input.go index a3e00338cb8..f70b8ef3eb5 100644 --- a/filebeat/input/mqtt/input.go +++ b/filebeat/input/mqtt/input.go @@ -119,7 +119,7 @@ func createOnMessageHandler(logger *logp.Logger, outlet channel.Outleter, inflig Timestamp: time.Now(), Fields: common.MapStr{ "message": string(message.Payload()), - "mqtt": mqttFields, + "mqtt": mqttFields, }, }) diff --git a/filebeat/input/mqtt/input_test.go b/filebeat/input/mqtt/input_test.go index 9261e40b468..15299088559 100644 --- a/filebeat/input/mqtt/input_test.go +++ b/filebeat/input/mqtt/input_test.go @@ -75,6 +75,8 @@ func TestNewInput_Run(t *testing.T) { }) eventsCh := make(chan beat.Event) + defer close(eventsCh) + outlet := &mockedOutleter{ onEventHandler: func(event beat.Event) bool { eventsCh <- event @@ -137,7 +139,7 @@ func TestNewInput_Run(t *testing.T) { } } -func TestNewInput_Run_Stop(t *testing.T) { +func TestNewInput_Run_Wait(t *testing.T) { config := common.MustNewConfigFrom(common.MapStr{ "hosts": "tcp://mocked:1234", "topics": []string{"first", "second"}, @@ -148,7 +150,10 @@ func TestNewInput_Run_Stop(t *testing.T) { var eventProcessing sync.WaitGroup eventProcessing.Add(numMessages) + eventsCh := make(chan beat.Event) + defer close(eventsCh) + outlet := &mockedOutleter{ onEventHandler: func(event beat.Event) bool { eventProcessing.Done() @@ -198,7 +203,7 @@ func TestNewInput_Run_Stop(t *testing.T) { } }() - input.Stop() + input.Wait() } func TestRun_Once(t *testing.T) { diff --git a/filebeat/input/mqtt/mqtt_integration_test.go b/filebeat/input/mqtt/mqtt_integration_test.go new file mode 100644 index 00000000000..56bc37be7d5 --- /dev/null +++ b/filebeat/input/mqtt/mqtt_integration_test.go @@ -0,0 +1,167 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build integration + +package mqtt + +import ( + "fmt" + "os" + "sync" + "testing" + "time" + + libmqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +const ( + message = "hello-world" + messageCount = 100 + + waitTimeout = 30 * time.Second +) + +var ( + hostPort = fmt.Sprintf("tcp://%s:%s", + getOrDefault(os.Getenv("MOSQUITTO_HOST"), "mosquitto"), + getOrDefault(os.Getenv("MOSQUITTO_PORT"), "1883")) + topic = fmt.Sprintf("topic-%d", time.Now().UnixNano()) +) + +type eventCaptor struct { + c chan struct{} + closeOnce sync.Once + closed bool + events chan beat.Event +} + +func newEventCaptor(events chan beat.Event) channel.Outleter { + return &eventCaptor{ + c: make(chan struct{}), + events: events, + } +} + +func (ec *eventCaptor) OnEvent(event beat.Event) bool { + ec.events <- event + return true +} + +func (ec *eventCaptor) Close() error { + ec.closeOnce.Do(func() { + ec.closed = true + close(ec.c) + }) + return nil +} + +func (ec *eventCaptor) Done() <-chan struct{} { + return ec.c +} + +func TestInput(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("mqtt input", "libmqtt")) + + // Setup the input config. + config := common.MustNewConfigFrom(common.MapStr{ + "hosts": []string{hostPort}, + "topics": []string{topic}, + }) + + // Route input events through our captor instead of sending through ES. + events := make(chan beat.Event, messageCount) + defer close(events) + + captor := newEventCaptor(events) + defer captor.Close() + + connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { + return channel.SubOutlet(captor), nil + }) + + // Mock the context. + inputContext := input.Context{ + Done: make(chan struct{}), + BeatDone: make(chan struct{}), + } + + // Setup the input + input, err := NewInput(config, connector, inputContext) + require.NoError(t, err) + require.NotNil(t, input) + + // Run the input. + input.Run() + + // Create Publisher + publisher := createPublisher(t) + + var wg sync.WaitGroup + wg.Add(2) + go verifyEventsReceived(t, &wg, events) + go emitInputData(t, &wg, publisher) + + wg.Wait() +} + +func createPublisher(t *testing.T) libmqtt.Client { + clientOptions := libmqtt.NewClientOptions(). + SetClientID("emitter"). + SetAutoReconnect(false). + SetConnectRetry(false). + AddBroker(hostPort) + client := libmqtt.NewClient(clientOptions) + token := client.Connect() + require.True(t, token.WaitTimeout(waitTimeout)) + require.NoError(t, token.Error()) + return client +} + +func verifyEventsReceived(t *testing.T, wg *sync.WaitGroup, events <-chan beat.Event) { + for i := 0; i < messageCount; i++ { + event := <-events + + val, err := event.GetValue("message") + require.NoError(t, err) + require.Equal(t, message, val) + } + wg.Done() +} + +func emitInputData(t *testing.T, wg *sync.WaitGroup, publisher libmqtt.Client) { + for i := 0; i < messageCount; i++ { + token := publisher.Publish(topic, 2, false, []byte(message)) + require.True(t, token.WaitTimeout(waitTimeout)) + require.NoError(t, token.Error()) + } + wg.Done() +} + +func getOrDefault(s, defaultString string) string { + if s == "" { + return defaultString + } + return s +} From 881fec56a43b9739475b1fd2c277550e420ce150 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 6 Feb 2020 19:05:23 +0100 Subject: [PATCH 3/9] Fix --- filebeat/docker-compose.yml | 2 -- filebeat/input/mqtt/input.go | 2 +- filebeat/input/mqtt/mqtt_integration_test.go | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/filebeat/docker-compose.yml b/filebeat/docker-compose.yml index a9cbc9d32bf..19302ae1e6f 100644 --- a/filebeat/docker-compose.yml +++ b/filebeat/docker-compose.yml @@ -58,8 +58,6 @@ services: build: ${ES_BEATS}/testing/environments/docker/mosquitto expose: - 1883 - ports: - - "127.0.0.1:1883:1883" redis: build: ${PWD}/input/redis/_meta diff --git a/filebeat/input/mqtt/input.go b/filebeat/input/mqtt/input.go index f70b8ef3eb5..a3e00338cb8 100644 --- a/filebeat/input/mqtt/input.go +++ b/filebeat/input/mqtt/input.go @@ -119,7 +119,7 @@ func createOnMessageHandler(logger *logp.Logger, outlet channel.Outleter, inflig Timestamp: time.Now(), Fields: common.MapStr{ "message": string(message.Payload()), - "mqtt": mqttFields, + "mqtt": mqttFields, }, }) diff --git a/filebeat/input/mqtt/mqtt_integration_test.go b/filebeat/input/mqtt/mqtt_integration_test.go index 56bc37be7d5..dab5ecc9389 100644 --- a/filebeat/input/mqtt/mqtt_integration_test.go +++ b/filebeat/input/mqtt/mqtt_integration_test.go @@ -37,7 +37,7 @@ import ( ) const ( - message = "hello-world" + message = "hello-world" messageCount = 100 waitTimeout = 30 * time.Second From 5673009cd17c549e7efcd50cc902ce3c1427944d Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 6 Feb 2020 20:31:35 +0100 Subject: [PATCH 4/9] Verify connectivity --- filebeat/input/mqtt/mqtt_integration_test.go | 51 +++++++++++++------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/filebeat/input/mqtt/mqtt_integration_test.go b/filebeat/input/mqtt/mqtt_integration_test.go index dab5ecc9389..32316403fee 100644 --- a/filebeat/input/mqtt/mqtt_integration_test.go +++ b/filebeat/input/mqtt/mqtt_integration_test.go @@ -91,10 +91,10 @@ func TestInput(t *testing.T) { }) // Route input events through our captor instead of sending through ES. - events := make(chan beat.Event, messageCount) - defer close(events) + eventsCh := make(chan beat.Event) + defer close(eventsCh) - captor := newEventCaptor(events) + captor := newEventCaptor(eventsCh) defer captor.Close() connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { @@ -120,8 +120,9 @@ func TestInput(t *testing.T) { var wg sync.WaitGroup wg.Add(2) - go verifyEventsReceived(t, &wg, events) - go emitInputData(t, &wg, publisher) + + verifiedCh := verifyEventReceived(t, &wg, eventsCh) + emitInputData(t, &wg, verifiedCh, publisher) wg.Wait() } @@ -139,24 +140,39 @@ func createPublisher(t *testing.T) libmqtt.Client { return client } -func verifyEventsReceived(t *testing.T, wg *sync.WaitGroup, events <-chan beat.Event) { - for i := 0; i < messageCount; i++ { - event := <-events +func verifyEventReceived(t *testing.T, wg *sync.WaitGroup, eventsCh <-chan beat.Event) <-chan struct{} { + verifiedCh := make(chan struct{}) + go func() { + event := <-eventsCh val, err := event.GetValue("message") require.NoError(t, err) require.Equal(t, message, val) - } - wg.Done() + + verifiedCh <- struct{}{} + + wg.Done() + }() + return verifiedCh } -func emitInputData(t *testing.T, wg *sync.WaitGroup, publisher libmqtt.Client) { - for i := 0; i < messageCount; i++ { - token := publisher.Publish(topic, 2, false, []byte(message)) - require.True(t, token.WaitTimeout(waitTimeout)) - require.NoError(t, token.Error()) - } - wg.Done() +func emitInputData(t *testing.T, wg *sync.WaitGroup, verifiedCh <-chan struct{}, publisher libmqtt.Client) { + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-verifiedCh: + wg.Done() + return + case <-ticker.C: + token := publisher.Publish(topic, 1, false, []byte(message)) + require.True(t, token.WaitTimeout(waitTimeout)) + require.NoError(t, token.Error()) + } + } + }() } func getOrDefault(s, defaultString string) string { @@ -165,3 +181,4 @@ func getOrDefault(s, defaultString string) string { } return s } + From 59c580d4aca4e07b9227a1aca92834b562ccc0c8 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 6 Feb 2020 20:34:13 +0100 Subject: [PATCH 5/9] Fix --- filebeat/input/mqtt/mqtt_integration_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/filebeat/input/mqtt/mqtt_integration_test.go b/filebeat/input/mqtt/mqtt_integration_test.go index 32316403fee..24c93e668d7 100644 --- a/filebeat/input/mqtt/mqtt_integration_test.go +++ b/filebeat/input/mqtt/mqtt_integration_test.go @@ -38,7 +38,6 @@ import ( const ( message = "hello-world" - messageCount = 100 waitTimeout = 30 * time.Second ) From 0894e4710069a417c60e19678709d9078a6ab41c Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 6 Feb 2020 20:43:10 +0100 Subject: [PATCH 6/9] Fix: mage check --- filebeat/input/mqtt/mqtt_integration_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/filebeat/input/mqtt/mqtt_integration_test.go b/filebeat/input/mqtt/mqtt_integration_test.go index 24c93e668d7..e9ee04a8fed 100644 --- a/filebeat/input/mqtt/mqtt_integration_test.go +++ b/filebeat/input/mqtt/mqtt_integration_test.go @@ -37,7 +37,7 @@ import ( ) const ( - message = "hello-world" + message = "hello-world" waitTimeout = 30 * time.Second ) @@ -180,4 +180,3 @@ func getOrDefault(s, defaultString string) string { } return s } - From 1e986d938e81fbc80c2afb371ed809731df9e971 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 6 Feb 2020 21:56:31 +0100 Subject: [PATCH 7/9] Fix --- filebeat/input/mqtt/mqtt_integration_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/filebeat/input/mqtt/mqtt_integration_test.go b/filebeat/input/mqtt/mqtt_integration_test.go index e9ee04a8fed..d5f11f1dd5d 100644 --- a/filebeat/input/mqtt/mqtt_integration_test.go +++ b/filebeat/input/mqtt/mqtt_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// +build integration +// + build integration package mqtt @@ -90,7 +90,7 @@ func TestInput(t *testing.T) { }) // Route input events through our captor instead of sending through ES. - eventsCh := make(chan beat.Event) + eventsCh := make(chan beat.Event, 100) defer close(eventsCh) captor := newEventCaptor(eventsCh) @@ -140,7 +140,8 @@ func createPublisher(t *testing.T) libmqtt.Client { } func verifyEventReceived(t *testing.T, wg *sync.WaitGroup, eventsCh <-chan beat.Event) <-chan struct{} { - verifiedCh := make(chan struct{}) + verifiedCh := make(chan struct{}, 1) + go func() { event := <-eventsCh @@ -157,7 +158,7 @@ func verifyEventReceived(t *testing.T, wg *sync.WaitGroup, eventsCh <-chan beat. func emitInputData(t *testing.T, wg *sync.WaitGroup, verifiedCh <-chan struct{}, publisher libmqtt.Client) { go func() { - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { From b8264b56f9a5467fa817f04f60bd084780e9f255 Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Thu, 6 Feb 2020 22:27:41 +0100 Subject: [PATCH 8/9] Fix --- filebeat/input/mqtt/mqtt_integration_test.go | 45 ++++++++------------ 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/filebeat/input/mqtt/mqtt_integration_test.go b/filebeat/input/mqtt/mqtt_integration_test.go index d5f11f1dd5d..b9663ef71b8 100644 --- a/filebeat/input/mqtt/mqtt_integration_test.go +++ b/filebeat/input/mqtt/mqtt_integration_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -// + build integration +// +build integration package mqtt @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/backoff" "github.com/elastic/beats/libbeat/logp" ) @@ -83,6 +84,9 @@ func (ec *eventCaptor) Done() <-chan struct{} { func TestInput(t *testing.T) { logp.TestingSetup(logp.WithSelectors("mqtt input", "libmqtt")) + newMqttClient = libmqtt.NewClient + newBackoff = backoff.NewEqualJitterBackoff + // Setup the input config. config := common.MustNewConfigFrom(common.MapStr{ "hosts": []string{hostPort}, @@ -90,7 +94,7 @@ func TestInput(t *testing.T) { }) // Route input events through our captor instead of sending through ES. - eventsCh := make(chan beat.Event, 100) + eventsCh := make(chan beat.Event) defer close(eventsCh) captor := newEventCaptor(eventsCh) @@ -117,13 +121,18 @@ func TestInput(t *testing.T) { // Create Publisher publisher := createPublisher(t) - var wg sync.WaitGroup - wg.Add(2) + // Verify that event has been received + verifiedCh := make(chan struct{}) + defer close(verifiedCh) + + emitInputData(t, verifiedCh, publisher) - verifiedCh := verifyEventReceived(t, &wg, eventsCh) - emitInputData(t, &wg, verifiedCh, publisher) + event := <-eventsCh + verifiedCh <- struct{}{} - wg.Wait() + val, err := event.GetValue("message") + require.NoError(t, err) + require.Equal(t, message, val) } func createPublisher(t *testing.T) libmqtt.Client { @@ -139,32 +148,14 @@ func createPublisher(t *testing.T) libmqtt.Client { return client } -func verifyEventReceived(t *testing.T, wg *sync.WaitGroup, eventsCh <-chan beat.Event) <-chan struct{} { - verifiedCh := make(chan struct{}, 1) - - go func() { - event := <-eventsCh - - val, err := event.GetValue("message") - require.NoError(t, err) - require.Equal(t, message, val) - - verifiedCh <- struct{}{} - - wg.Done() - }() - return verifiedCh -} - -func emitInputData(t *testing.T, wg *sync.WaitGroup, verifiedCh <-chan struct{}, publisher libmqtt.Client) { +func emitInputData(t *testing.T, verifiedCh <-chan struct{}, publisher libmqtt.Client) { go func() { - ticker := time.NewTicker(10 * time.Second) + ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { case <-verifiedCh: - wg.Done() return case <-ticker.C: token := publisher.Publish(topic, 1, false, []byte(message)) From 91751b827747acf2b53d4b63122f91efbc7ec89a Mon Sep 17 00:00:00 2001 From: Marcin Tojek Date: Fri, 7 Feb 2020 08:51:54 +0100 Subject: [PATCH 9/9] Fix: remove global var --- filebeat/input/mqtt/input.go | 23 ++++++++++++----- filebeat/input/mqtt/input_test.go | 27 ++++++++------------ filebeat/input/mqtt/mqtt_integration_test.go | 4 --- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/filebeat/input/mqtt/input.go b/filebeat/input/mqtt/input.go index a3e00338cb8..945c1fe8de5 100644 --- a/filebeat/input/mqtt/input.go +++ b/filebeat/input/mqtt/input.go @@ -40,11 +40,6 @@ const ( subscribeRetryInterval = 1 * time.Second ) -var ( - newMqttClient = libmqtt.NewClient - newBackoff = backoff.NewEqualJitterBackoff -) - // Input contains the input and its config type mqttInput struct { once sync.Once @@ -67,6 +62,16 @@ func NewInput( cfg *common.Config, connector channel.Connector, inputContext input.Context, +) (input.Input, error) { + return newInput(cfg, connector, inputContext, libmqtt.NewClient, backoff.NewEqualJitterBackoff) +} + +func newInput( + cfg *common.Config, + connector channel.Connector, + inputContext input.Context, + newMqttClient func(options *libmqtt.ClientOptions) libmqtt.Client, + newBackoff func(done <-chan struct{}, init, max time.Duration) backoff.Backoff, ) (input.Input, error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { @@ -88,7 +93,7 @@ func NewInput( inflightMessages := new(sync.WaitGroup) clientSubscriptions := createClientSubscriptions(config) onMessageHandler := createOnMessageHandler(logger, out, inflightMessages) - onConnectHandler := createOnConnectHandler(logger, &inputContext, onMessageHandler, clientSubscriptions) + onConnectHandler := createOnConnectHandler(logger, &inputContext, onMessageHandler, clientSubscriptions, newBackoff) clientOptions, err := createClientOptions(config, onConnectHandler) if err != nil { return nil, err @@ -127,7 +132,11 @@ func createOnMessageHandler(logger *logp.Logger, outlet channel.Outleter, inflig } } -func createOnConnectHandler(logger *logp.Logger, inputContext *input.Context, onMessageHandler func(client libmqtt.Client, message libmqtt.Message), clientSubscriptions map[string]byte) func(client libmqtt.Client) { +func createOnConnectHandler(logger *logp.Logger, + inputContext *input.Context, + onMessageHandler func(client libmqtt.Client, message libmqtt.Message), + clientSubscriptions map[string]byte, + newBackoff func(done <-chan struct{}, init, max time.Duration) backoff.Backoff) func(client libmqtt.Client) { // The function subscribes the client to the specific topics (with retry backoff in case of failure). return func(client libmqtt.Client) { backoff := newBackoff( diff --git a/filebeat/input/mqtt/input_test.go b/filebeat/input/mqtt/input_test.go index 15299088559..99413bc5cf7 100644 --- a/filebeat/input/mqtt/input_test.go +++ b/filebeat/input/mqtt/input_test.go @@ -33,9 +33,7 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -var ( - logger = logp.NewLogger("test") -) +var logger = logp.NewLogger("test") func TestNewInput_MissingConfigField(t *testing.T) { config := common.MustNewConfigFrom(common.MapStr{ @@ -106,7 +104,7 @@ func TestNewInput_Run(t *testing.T) { } var client *mockedClient - newMqttClient = func(o *libmqtt.ClientOptions) libmqtt.Client { + newMqttClient := func(o *libmqtt.ClientOptions) libmqtt.Client { client = &mockedClient{ onConnectHandler: o.OnConnect, messages: []mockedMessage{firstMessage, secondMessage}, @@ -117,7 +115,7 @@ func TestNewInput_Run(t *testing.T) { return client } - input, err := NewInput(config, connector, inputContext) + input, err := newInput(config, connector, inputContext, newMqttClient, backoff.NewEqualJitterBackoff) require.NoError(t, err) require.NotNil(t, input) @@ -179,7 +177,7 @@ func TestNewInput_Run_Wait(t *testing.T) { } var client *mockedClient - newMqttClient = func(o *libmqtt.ClientOptions) libmqtt.Client { + newMqttClient := func(o *libmqtt.ClientOptions) libmqtt.Client { client = &mockedClient{ onConnectHandler: o.OnConnect, messages: messages, @@ -190,7 +188,7 @@ func TestNewInput_Run_Wait(t *testing.T) { return client } - input, err := NewInput(config, connector, inputContext) + input, err := newInput(config, connector, inputContext, newMqttClient, backoff.NewEqualJitterBackoff) require.NoError(t, err) require.NotNil(t, input) @@ -259,11 +257,10 @@ func TestOnCreateHandler_SubscribeMultiple_Succeeded(t *testing.T) { inputContext := new(finput.Context) onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {} var clientSubscriptions map[string]byte - handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions) - - newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { + newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { return backoff.NewEqualJitterBackoff(inputContext.Done, time.Nanosecond, 2*time.Nanosecond) } + handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff) client := &mockedClient{ tokens: []libmqtt.Token{&mockedToken{ @@ -279,11 +276,10 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSucceeded(t *testing.T) { inputContext := new(finput.Context) onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {} var clientSubscriptions map[string]byte - handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions) - - newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { + newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { return backoff.NewEqualJitterBackoff(inputContext.Done, time.Nanosecond, 2*time.Nanosecond) } + handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff) client := &mockedClient{ tokens: []libmqtt.Token{&mockedToken{ @@ -301,14 +297,13 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSignalDone(t *testing.T) { inputContext := new(finput.Context) onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {} var clientSubscriptions map[string]byte - handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions) - mockedBackoff := &mockedBackoff{ waits: []bool{true, false}, } - newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { + newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff { return mockedBackoff } + handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff) client := &mockedClient{ tokens: []libmqtt.Token{&mockedToken{ diff --git a/filebeat/input/mqtt/mqtt_integration_test.go b/filebeat/input/mqtt/mqtt_integration_test.go index b9663ef71b8..3fd3506cbf4 100644 --- a/filebeat/input/mqtt/mqtt_integration_test.go +++ b/filebeat/input/mqtt/mqtt_integration_test.go @@ -33,7 +33,6 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/backoff" "github.com/elastic/beats/libbeat/logp" ) @@ -84,9 +83,6 @@ func (ec *eventCaptor) Done() <-chan struct{} { func TestInput(t *testing.T) { logp.TestingSetup(logp.WithSelectors("mqtt input", "libmqtt")) - newMqttClient = libmqtt.NewClient - newBackoff = backoff.NewEqualJitterBackoff - // Setup the input config. config := common.MustNewConfigFrom(common.MapStr{ "hosts": []string{hostPort},