Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: MessageClient can Pub/Sub binary data #364

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 60 additions & 24 deletions internal/pkg/mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type MessageMarshaller func(v interface{}) ([]byte, error)
// MessageUnmarshaller defines the function signature for unmarshaling []byte into structs.
type MessageUnmarshaller func(data []byte, v interface{}) error

type MessageHandlerCreator func(unmarshaler MessageUnmarshaller,
messageChannel chan<- types.MessageEnvelope, errorChannel chan<- error) pahoMqtt.MessageHandler

// Client facilitates communication to an MQTT server and provides functionality needed to send and receive MQTT
// messages.
type Client struct {
Expand Down Expand Up @@ -159,30 +162,7 @@ func (mc *Client) Publish(message types.MessageEnvelope, topic string) error {

// Subscribe creates a subscription for the specified topics.
func (mc *Client) Subscribe(topics []types.TopicChannel, messageErrors chan error) error {
optionsReader := mc.mqttClient.OptionsReader()

mc.subscriptionMutex.Lock()
defer mc.subscriptionMutex.Unlock()

for _, topic := range topics {
handler := newMessageHandler(mc.unmarshaller, topic.Messages, messageErrors)
qos := optionsReader.WillQos()

token := mc.mqttClient.Subscribe(topic.Topic, qos, handler)
err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, "Failed to create subscription")
if err != nil {
return err
}

mc.existingSubscriptions[topic.Topic] = existingSubscription{
topic: topic.Topic,
qos: qos,
handler: handler,
errors: messageErrors,
}
}

return nil
return mc.subscribe(topics, messageErrors, newMessageHandler)
}

// Request publishes a request and waits for a response
Expand Down Expand Up @@ -342,3 +322,59 @@ func createClientOptions(

return clientOptions, nil
}

func (mc *Client) PublishBinaryData(data []byte, topic string) error {
optionsReader := mc.mqttClient.OptionsReader()
return getTokenError(
mc.mqttClient.Publish(
topic,
optionsReader.WillQos(),
optionsReader.WillRetained(),
data),
optionsReader.ConnectTimeout(),
PublishOperation,
"Unable to publish message")
}

func (mc *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return mc.subscribe(topics, messageErrors, newBinaryDataMessageHandler)
}

// newBinaryDataMessageHandler creates a function which propagates the received messages to the proper channel.
func newBinaryDataMessageHandler(_ MessageUnmarshaller,
messageChannel chan<- types.MessageEnvelope,
_ chan<- error) pahoMqtt.MessageHandler {
return func(client pahoMqtt.Client, message pahoMqtt.Message) {
// Use MessageEnvelope.Payload to store the binary data instead of unmarshalling binary to MessageEnvelope
messageEnvelope := types.NewMessageEnvelopeForRequest(message.Payload(), nil)
messageEnvelope.ReceivedTopic = message.Topic()
messageChannel <- messageEnvelope
}
}

func (mc *Client) subscribe(topics []types.TopicChannel, messageErrors chan error, messageHandlerCreator MessageHandlerCreator) error {
optionsReader := mc.mqttClient.OptionsReader()

mc.subscriptionMutex.Lock()
defer mc.subscriptionMutex.Unlock()

for _, topic := range topics {
handler := messageHandlerCreator(mc.unmarshaller, topic.Messages, messageErrors)
qos := optionsReader.WillQos()

token := mc.mqttClient.Subscribe(topic.Topic, qos, handler)
err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, "Failed to create subscription")
if err != nil {
return err
}

mc.existingSubscriptions[topic.Topic] = existingSubscription{
cloudxxx8 marked this conversation as resolved.
Show resolved Hide resolved
topic: topic.Topic,
qos: qos,
handler: handler,
errors: messageErrors,
}
}

return nil
}
8 changes: 8 additions & 0 deletions internal/pkg/nats/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,11 @@ func (c *Client) Disconnect() error {
}
return c.connection.Drain()
}

func (c *Client) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}

func (c *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
8 changes: 8 additions & 0 deletions internal/pkg/noopclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package pkg

import (
"fmt"
"time"

"github.com/edgexfoundry/go-mod-messaging/v4/pkg/types"
Expand Down Expand Up @@ -48,3 +49,10 @@ func (n NoopClient) Subscribe(topics []types.TopicChannel, messageErrors chan er
func (n NoopClient) Disconnect() error {
panic("implement me")
}

func (n NoopClient) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}
func (n NoopClient) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
7 changes: 7 additions & 0 deletions internal/pkg/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,10 @@ func convertFromRedisTopicScheme(topic string) string {

return topic
}

func (c Client) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}
func (c Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
6 changes: 6 additions & 0 deletions messaging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type MessageClient interface {
// the timeout period, a timed out error returned.
Request(message types.MessageEnvelope, requestTopic string, responseTopicPrefix string, timeout time.Duration) (*types.MessageEnvelope, error)

// PublishBinaryData sends binary data to the message bus
PublishBinaryData(data []byte, topic string) error

// SubscribeBinaryData receives binary data from the specified topic, and wrap it in MessageEnvelope.
SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error

// Unsubscribe to unsubscribe from the specified topics.
Unsubscribe(topics ...string) error

Expand Down
76 changes: 69 additions & 7 deletions messaging/mocks/MessageClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.