Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.mqtt_consumer): Implement startup error behaviors #15486

Merged
merged 14 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Startup error behavior options <!-- @/docs/includes/startup_error_behavior.md -->

In addition to the plugin-specific and global configuration settings the plugin
supports options for specifying the behavior when experiencing startup errors
using the `startup_error_behavior` setting. Available values are:

- `error`: Telegraf will stop and exit in case of startup errors. This is the
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `error`: Telegraf with stop and exit in case of startup errors. This is the
- `error`: Telegraf will stop and exit in case of startup errors. This is the

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@da-phil could you please put up a PR against docs/includes/startup_error_behavior.md as this is included here and run make docs in the telegraf root dir!?

default behavior.
- `ignore`: Telegraf will ignore startup errors for this plugin and disable it
but continue processing for all other plugins.
- `retry`: Telegraf will try to start up the plugin in every gather or write
cycle in case of startup errors. The plugin is disabled until
srebhan marked this conversation as resolved.
Show resolved Hide resolved
the startup succeeds.

## Secret-store support

This plugin supports secrets from secret-stores for the `username` and
Expand Down Expand Up @@ -67,6 +81,13 @@ to use them.
## Connection timeout for initial connection in seconds
# connection_timeout = "30s"

## Interval and ping timeout for keep-alive messages
## The sum of those options defines when a connection loss is detected.
## Note: The keep-alive interval needs to be greater than or equal to one
## second; fractions of a second are not supported.
# keepalive = "60s"
# ping_timeout = "10s"

## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
Expand Down
51 changes: 28 additions & 23 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/eclipse/paho.mqtt.golang/packets"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
Expand All @@ -32,21 +33,15 @@ var (
defaultMaxUndeliveredMessages = 1000
)

type ConnectionState int
type empty struct{}
type semaphore chan empty

const (
Disconnected ConnectionState = iota
Connecting
Connected
)

type Client interface {
Connect() mqtt.Token
SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token
AddRoute(topic string, callback mqtt.MessageHandler)
Disconnect(quiesce uint)
IsConnected() bool
}

type ClientFactory func(o *mqtt.ClientOptions) Client
Expand All @@ -73,6 +68,8 @@ type MQTTConsumer struct {
Password config.Secret `toml:"password"`
QoS int `toml:"qos"`
ConnectionTimeout config.Duration `toml:"connection_timeout"`
KeepAliveInterval config.Duration `toml:"keepalive"`
PingTimeout config.Duration `toml:"ping_timeout"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
PersistentSession bool `toml:"persistent_session"`
ClientID string `toml:"client_id"`
Expand All @@ -84,7 +81,6 @@ type MQTTConsumer struct {
client Client
opts *mqtt.ClientOptions
acc telegraf.TrackingAccumulator
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]mqtt.Message
messagesMutex sync.Mutex
Expand All @@ -104,7 +100,6 @@ func (m *MQTTConsumer) SetParser(parser telegraf.Parser) {
m.parser = parser
}
func (m *MQTTConsumer) Init() error {
m.state = Disconnected
if m.PersistentSession && m.ClientID == "" {
return errors.New("persistent_session requires client_id")
}
Expand Down Expand Up @@ -155,7 +150,6 @@ func (m *MQTTConsumer) Init() error {
return nil
}
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.state = Disconnected
m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.sem = make(semaphore, m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background())
Expand All @@ -176,7 +170,6 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return m.connect()
}
func (m *MQTTConsumer) connect() error {
m.state = Connecting
m.client = m.clientFactory(m.opts)
// AddRoute sets up the function for handling messages. These need to be
// added in case we find a persistent session containing subscriptions so we
Expand All @@ -187,12 +180,22 @@ func (m *MQTTConsumer) connect() error {
}
token := m.client.Connect()
if token.Wait() && token.Error() != nil {
err := token.Error()
m.state = Disconnected
return err
if ct, ok := token.(*mqtt.ConnectToken); ok && ct.ReturnCode() == packets.ErrNetworkError {
// Network errors might be retryable, stop the metric-tracking
// goroutine and return a retryable error.
if m.cancel != nil {
m.cancel()
m.cancel = nil
}
return &internal.StartupError{
Err: token.Error(),
Retry: true,
}
}
return token.Error()
}
m.Log.Infof("Connected %v", m.Servers)
m.state = Connected

// Persistent sessions should skip subscription if a session is present, as
// the subscriptions are stored by the server.
type sessionPresent interface {
Expand All @@ -218,7 +221,6 @@ func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) {
m.client.Disconnect(5)
m.acc.AddError(fmt.Errorf("connection lost: %w", err))
m.Log.Debugf("Disconnected %v", m.Servers)
m.state = Disconnected
}

// compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value
Expand Down Expand Up @@ -321,16 +323,17 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
m.messagesMutex.Unlock()
}
func (m *MQTTConsumer) Stop() {
if m.state == Connected {
if m.client.IsConnected() {
m.Log.Debugf("Disconnecting %v", m.Servers)
m.client.Disconnect(200)
m.Log.Debugf("Disconnected %v", m.Servers)
m.state = Disconnected
}
m.cancel()
if m.cancel != nil {
m.cancel()
}
}
func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error {
if m.state == Disconnected {
if !m.client.IsConnected() {
m.Log.Debugf("Connecting %v", m.Servers)
return m.connect()
}
Expand Down Expand Up @@ -388,7 +391,8 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts.AddBroker(server)
}
opts.SetAutoReconnect(false)
opts.SetKeepAlive(time.Second * 60)
opts.SetKeepAlive(time.Duration(m.KeepAliveInterval))
opts.SetPingTimeout(time.Duration(m.PingTimeout))
opts.SetCleanSession(!m.PersistentSession)
opts.SetAutoAckDisabled(m.PersistentSession)
opts.SetConnectionLostHandler(m.onConnectionLost)
Expand Down Expand Up @@ -449,10 +453,11 @@ func typeConvert(types map[string]string, topicValue string, key string) (interf
func New(factory ClientFactory) *MQTTConsumer {
return &MQTTConsumer{
Servers: []string{"tcp://127.0.0.1:1883"},
ConnectionTimeout: defaultConnectionTimeout,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
ConnectionTimeout: defaultConnectionTimeout,
KeepAliveInterval: config.Duration(60 * time.Second),
PingTimeout: config.Duration(10 * time.Second),
clientFactory: factory,
state: Disconnected,
}
}
func init() {
Expand Down
Loading
Loading