Skip to content

Commit

Permalink
feat: MessageClient can Pub/Sub binary data
Browse files Browse the repository at this point in the history
MessageClient can Pub/Sub binary data without MessageEnvelope wrapped.

Signed-off-by: bruce <weichou1229@gmail.com>
  • Loading branch information
weichou1229 committed Oct 25, 2024
1 parent 2868ca7 commit 754bb1b
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 7 deletions.
43 changes: 43 additions & 0 deletions internal/pkg/mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,46 @@ func createClientOptions(

return clientOptions, nil
}

func (mc *Client) PublishBinaryData(data []byte, topic string) error {
optionsReader := mc.mqttClient.OptionsReader()
return getTokenError(
mc.mqttClient.Publish(
topic,
optionsReader.WillQos(),
optionsReader.WillRetained(),
data),
optionsReader.ConnectTimeout(),
PublishOperation,
"Unable to publish message")
}

func (mc *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
optionsReader := mc.mqttClient.OptionsReader()
for _, topic := range topics {
handler := newBinaryDataMessageHandler(topic.Messages)
qos := optionsReader.WillQos()
token := mc.mqttClient.Subscribe(topic.Topic, qos, handler)
err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, "Failed to create subscription")
if err != nil {
return err
}
mc.existingSubscriptions[topic.Topic] = existingSubscription{
topic: topic.Topic,
qos: qos,
handler: handler,
errors: messageErrors,
}
}
return nil
}

// newBinaryDataMessageHandler creates a function which propagates the received messages to the proper channel.
func newBinaryDataMessageHandler(messageChannel chan<- types.MessageEnvelope) pahoMqtt.MessageHandler {
return func(client pahoMqtt.Client, message pahoMqtt.Message) {
// Use MessageEnvelope.Payload to store the binary data instead of unmarshalling binary to MessageEnvelope
messageEnvelope := types.NewMessageEnvelopeForRequest(message.Payload(), nil)
messageEnvelope.ReceivedTopic = message.Topic()
messageChannel <- messageEnvelope
}
}
8 changes: 8 additions & 0 deletions internal/pkg/nats/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,11 @@ func (c *Client) Disconnect() error {
}
return c.connection.Drain()
}

func (c *Client) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}

func (c *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
8 changes: 8 additions & 0 deletions internal/pkg/noopclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package pkg

import (
"fmt"
"time"

"github.com/edgexfoundry/go-mod-messaging/v4/pkg/types"
Expand Down Expand Up @@ -48,3 +49,10 @@ func (n NoopClient) Subscribe(topics []types.TopicChannel, messageErrors chan er
func (n NoopClient) Disconnect() error {
panic("implement me")
}

func (n NoopClient) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}
func (n NoopClient) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
7 changes: 7 additions & 0 deletions internal/pkg/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,10 @@ func convertFromRedisTopicScheme(topic string) string {

return topic
}

func (c Client) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}
func (c Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
6 changes: 6 additions & 0 deletions messaging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type MessageClient interface {
// the timeout period, a timed out error returned.
Request(message types.MessageEnvelope, requestTopic string, responseTopicPrefix string, timeout time.Duration) (*types.MessageEnvelope, error)

// PublishBinaryData sends binary data to the message bus
PublishBinaryData(data []byte, topic string) error

// SubscribeBinaryData receives binary data from the specified topic, and wrap it in MessageEnvelope.
SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error

// Unsubscribe to unsubscribe from the specified topics.
Unsubscribe(topics ...string) error

Expand Down
76 changes: 69 additions & 7 deletions messaging/mocks/MessageClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 754bb1b

Please sign in to comment.