Skip to content

Commit

Permalink
MQTT: add integration test (#16143)
Browse files Browse the repository at this point in the history
* Create mosquitto image

* MQTT input: add integration test

* Fix

* Verify connectivity

* Fix

* Fix: mage check

* Fix

* Fix

* Fix: remove global var
  • Loading branch information
mtojek committed Feb 7, 2020
1 parent e19f78c commit 3ab3702
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 25 deletions.
8 changes: 8 additions & 0 deletions filebeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ services:
- KAFKA_PORT=9092
- KIBANA_HOST=kibana
- KIBANA_PORT=5601
- MOSQUITTO_HOST=mosquitto
- MOSQUITTO_PORT=1883
working_dir: /go/src/github.com/elastic/beats/filebeat
volumes:
- ${PWD}/..:/go/src/github.com/elastic/beats/
Expand All @@ -31,6 +33,7 @@ services:
elasticsearch: { condition: service_healthy }
kafka: { condition: service_healthy }
kibana: { condition: service_healthy }
mosquitto: { condition: service_healthy }
redis: { condition: service_healthy }

elasticsearch:
Expand All @@ -51,5 +54,10 @@ services:
file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml
service: kibana

mosquitto:
build: ${ES_BEATS}/testing/environments/docker/mosquitto
expose:
- 1883

redis:
build: ${PWD}/input/redis/_meta
23 changes: 16 additions & 7 deletions filebeat/input/mqtt/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ const (
subscribeRetryInterval = 1 * time.Second
)

var (
newMqttClient = libmqtt.NewClient
newBackoff = backoff.NewEqualJitterBackoff
)

// Input contains the input and its config
type mqttInput struct {
once sync.Once
Expand All @@ -67,6 +62,16 @@ func NewInput(
cfg *common.Config,
connector channel.Connector,
inputContext input.Context,
) (input.Input, error) {
return newInput(cfg, connector, inputContext, libmqtt.NewClient, backoff.NewEqualJitterBackoff)
}

func newInput(
cfg *common.Config,
connector channel.Connector,
inputContext input.Context,
newMqttClient func(options *libmqtt.ClientOptions) libmqtt.Client,
newBackoff func(done <-chan struct{}, init, max time.Duration) backoff.Backoff,
) (input.Input, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
Expand All @@ -88,7 +93,7 @@ func NewInput(
inflightMessages := new(sync.WaitGroup)
clientSubscriptions := createClientSubscriptions(config)
onMessageHandler := createOnMessageHandler(logger, out, inflightMessages)
onConnectHandler := createOnConnectHandler(logger, &inputContext, onMessageHandler, clientSubscriptions)
onConnectHandler := createOnConnectHandler(logger, &inputContext, onMessageHandler, clientSubscriptions, newBackoff)
clientOptions, err := createClientOptions(config, onConnectHandler)
if err != nil {
return nil, err
Expand Down Expand Up @@ -127,7 +132,11 @@ func createOnMessageHandler(logger *logp.Logger, outlet channel.Outleter, inflig
}
}

func createOnConnectHandler(logger *logp.Logger, inputContext *input.Context, onMessageHandler func(client libmqtt.Client, message libmqtt.Message), clientSubscriptions map[string]byte) func(client libmqtt.Client) {
func createOnConnectHandler(logger *logp.Logger,
inputContext *input.Context,
onMessageHandler func(client libmqtt.Client, message libmqtt.Message),
clientSubscriptions map[string]byte,
newBackoff func(done <-chan struct{}, init, max time.Duration) backoff.Backoff) func(client libmqtt.Client) {
// The function subscribes the client to the specific topics (with retry backoff in case of failure).
return func(client libmqtt.Client) {
backoff := newBackoff(
Expand Down
36 changes: 18 additions & 18 deletions filebeat/input/mqtt/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

var (
logger = logp.NewLogger("test")
)
var logger = logp.NewLogger("test")

func TestNewInput_MissingConfigField(t *testing.T) {
config := common.MustNewConfigFrom(common.MapStr{
Expand Down Expand Up @@ -75,6 +73,8 @@ func TestNewInput_Run(t *testing.T) {
})

eventsCh := make(chan beat.Event)
defer close(eventsCh)

outlet := &mockedOutleter{
onEventHandler: func(event beat.Event) bool {
eventsCh <- event
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestNewInput_Run(t *testing.T) {
}

var client *mockedClient
newMqttClient = func(o *libmqtt.ClientOptions) libmqtt.Client {
newMqttClient := func(o *libmqtt.ClientOptions) libmqtt.Client {
client = &mockedClient{
onConnectHandler: o.OnConnect,
messages: []mockedMessage{firstMessage, secondMessage},
Expand All @@ -115,7 +115,7 @@ func TestNewInput_Run(t *testing.T) {
return client
}

input, err := NewInput(config, connector, inputContext)
input, err := newInput(config, connector, inputContext, newMqttClient, backoff.NewEqualJitterBackoff)
require.NoError(t, err)
require.NotNil(t, input)

Expand All @@ -137,7 +137,7 @@ func TestNewInput_Run(t *testing.T) {
}
}

func TestNewInput_Run_Stop(t *testing.T) {
func TestNewInput_Run_Wait(t *testing.T) {
config := common.MustNewConfigFrom(common.MapStr{
"hosts": "tcp://mocked:1234",
"topics": []string{"first", "second"},
Expand All @@ -148,7 +148,10 @@ func TestNewInput_Run_Stop(t *testing.T) {

var eventProcessing sync.WaitGroup
eventProcessing.Add(numMessages)

eventsCh := make(chan beat.Event)
defer close(eventsCh)

outlet := &mockedOutleter{
onEventHandler: func(event beat.Event) bool {
eventProcessing.Done()
Expand All @@ -174,7 +177,7 @@ func TestNewInput_Run_Stop(t *testing.T) {
}

var client *mockedClient
newMqttClient = func(o *libmqtt.ClientOptions) libmqtt.Client {
newMqttClient := func(o *libmqtt.ClientOptions) libmqtt.Client {
client = &mockedClient{
onConnectHandler: o.OnConnect,
messages: messages,
Expand All @@ -185,7 +188,7 @@ func TestNewInput_Run_Stop(t *testing.T) {
return client
}

input, err := NewInput(config, connector, inputContext)
input, err := newInput(config, connector, inputContext, newMqttClient, backoff.NewEqualJitterBackoff)
require.NoError(t, err)
require.NotNil(t, input)

Expand All @@ -198,7 +201,7 @@ func TestNewInput_Run_Stop(t *testing.T) {
}
}()

input.Stop()
input.Wait()
}

func TestRun_Once(t *testing.T) {
Expand Down Expand Up @@ -254,11 +257,10 @@ func TestOnCreateHandler_SubscribeMultiple_Succeeded(t *testing.T) {
inputContext := new(finput.Context)
onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {}
var clientSubscriptions map[string]byte
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions)

newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
return backoff.NewEqualJitterBackoff(inputContext.Done, time.Nanosecond, 2*time.Nanosecond)
}
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff)

client := &mockedClient{
tokens: []libmqtt.Token{&mockedToken{
Expand All @@ -274,11 +276,10 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSucceeded(t *testing.T) {
inputContext := new(finput.Context)
onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {}
var clientSubscriptions map[string]byte
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions)

newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
return backoff.NewEqualJitterBackoff(inputContext.Done, time.Nanosecond, 2*time.Nanosecond)
}
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff)

client := &mockedClient{
tokens: []libmqtt.Token{&mockedToken{
Expand All @@ -296,14 +297,13 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSignalDone(t *testing.T) {
inputContext := new(finput.Context)
onMessageHandler := func(client libmqtt.Client, message libmqtt.Message) {}
var clientSubscriptions map[string]byte
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions)

mockedBackoff := &mockedBackoff{
waits: []bool{true, false},
}
newBackoff = func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
newBackoff := func(done <-chan struct{}, init, max time.Duration) backoff.Backoff {
return mockedBackoff
}
handler := createOnConnectHandler(logger, inputContext, onMessageHandler, clientSubscriptions, newBackoff)

client := &mockedClient{
tokens: []libmqtt.Token{&mockedToken{
Expand Down
170 changes: 170 additions & 0 deletions filebeat/input/mqtt/mqtt_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build integration

package mqtt

import (
"fmt"
"os"
"sync"
"testing"
"time"

libmqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

const (
message = "hello-world"

waitTimeout = 30 * time.Second
)

var (
hostPort = fmt.Sprintf("tcp://%s:%s",
getOrDefault(os.Getenv("MOSQUITTO_HOST"), "mosquitto"),
getOrDefault(os.Getenv("MOSQUITTO_PORT"), "1883"))
topic = fmt.Sprintf("topic-%d", time.Now().UnixNano())
)

type eventCaptor struct {
c chan struct{}
closeOnce sync.Once
closed bool
events chan beat.Event
}

func newEventCaptor(events chan beat.Event) channel.Outleter {
return &eventCaptor{
c: make(chan struct{}),
events: events,
}
}

func (ec *eventCaptor) OnEvent(event beat.Event) bool {
ec.events <- event
return true
}

func (ec *eventCaptor) Close() error {
ec.closeOnce.Do(func() {
ec.closed = true
close(ec.c)
})
return nil
}

func (ec *eventCaptor) Done() <-chan struct{} {
return ec.c
}

func TestInput(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("mqtt input", "libmqtt"))

// Setup the input config.
config := common.MustNewConfigFrom(common.MapStr{
"hosts": []string{hostPort},
"topics": []string{topic},
})

// Route input events through our captor instead of sending through ES.
eventsCh := make(chan beat.Event)
defer close(eventsCh)

captor := newEventCaptor(eventsCh)
defer captor.Close()

connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
return channel.SubOutlet(captor), nil
})

// Mock the context.
inputContext := input.Context{
Done: make(chan struct{}),
BeatDone: make(chan struct{}),
}

// Setup the input
input, err := NewInput(config, connector, inputContext)
require.NoError(t, err)
require.NotNil(t, input)

// Run the input.
input.Run()

// Create Publisher
publisher := createPublisher(t)

// Verify that event has been received
verifiedCh := make(chan struct{})
defer close(verifiedCh)

emitInputData(t, verifiedCh, publisher)

event := <-eventsCh
verifiedCh <- struct{}{}

val, err := event.GetValue("message")
require.NoError(t, err)
require.Equal(t, message, val)
}

func createPublisher(t *testing.T) libmqtt.Client {
clientOptions := libmqtt.NewClientOptions().
SetClientID("emitter").
SetAutoReconnect(false).
SetConnectRetry(false).
AddBroker(hostPort)
client := libmqtt.NewClient(clientOptions)
token := client.Connect()
require.True(t, token.WaitTimeout(waitTimeout))
require.NoError(t, token.Error())
return client
}

func emitInputData(t *testing.T, verifiedCh <-chan struct{}, publisher libmqtt.Client) {
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-verifiedCh:
return
case <-ticker.C:
token := publisher.Publish(topic, 1, false, []byte(message))
require.True(t, token.WaitTimeout(waitTimeout))
require.NoError(t, token.Error())
}
}
}()
}

func getOrDefault(s, defaultString string) string {
if s == "" {
return defaultString
}
return s
}
2 changes: 2 additions & 0 deletions testing/environments/docker/mosquitto/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FROM eclipse-mosquitto:1.6.8
HEALTHCHECK --interval=1s --retries=600 CMD nc -z localhost 1883

0 comments on commit 3ab3702

Please sign in to comment.