Skip to content

Commit

Permalink
feat: Add MQTT and ZeroMQ channels support to support-notification
Browse files Browse the repository at this point in the history
close #5007

Signed-off-by: Ginny Guan <ginny@iotechsys.com>
  • Loading branch information
jinlinGuan committed Jan 6, 2025
1 parent 027efad commit e824200
Show file tree
Hide file tree
Showing 14 changed files with 553 additions and 8 deletions.
3 changes: 3 additions & 0 deletions Attribution.txt
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,6 @@ https://github.com/go-jose/go-jose/blob/v2.6.3/LICENSE

nhooyr.io/websocket (ISC License) - https://github.com/nhooyr/websocket
https://github.com/nhooyr/websocket/blob/master/LICENSE.txt

github.com/pebbe/zmq4 (BSD-2) https://github.com/pebbe/zmq4
https://github.com/pebbe/zmq4/blob/master/LICENSE.txt
4 changes: 2 additions & 2 deletions cmd/support-notifications/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ARG ADD_BUILD_TAGS=""

WORKDIR /edgex-go

RUN apk add --update --no-cache make bash git ca-certificates
RUN apk add --update --no-cache make bash git ca-certificates zeromq-dev

COPY go.mod vendor* ./
RUN [ ! -d "vendor" ] && go mod download all || echo "skipping..."
Expand All @@ -33,7 +33,7 @@ RUN make cmd/support-notifications/support-notifications

FROM alpine:3.20

RUN apk add --update --no-cache ca-certificates dumb-init
RUN apk add --update --no-cache ca-certificates dumb-init zeromq
# Ensure using latest versions of all installed packages to avoid any recent CVEs
RUN apk --no-cache upgrade

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jackc/pgx/v5 v5.7.1
github.com/labstack/echo/v4 v4.13.3
github.com/pebbe/zmq4 v1.2.11
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.7.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q
github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 h1:mOvehYivJ4Aqu2CPe3D3lv8jhqOI9/1o0THxJHBE0qw=
github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9/go.mod h1:gLH27qo/dvMhLTVVyMELpe3Tut7sOfkiDg7ZpeqKwsw=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q=
github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
18 changes: 17 additions & 1 deletion internal/support/notifications/application/channel/container.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 IOTech Ltd
// Copyright (C) 2021-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -15,6 +15,12 @@ var RESTSenderName = di.TypeInstanceToName(RESTSender{})
// EmailSenderName contains the name of the channel.EmailSender implementation in the DIC.
var EmailSenderName = di.TypeInstanceToName(EmailSender{})

// MQTTSenderName contains the name of the channel.MQTTSender implementation in the DIC.
var MQTTSenderName = di.TypeInstanceToName(MQTTSender{})

// ZeroMQTSenderName contains the name of the channel.ZeroMQSender implementation in the DIC.
var ZeroMQTSenderName = di.TypeInstanceToName(ZeroMQSender{})

// RESTSenderFrom helper function queries the DIC and returns the channel.Sender implementation.
func RESTSenderFrom(get di.Get) Sender {
return get(RESTSenderName).(Sender)
Expand All @@ -24,3 +30,13 @@ func RESTSenderFrom(get di.Get) Sender {
func EmailSenderFrom(get di.Get) Sender {
return get(EmailSenderName).(Sender)
}

// MQTTSenderFrom helper function queries the DIC and returns the channel.Sender implementation.
func MQTTSenderFrom(get di.Get) Sender {
return get(MQTTSenderName).(Sender)
}

// ZeroMQSenderFrom helper function queries the DIC and returns the channel.Sender implementation.
func ZeroMQSenderFrom(get di.Get) Sender {
return get(ZeroMQTSenderName).(Sender)
}
149 changes: 149 additions & 0 deletions internal/support/notifications/application/channel/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (C) 2024-2025 IOTech Ltd

package channel

import (
"crypto/tls"
"crypto/x509"
"fmt"
"net/url"
"strings"
"time"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/messaging"
"github.com/edgexfoundry/go-mod-core-contracts/v4/common"
"github.com/edgexfoundry/go-mod-core-contracts/v4/errors"
"github.com/edgexfoundry/go-mod-core-contracts/v4/models"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

const (
WaitDuration = 3 * time.Second
)

// prepareMqttClient creates a new client or load the exist client from cache
func (sender *MQTTSender) prepareMqttClient(address models.MQTTPubAddress) (mqtt.Client, errors.EdgeX) {
client := sender.loadClient(address)
if client != nil {
return client, nil
}

client, err := sender.createClient(address)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}

return client, nil
}

func (sender *MQTTSender) cacheKey(publisher string, host string, port int) string {
return fmt.Sprintf("%s:%s:%d", publisher, host, port)
}

func (sender *MQTTSender) loadClient(address models.MQTTPubAddress) mqtt.Client {
sender.mutex.RLock()
defer sender.mutex.RUnlock()
key := sender.cacheKey(address.Publisher, address.Host, address.Port)
mqttClient, ok := sender.clientCache[key]
if ok {
return mqttClient
}
return nil
}

// createMqttClient creates a new MQTT client
// The implementation can refer to https://github.com/edgexfoundry/app-functions-sdk-go/blob/1bc0c5a6f3d13f883f4b71f940f0cb2168d0daab/pkg/secure/mqttfactory.go#L58
func (sender *MQTTSender) createClient(address models.MQTTPubAddress) (mqtt.Client, errors.EdgeX) {
sender.mutex.Lock()
defer sender.mutex.Unlock()

// Check the cache before creating new MQTT client
key := sender.cacheKey(address.Publisher, address.Host, address.Port)
mqttClient, ok := sender.clientCache[key]
if ok {
return mqttClient, nil
}

scheme := common.TCP
if address.Scheme != "" {
scheme = address.Scheme
}
brokerUrl := &url.URL{
Scheme: strings.ToLower(scheme),
Host: fmt.Sprintf("%s:%d", address.Host, address.Port),
}
opts := mqtt.NewClientOptions()
opts.SetAutoReconnect(true)
opts.SetClientID(address.Publisher)
opts.Servers = []*url.URL{brokerUrl}

secretProvider := bootstrapContainer.SecretProviderFrom(sender.dic.Get)

//get the secrets from the secret provider and populate the struct
secretData, err := messaging.GetSecretData(address.AuthMode, address.SecretPath, secretProvider)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}
//ensure that the authmode selected has the required secret values
if secretData != nil {
err = messaging.ValidateSecretData(address.AuthMode, address.SecretPath, secretData)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}
// configure the mqtt client with the retrieved secret values
err = configureMQTTClientForAuth(address, opts, secretData)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}
}

client := mqtt.NewClient(opts)
token := client.Connect()
if token.WaitTimeout(WaitDuration) && token.Error() != nil {
return client, errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("fail to connect the MQTT broker, %v", token.Error()), nil)
}

sender.clientCache[key] = client

return client, nil
}

func configureMQTTClientForAuth(address models.MQTTPubAddress, options *mqtt.ClientOptions, secretData *messaging.SecretData) errors.EdgeX {
caCertPool := x509.NewCertPool()
tlsConfig := &tls.Config{
// nolint: gosec
InsecureSkipVerify: address.SkipCertVerify,
}

switch address.AuthMode {
case messaging.AuthModeUsernamePassword:
options.SetUsername(secretData.Username)
options.SetPassword(secretData.Password)
case messaging.AuthModeCert:
// Expect user set require_certificate and use_identity_as_username to true, which is assumed that only authenticated clients have valid certificates
// This authentication usage can refer to https://mosquitto.org/man/mosquitto-conf-5.html
cert, err := tls.X509KeyPair(secretData.CertPemBlock, secretData.KeyPemBlock)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
case messaging.AuthModeCA:
// Nothing to do here for this option
case messaging.AuthModeNone:
return nil
}

if len(secretData.CaPemBlock) > 0 {
ok := caCertPool.AppendCertsFromPEM(secretData.CaPemBlock)
if !ok {
return errors.NewCommonEdgeX(errors.KindServerError, "Error parsing CA PEM block", nil)
}
tlsConfig.RootCAs = caCertPool
}

options.SetTLSConfig(tlsConfig)

return nil
}
Loading

0 comments on commit e824200

Please sign in to comment.