Skip to content

Commit

Permalink
Merge pull request #131 from alsm/topics
Browse files Browse the repository at this point in the history
Use slice for Subscriptions instead of map
  • Loading branch information
alsm authored Jun 13, 2023
2 parents 42b88eb + 5591c9b commit c22f806
Show file tree
Hide file tree
Showing 17 changed files with 55 additions and 62 deletions.
4 changes: 2 additions & 2 deletions autopaho/cmd/rpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func getCmConfig(cfg config) autopaho.ClientConfig {
fmt.Println("mqtt connection up")
ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
if _, err := cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
cfg.topic: {QoS: cfg.qos},
Subscriptions: []paho.SubscribeOptions{
{Topic: cfg.topic, QoS: cfg.qos},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
Expand Down
4 changes: 2 additions & 2 deletions autopaho/cmd/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func listener(rTopic string) {
})

_, err = cm.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
rTopic: {QoS: 0},
Subscriptions: []paho.SubscribeOptions{
{Topic: rTopic, QoS: 0},
},
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions autopaho/examples/docker/subscriber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func main() {
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
fmt.Println("mqtt connection up")
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
cfg.topic: {QoS: cfg.qos},
Subscriptions: []paho.SubscribeOptions{
{Topic: cfg.topic, QoS: cfg.qos},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
Expand Down
4 changes: 2 additions & 2 deletions autopaho/extensions/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewHandler(ctx context.Context, opts HandlerOpts) (*Handler, error) {
opts.Router.RegisterHandler(h.responseTopic, h.responseHandler)

_, err := opts.Conn.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
h.responseTopic: {QoS: 1},
Subscriptions: []paho.SubscribeOptions{
{Topic: h.responseTopic, QoS: 1},
},
})
if err != nil {
Expand Down
10 changes: 2 additions & 8 deletions packets/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,7 @@ func NewControlPacket(t byte) *ControlPacket {
cp.Content = &Pubcomp{Properties: &Properties{}}
case SUBSCRIBE:
cp.Flags = 2
cp.Content = &Subscribe{
Subscriptions: make(map[string]SubOptions),
Properties: &Properties{},
}
cp.Content = &Subscribe{Properties: &Properties{}}
case SUBACK:
cp.Content = &Suback{Properties: &Properties{}}
case UNSUBSCRIBE:
Expand Down Expand Up @@ -220,10 +217,7 @@ func ReadPacket(r io.Reader) (*ControlPacket, error) {
cp.Content = &Pubcomp{Properties: &Properties{}}
case SUBSCRIBE:
cp.Flags = 2
cp.Content = &Subscribe{
Subscriptions: make(map[string]SubOptions),
Properties: &Properties{},
}
cp.Content = &Subscribe{Properties: &Properties{}}
case SUBACK:
cp.Content = &Suback{Properties: &Properties{}}
case UNSUBSCRIBE:
Expand Down
5 changes: 1 addition & 4 deletions packets/packets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,7 @@ func TestNewControlPacket(t *testing.T) {
args: SUBSCRIBE,
want: &ControlPacket{
FixedHeader: FixedHeader{Type: SUBSCRIBE, Flags: 2},
Content: &Subscribe{
Properties: &Properties{},
Subscriptions: make(map[string]SubOptions),
},
Content: &Subscribe{Properties: &Properties{}},
},
},
{
Expand Down
5 changes: 2 additions & 3 deletions packets/pingreq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package packets

import (
"bytes"
"fmt"
"io"
"net"
)
Expand All @@ -12,10 +11,10 @@ type Pingreq struct {
}

func (p *Pingreq) String() string {
return fmt.Sprintf("PINGREQ")
return "PINGREQ"
}

//Unpack is the implementation of the interface required function for a packet
// Unpack is the implementation of the interface required function for a packet
func (p *Pingreq) Unpack(r *bytes.Buffer) error {
return nil
}
Expand Down
5 changes: 2 additions & 3 deletions packets/pingresp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package packets

import (
"bytes"
"fmt"
"io"
"net"
)
Expand All @@ -12,10 +11,10 @@ type Pingresp struct {
}

func (p *Pingresp) String() string {
return fmt.Sprintf("PINGRESP")
return "PINGRESP"
}

//Unpack is the implementation of the interface required function for a packet
// Unpack is the implementation of the interface required function for a packet
func (p *Pingresp) Unpack(r *bytes.Buffer) error {
return nil
}
Expand Down
14 changes: 8 additions & 6 deletions packets/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
// Subscribe is the Variable Header definition for a Subscribe control packet
type Subscribe struct {
Properties *Properties
Subscriptions map[string]SubOptions
Subscriptions []SubOptions
PacketID uint16
}

func (s *Subscribe) String() string {
var b strings.Builder

fmt.Fprintf(&b, "SUBSCRIBE: PacketID:%d Subscriptions:\n", s.PacketID)
for sub, o := range s.Subscriptions {
fmt.Fprintf(&b, "\t%s: QOS:%d RetainHandling:%X NoLocal:%t RetainAsPublished:%t\n", sub, o.QoS, o.RetainHandling, o.NoLocal, o.RetainAsPublished)
for _, o := range s.Subscriptions {
fmt.Fprintf(&b, "\t%s: QOS:%d RetainHandling:%X NoLocal:%t RetainAsPublished:%t\n", o.Topic, o.QoS, o.RetainHandling, o.NoLocal, o.RetainAsPublished)
}
fmt.Fprintf(&b, "Properties:\n%s", s.Properties)

Expand All @@ -29,6 +29,7 @@ func (s *Subscribe) String() string {

// SubOptions is the struct representing the options for a subscription
type SubOptions struct {
Topic string
QoS byte
RetainHandling byte
NoLocal bool
Expand Down Expand Up @@ -87,7 +88,8 @@ func (s *Subscribe) Unpack(r *bytes.Buffer) error {
if err = so.Unpack(r); err != nil {
return err
}
s.Subscriptions[t] = so
so.Topic = t
s.Subscriptions = append(s.Subscriptions, so)
}

return nil
Expand All @@ -98,8 +100,8 @@ func (s *Subscribe) Buffers() net.Buffers {
var b bytes.Buffer
writeUint16(s.PacketID, &b)
var subs bytes.Buffer
for t, o := range s.Subscriptions {
writeString(t, &subs)
for _, o := range s.Subscriptions {
writeString(o.Topic, &subs)
subs.WriteByte(o.Pack())
}
idvp := s.Properties.Pack(SUBSCRIBE)
Expand Down
12 changes: 6 additions & 6 deletions paho/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,20 +623,20 @@ func (c *Client) Authenticate(ctx context.Context, a *Auth) (*AuthResponse, erro
// is returned from the function, along with any errors.
func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) {
if !c.serverProps.WildcardSubAvailable {
for t := range s.Subscriptions {
if strings.ContainsAny(t, "#+") {
for _, sub := range s.Subscriptions {
if strings.ContainsAny(sub.Topic, "#+") {
// Using a wildcard in a subscription when not supported
return nil, fmt.Errorf("cannot subscribe to %s, server does not support wildcards", t)
return nil, fmt.Errorf("cannot subscribe to %s, server does not support wildcards", sub.Topic)
}
}
}
if !c.serverProps.SubIDAvailable && s.Properties != nil && s.Properties.SubscriptionIdentifier != nil {
return nil, fmt.Errorf("cannot send subscribe with subID set, server does not support subID")
}
if !c.serverProps.SharedSubAvailable {
for t := range s.Subscriptions {
if strings.HasPrefix(t, "$share") {
return nil, fmt.Errorf("cannont subscribe to %s, server does not support shared subscriptions", t)
for _, sub := range s.Subscriptions {
if strings.HasPrefix(sub.Topic, "$share") {
return nil, fmt.Errorf("cannont subscribe to %s, server does not support shared subscriptions", sub.Topic)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions paho/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ func TestClientSubscribe(t *testing.T) {
go c.PingHandler.Start(c.Conn, 30*time.Second)

s := &Subscribe{
Subscriptions: map[string]SubscribeOptions{
"test/1": {QoS: 1},
"test/2": {QoS: 2},
"test/3": {QoS: 0},
Subscriptions: []SubscribeOptions{
{Topic: "test/1", QoS: 1},
{Topic: "test/2", QoS: 2},
{Topic: "test/3", QoS: 0},
},
}

Expand Down
4 changes: 2 additions & 2 deletions paho/cmd/chat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func main() {
}()

if _, err := c.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
*topic: {QoS: byte(*qos), NoLocal: true},
Subscriptions: []paho.SubscribeOptions{
{Topic: *topic, QoS: byte(*qos), NoLocal: true},
},
}); err != nil {
log.Fatalln(err)
Expand Down
4 changes: 2 additions & 2 deletions paho/cmd/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func listener(server, rTopic, username, password string) {
fmt.Printf("Connected to %s\n", server)

_, err = c.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
rTopic: paho.SubscribeOptions{QoS: 0},
Subscriptions: []paho.SubscribeOptions{
{Topic: rTopic, QoS: 0},
},
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions paho/cmd/stdoutsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func main() {
}()

sa, err := c.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
*topic: {QoS: byte(*qos)},
Subscriptions: []paho.SubscribeOptions{
{Topic: *topic, QoS: byte(*qos)},
},
})
if err != nil {
Expand Down
22 changes: 12 additions & 10 deletions paho/cp_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ type (
// Subscribe is a representation of a MQTT subscribe packet
Subscribe struct {
Properties *SubscribeProperties
Subscriptions map[string]SubscribeOptions
Subscriptions []SubscribeOptions
}

// SubscribeOptions is the struct representing the options for a subscription
SubscribeOptions struct {
Topic string
QoS byte
RetainHandling byte
NoLocal bool
Expand All @@ -35,16 +36,17 @@ func (s *Subscribe) InitProperties(prop *packets.Properties) {
}
}

// PacketSubOptionsFromSubscribeOptions returns a map of string to packet
// PacketSubOptionsFromSubscribeOptions returns a slice of packet
// library SubOptions for the paho Subscribe on which it is called
func (s *Subscribe) PacketSubOptionsFromSubscribeOptions() map[string]packets.SubOptions {
r := make(map[string]packets.SubOptions)
for k, v := range s.Subscriptions {
r[k] = packets.SubOptions{
QoS: v.QoS,
NoLocal: v.NoLocal,
RetainAsPublished: v.RetainAsPublished,
RetainHandling: v.RetainHandling,
func (s *Subscribe) PacketSubOptionsFromSubscribeOptions() []packets.SubOptions {
r := make([]packets.SubOptions, len(s.Subscriptions))
for i, sub := range s.Subscriptions {
r[i] = packets.SubOptions{
Topic: sub.Topic,
QoS: sub.QoS,
NoLocal: sub.NoLocal,
RetainAsPublished: sub.RetainAsPublished,
RetainHandling: sub.RetainHandling,
}
}

Expand Down
4 changes: 2 additions & 2 deletions paho/extensions/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func NewHandler(ctx context.Context, c *paho.Client) (*Handler, error) {
c.Router.RegisterHandler(fmt.Sprintf("%s/responses", c.ClientID), h.responseHandler)

_, err := c.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
fmt.Sprintf("%s/responses", c.ClientID): {QoS: 1},
Subscriptions: []paho.SubscribeOptions{
{Topic: fmt.Sprintf("%s/responses", c.ClientID), QoS: 1},
},
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions paho/message_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type CPContext struct {
}

// MIDs is the default MIDService provided by this library.
// It uses a map of uint16 to *CPContext to track responses
// to messages with a messageid
// It uses a slice of *CPContext to track responses
// to messages with a messageid tracking the last used message id
type MIDs struct {
sync.Mutex
lastMid uint16
Expand Down

0 comments on commit c22f806

Please sign in to comment.