From 5591c9b24bffcbeabeab13fd5f8f067e73f0cadf Mon Sep 17 00:00:00 2001 From: Al S-M Date: Wed, 24 May 2023 14:22:15 +0100 Subject: [PATCH] Use slice for Subscriptions instead of map Enumerating a map isn't deterministic so we can't guarantee an order to the subscribe packet sent to the server, as the response only contains a slice of values we need to know what order the subscriptions were sent in so that the responses can be correlated, using a slice and putting the topic as a property of the suboptions allows us to be sure of this. #123 --- autopaho/cmd/rpc/config.go | 4 ++-- autopaho/cmd/rpc/main.go | 4 ++-- autopaho/examples/docker/subscriber/main.go | 4 ++-- autopaho/extensions/rpc/rpc.go | 4 ++-- packets/packets.go | 10 ++-------- packets/packets_test.go | 5 +---- packets/pingreq.go | 5 ++--- packets/pingresp.go | 5 ++--- packets/subscribe.go | 14 +++++++------ paho/client.go | 12 +++++------ paho/client_test.go | 8 ++++---- paho/cmd/chat/main.go | 4 ++-- paho/cmd/rpc/main.go | 4 ++-- paho/cmd/stdoutsub/main.go | 4 ++-- paho/cp_subscribe.go | 22 +++++++++++---------- paho/extensions/rpc/rpc.go | 4 ++-- paho/message_ids.go | 4 ++-- 17 files changed, 55 insertions(+), 62 deletions(-) diff --git a/autopaho/cmd/rpc/config.go b/autopaho/cmd/rpc/config.go index 50e8044..9beba7c 100644 --- a/autopaho/cmd/rpc/config.go +++ b/autopaho/cmd/rpc/config.go @@ -114,8 +114,8 @@ func getCmConfig(cfg config) autopaho.ClientConfig { fmt.Println("mqtt connection up") ctx, _ := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) if _, err := cm.Subscribe(ctx, &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - cfg.topic: {QoS: cfg.qos}, + Subscriptions: []paho.SubscribeOptions{ + {Topic: cfg.topic, QoS: cfg.qos}, }, }); err != nil { fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) diff --git a/autopaho/cmd/rpc/main.go b/autopaho/cmd/rpc/main.go index 270c611..6d62f7e 100644 --- a/autopaho/cmd/rpc/main.go +++ b/autopaho/cmd/rpc/main.go @@ -96,8 +96,8 @@ func listener(rTopic string) { }) _, err = cm.Subscribe(ctx, &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - rTopic: {QoS: 0}, + Subscriptions: []paho.SubscribeOptions{ + {Topic: rTopic, QoS: 0}, }, }) if err != nil { diff --git a/autopaho/examples/docker/subscriber/main.go b/autopaho/examples/docker/subscriber/main.go index 668bc81..5ef61ab 100644 --- a/autopaho/examples/docker/subscriber/main.go +++ b/autopaho/examples/docker/subscriber/main.go @@ -32,8 +32,8 @@ func main() { OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) { fmt.Println("mqtt connection up") if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - cfg.topic: {QoS: cfg.qos}, + Subscriptions: []paho.SubscribeOptions{ + {Topic: cfg.topic, QoS: cfg.qos}, }, }); err != nil { fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err) diff --git a/autopaho/extensions/rpc/rpc.go b/autopaho/extensions/rpc/rpc.go index 31b3bba..26f288b 100644 --- a/autopaho/extensions/rpc/rpc.go +++ b/autopaho/extensions/rpc/rpc.go @@ -37,8 +37,8 @@ func NewHandler(ctx context.Context, opts HandlerOpts) (*Handler, error) { opts.Router.RegisterHandler(h.responseTopic, h.responseHandler) _, err := opts.Conn.Subscribe(ctx, &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - h.responseTopic: {QoS: 1}, + Subscriptions: []paho.SubscribeOptions{ + {Topic: h.responseTopic, QoS: 1}, }, }) if err != nil { diff --git a/packets/packets.go b/packets/packets.go index 4965940..87140bb 100644 --- a/packets/packets.go +++ b/packets/packets.go @@ -157,10 +157,7 @@ func NewControlPacket(t byte) *ControlPacket { cp.Content = &Pubcomp{Properties: &Properties{}} case SUBSCRIBE: cp.Flags = 2 - cp.Content = &Subscribe{ - Subscriptions: make(map[string]SubOptions), - Properties: &Properties{}, - } + cp.Content = &Subscribe{Properties: &Properties{}} case SUBACK: cp.Content = &Suback{Properties: &Properties{}} case UNSUBSCRIBE: @@ -220,10 +217,7 @@ func ReadPacket(r io.Reader) (*ControlPacket, error) { cp.Content = &Pubcomp{Properties: &Properties{}} case SUBSCRIBE: cp.Flags = 2 - cp.Content = &Subscribe{ - Subscriptions: make(map[string]SubOptions), - Properties: &Properties{}, - } + cp.Content = &Subscribe{Properties: &Properties{}} case SUBACK: cp.Content = &Suback{Properties: &Properties{}} case UNSUBSCRIBE: diff --git a/packets/packets_test.go b/packets/packets_test.go index b0f1bd4..b94c14f 100644 --- a/packets/packets_test.go +++ b/packets/packets_test.go @@ -218,10 +218,7 @@ func TestNewControlPacket(t *testing.T) { args: SUBSCRIBE, want: &ControlPacket{ FixedHeader: FixedHeader{Type: SUBSCRIBE, Flags: 2}, - Content: &Subscribe{ - Properties: &Properties{}, - Subscriptions: make(map[string]SubOptions), - }, + Content: &Subscribe{Properties: &Properties{}}, }, }, { diff --git a/packets/pingreq.go b/packets/pingreq.go index 85f30c2..27d39ee 100644 --- a/packets/pingreq.go +++ b/packets/pingreq.go @@ -2,7 +2,6 @@ package packets import ( "bytes" - "fmt" "io" "net" ) @@ -12,10 +11,10 @@ type Pingreq struct { } func (p *Pingreq) String() string { - return fmt.Sprintf("PINGREQ") + return "PINGREQ" } -//Unpack is the implementation of the interface required function for a packet +// Unpack is the implementation of the interface required function for a packet func (p *Pingreq) Unpack(r *bytes.Buffer) error { return nil } diff --git a/packets/pingresp.go b/packets/pingresp.go index c110fc4..fcf421a 100644 --- a/packets/pingresp.go +++ b/packets/pingresp.go @@ -2,7 +2,6 @@ package packets import ( "bytes" - "fmt" "io" "net" ) @@ -12,10 +11,10 @@ type Pingresp struct { } func (p *Pingresp) String() string { - return fmt.Sprintf("PINGRESP") + return "PINGRESP" } -//Unpack is the implementation of the interface required function for a packet +// Unpack is the implementation of the interface required function for a packet func (p *Pingresp) Unpack(r *bytes.Buffer) error { return nil } diff --git a/packets/subscribe.go b/packets/subscribe.go index 3f457a2..3637d87 100644 --- a/packets/subscribe.go +++ b/packets/subscribe.go @@ -11,7 +11,7 @@ import ( // Subscribe is the Variable Header definition for a Subscribe control packet type Subscribe struct { Properties *Properties - Subscriptions map[string]SubOptions + Subscriptions []SubOptions PacketID uint16 } @@ -19,8 +19,8 @@ func (s *Subscribe) String() string { var b strings.Builder fmt.Fprintf(&b, "SUBSCRIBE: PacketID:%d Subscriptions:\n", s.PacketID) - for sub, o := range s.Subscriptions { - fmt.Fprintf(&b, "\t%s: QOS:%d RetainHandling:%X NoLocal:%t RetainAsPublished:%t\n", sub, o.QoS, o.RetainHandling, o.NoLocal, o.RetainAsPublished) + for _, o := range s.Subscriptions { + fmt.Fprintf(&b, "\t%s: QOS:%d RetainHandling:%X NoLocal:%t RetainAsPublished:%t\n", o.Topic, o.QoS, o.RetainHandling, o.NoLocal, o.RetainAsPublished) } fmt.Fprintf(&b, "Properties:\n%s", s.Properties) @@ -29,6 +29,7 @@ func (s *Subscribe) String() string { // SubOptions is the struct representing the options for a subscription type SubOptions struct { + Topic string QoS byte RetainHandling byte NoLocal bool @@ -87,7 +88,8 @@ func (s *Subscribe) Unpack(r *bytes.Buffer) error { if err = so.Unpack(r); err != nil { return err } - s.Subscriptions[t] = so + so.Topic = t + s.Subscriptions = append(s.Subscriptions, so) } return nil @@ -98,8 +100,8 @@ func (s *Subscribe) Buffers() net.Buffers { var b bytes.Buffer writeUint16(s.PacketID, &b) var subs bytes.Buffer - for t, o := range s.Subscriptions { - writeString(t, &subs) + for _, o := range s.Subscriptions { + writeString(o.Topic, &subs) subs.WriteByte(o.Pack()) } idvp := s.Properties.Pack(SUBSCRIBE) diff --git a/paho/client.go b/paho/client.go index 9e758ea..c124ca6 100644 --- a/paho/client.go +++ b/paho/client.go @@ -623,10 +623,10 @@ func (c *Client) Authenticate(ctx context.Context, a *Auth) (*AuthResponse, erro // is returned from the function, along with any errors. func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) { if !c.serverProps.WildcardSubAvailable { - for t := range s.Subscriptions { - if strings.ContainsAny(t, "#+") { + for _, sub := range s.Subscriptions { + if strings.ContainsAny(sub.Topic, "#+") { // Using a wildcard in a subscription when not supported - return nil, fmt.Errorf("cannot subscribe to %s, server does not support wildcards", t) + return nil, fmt.Errorf("cannot subscribe to %s, server does not support wildcards", sub.Topic) } } } @@ -634,9 +634,9 @@ func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) { return nil, fmt.Errorf("cannot send subscribe with subID set, server does not support subID") } if !c.serverProps.SharedSubAvailable { - for t := range s.Subscriptions { - if strings.HasPrefix(t, "$share") { - return nil, fmt.Errorf("cannont subscribe to %s, server does not support shared subscriptions", t) + for _, sub := range s.Subscriptions { + if strings.HasPrefix(sub.Topic, "$share") { + return nil, fmt.Errorf("cannont subscribe to %s, server does not support shared subscriptions", sub.Topic) } } } diff --git a/paho/client_test.go b/paho/client_test.go index 17e027a..0b0d713 100644 --- a/paho/client_test.go +++ b/paho/client_test.go @@ -109,10 +109,10 @@ func TestClientSubscribe(t *testing.T) { go c.PingHandler.Start(c.Conn, 30*time.Second) s := &Subscribe{ - Subscriptions: map[string]SubscribeOptions{ - "test/1": {QoS: 1}, - "test/2": {QoS: 2}, - "test/3": {QoS: 0}, + Subscriptions: []SubscribeOptions{ + {Topic: "test/1", QoS: 1}, + {Topic: "test/2", QoS: 2}, + {Topic: "test/3", QoS: 0}, }, } diff --git a/paho/cmd/chat/main.go b/paho/cmd/chat/main.go index 82cda87..bc313b9 100644 --- a/paho/cmd/chat/main.go +++ b/paho/cmd/chat/main.go @@ -81,8 +81,8 @@ func main() { }() if _, err := c.Subscribe(context.Background(), &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - *topic: {QoS: byte(*qos), NoLocal: true}, + Subscriptions: []paho.SubscribeOptions{ + {Topic: *topic, QoS: byte(*qos), NoLocal: true}, }, }); err != nil { log.Fatalln(err) diff --git a/paho/cmd/rpc/main.go b/paho/cmd/rpc/main.go index de67ad5..7b98050 100644 --- a/paho/cmd/rpc/main.go +++ b/paho/cmd/rpc/main.go @@ -113,8 +113,8 @@ func listener(server, rTopic, username, password string) { fmt.Printf("Connected to %s\n", server) _, err = c.Subscribe(context.Background(), &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - rTopic: paho.SubscribeOptions{QoS: 0}, + Subscriptions: []paho.SubscribeOptions{ + {Topic: rTopic, QoS: 0}, }, }) if err != nil { diff --git a/paho/cmd/stdoutsub/main.go b/paho/cmd/stdoutsub/main.go index a06f9e6..da42901 100644 --- a/paho/cmd/stdoutsub/main.go +++ b/paho/cmd/stdoutsub/main.go @@ -78,8 +78,8 @@ func main() { }() sa, err := c.Subscribe(context.Background(), &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - *topic: {QoS: byte(*qos)}, + Subscriptions: []paho.SubscribeOptions{ + {Topic: *topic, QoS: byte(*qos)}, }, }) if err != nil { diff --git a/paho/cp_subscribe.go b/paho/cp_subscribe.go index e111f0c..52dc540 100644 --- a/paho/cp_subscribe.go +++ b/paho/cp_subscribe.go @@ -6,11 +6,12 @@ type ( // Subscribe is a representation of a MQTT subscribe packet Subscribe struct { Properties *SubscribeProperties - Subscriptions map[string]SubscribeOptions + Subscriptions []SubscribeOptions } // SubscribeOptions is the struct representing the options for a subscription SubscribeOptions struct { + Topic string QoS byte RetainHandling byte NoLocal bool @@ -35,16 +36,17 @@ func (s *Subscribe) InitProperties(prop *packets.Properties) { } } -// PacketSubOptionsFromSubscribeOptions returns a map of string to packet +// PacketSubOptionsFromSubscribeOptions returns a slice of packet // library SubOptions for the paho Subscribe on which it is called -func (s *Subscribe) PacketSubOptionsFromSubscribeOptions() map[string]packets.SubOptions { - r := make(map[string]packets.SubOptions) - for k, v := range s.Subscriptions { - r[k] = packets.SubOptions{ - QoS: v.QoS, - NoLocal: v.NoLocal, - RetainAsPublished: v.RetainAsPublished, - RetainHandling: v.RetainHandling, +func (s *Subscribe) PacketSubOptionsFromSubscribeOptions() []packets.SubOptions { + r := make([]packets.SubOptions, len(s.Subscriptions)) + for i, sub := range s.Subscriptions { + r[i] = packets.SubOptions{ + Topic: sub.Topic, + QoS: sub.QoS, + NoLocal: sub.NoLocal, + RetainAsPublished: sub.RetainAsPublished, + RetainHandling: sub.RetainHandling, } } diff --git a/paho/extensions/rpc/rpc.go b/paho/extensions/rpc/rpc.go index 40d9397..7dfa0c5 100644 --- a/paho/extensions/rpc/rpc.go +++ b/paho/extensions/rpc/rpc.go @@ -26,8 +26,8 @@ func NewHandler(ctx context.Context, c *paho.Client) (*Handler, error) { c.Router.RegisterHandler(fmt.Sprintf("%s/responses", c.ClientID), h.responseHandler) _, err := c.Subscribe(ctx, &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - fmt.Sprintf("%s/responses", c.ClientID): {QoS: 1}, + Subscriptions: []paho.SubscribeOptions{ + {Topic: fmt.Sprintf("%s/responses", c.ClientID), QoS: 1}, }, }) if err != nil { diff --git a/paho/message_ids.go b/paho/message_ids.go index 58b03e3..f20ac79 100644 --- a/paho/message_ids.go +++ b/paho/message_ids.go @@ -43,8 +43,8 @@ type CPContext struct { } // MIDs is the default MIDService provided by this library. -// It uses a map of uint16 to *CPContext to track responses -// to messages with a messageid +// It uses a slice of *CPContext to track responses +// to messages with a messageid tracking the last used message id type MIDs struct { sync.Mutex lastMid uint16