Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: MessageClient can Pub/Sub binary data #364

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions internal/pkg/mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,46 @@ func createClientOptions(

return clientOptions, nil
}

func (mc *Client) PublishBinaryData(data []byte, topic string) error {
optionsReader := mc.mqttClient.OptionsReader()
return getTokenError(
mc.mqttClient.Publish(
topic,
optionsReader.WillQos(),
optionsReader.WillRetained(),
data),
optionsReader.ConnectTimeout(),
PublishOperation,
"Unable to publish message")
}

func (mc *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
optionsReader := mc.mqttClient.OptionsReader()
for _, topic := range topics {
handler := newBinaryDataMessageHandler(topic.Messages)
qos := optionsReader.WillQos()
token := mc.mqttClient.Subscribe(topic.Topic, qos, handler)
err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, "Failed to create subscription")
if err != nil {
return err
}
mc.existingSubscriptions[topic.Topic] = existingSubscription{
topic: topic.Topic,
qos: qos,
handler: handler,
errors: messageErrors,
}
}
return nil
}

// newBinaryDataMessageHandler creates a function which propagates the received messages to the proper channel.
func newBinaryDataMessageHandler(messageChannel chan<- types.MessageEnvelope) pahoMqtt.MessageHandler {
return func(client pahoMqtt.Client, message pahoMqtt.Message) {
// Use MessageEnvelope.Payload to store the binary data instead of unmarshalling binary to MessageEnvelope
messageEnvelope := types.NewMessageEnvelopeForRequest(message.Payload(), nil)
messageEnvelope.ReceivedTopic = message.Topic()
messageChannel <- messageEnvelope
}
}
8 changes: 8 additions & 0 deletions internal/pkg/nats/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,11 @@ func (c *Client) Disconnect() error {
}
return c.connection.Drain()
}

func (c *Client) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}

func (c *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
8 changes: 8 additions & 0 deletions internal/pkg/noopclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package pkg

import (
"fmt"
"time"

"github.com/edgexfoundry/go-mod-messaging/v4/pkg/types"
Expand Down Expand Up @@ -48,3 +49,10 @@ func (n NoopClient) Subscribe(topics []types.TopicChannel, messageErrors chan er
func (n NoopClient) Disconnect() error {
panic("implement me")
}

func (n NoopClient) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}
func (n NoopClient) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
7 changes: 7 additions & 0 deletions internal/pkg/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,10 @@ func convertFromRedisTopicScheme(topic string) string {

return topic
}

func (c Client) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}
func (c Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
6 changes: 6 additions & 0 deletions messaging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type MessageClient interface {
// the timeout period, a timed out error returned.
Request(message types.MessageEnvelope, requestTopic string, responseTopicPrefix string, timeout time.Duration) (*types.MessageEnvelope, error)

// PublishBinaryData sends binary data to the message bus
PublishBinaryData(data []byte, topic string) error

// SubscribeBinaryData receives binary data from the specified topic, and wrap it in MessageEnvelope.
SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error

// Unsubscribe to unsubscribe from the specified topics.
Unsubscribe(topics ...string) error

Expand Down
76 changes: 69 additions & 7 deletions messaging/mocks/MessageClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.