diff --git a/autopaho/auto_test.go b/autopaho/auto_test.go index bbb7e73..638da1d 100644 --- a/autopaho/auto_test.go +++ b/autopaho/auto_test.go @@ -296,7 +296,7 @@ func TestBasicPubSub(t *testing.T) { PahoErrors: logger, ClientConfig: paho.ClientConfig{ ClientID: "test", - Router: paho.NewSingleHandlerRouter(func(publish *paho.Publish) { + Router: paho.NewStandardRouterWithDefault(func(publish *paho.Publish) { mrMu.Lock() defer mrMu.Unlock() messagesReceived = append(messagesReceived, publish) diff --git a/autopaho/cmd/rpc/config.go b/autopaho/cmd/rpc/config.go index 0916651..2a452d5 100644 --- a/autopaho/cmd/rpc/config.go +++ b/autopaho/cmd/rpc/config.go @@ -127,7 +127,7 @@ func getCmConfig(cfg config) autopaho.ClientConfig { OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) }, ClientConfig: paho.ClientConfig{ ClientID: cfg.clientID, - Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) { + Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) { log.Printf("%v+", m) }), OnClientError: func(err error) { fmt.Printf("%s requested disconnect: %s\n", cfg.clientID, err) }, diff --git a/autopaho/examples/basics/basics.go b/autopaho/examples/basics/basics.go index f800063..8bd13a7 100644 --- a/autopaho/examples/basics/basics.go +++ b/autopaho/examples/basics/basics.go @@ -56,8 +56,8 @@ func main() { ClientConfig: paho.ClientConfig{ // If you are using QOS 1/2, then it's important to specify a client id (which must be unique) ClientID: clientID, - // The Router will receive any inbound messages (SingleHandlerRouter ignores the topic and always calls the function) - Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) { + // The Router will receive any inbound messages (the router can map for you or just pass messages to a single handler) + Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) { fmt.Printf("received message on topic %s; body: %s (retain: %t)\n", m.Topic, m.Payload, m.Retain) }), OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) }, diff --git a/autopaho/examples/docker/subscriber/main.go b/autopaho/examples/docker/subscriber/main.go index a053a8d..df410fe 100644 --- a/autopaho/examples/docker/subscriber/main.go +++ b/autopaho/examples/docker/subscriber/main.go @@ -64,7 +64,7 @@ func main() { ClientConfig: paho.ClientConfig{ ClientID: cfg.clientID, Session: sessionState, - Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) { + Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) { h.handle(m) }), OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) }, diff --git a/autopaho/examples/queue/subscribe.go b/autopaho/examples/queue/subscribe.go index 25e0f80..c3c7f55 100644 --- a/autopaho/examples/queue/subscribe.go +++ b/autopaho/examples/queue/subscribe.go @@ -52,7 +52,7 @@ func subscribe(ctx context.Context, serverURL *url.URL, msgCount uint64, ready c // eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection ClientConfig: paho.ClientConfig{ ClientID: "TestSub", - Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) { + Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) { msgNo, err := binary.ReadUvarint(bytes.NewReader(m.Payload)) if err != nil { panic(err) // Message corruption or something else is using our topic! diff --git a/autopaho/persistence_test.go b/autopaho/persistence_test.go index 46359ff..9e5324b 100644 --- a/autopaho/persistence_test.go +++ b/autopaho/persistence_test.go @@ -92,7 +92,7 @@ func TestDisconnectAfterOutgoingPublish(t *testing.T) { ClientConfig: paho.ClientConfig{ ClientID: "test", Session: session, - Router: paho.NewSingleHandlerRouter(func(publish *paho.Publish) {}), + Router: paho.NewStandardRouterWithDefault(func(publish *paho.Publish) {}), }, } @@ -255,7 +255,7 @@ func TestQueueResume(t *testing.T) { ClientConfig: paho.ClientConfig{ ClientID: "test", Session: session, - Router: paho.NewSingleHandlerRouter(func(publish *paho.Publish) {}), + Router: paho.NewStandardRouterWithDefault(func(publish *paho.Publish) {}), }, } diff --git a/autopaho/queue_test.go b/autopaho/queue_test.go index 84b06d3..7031c99 100644 --- a/autopaho/queue_test.go +++ b/autopaho/queue_test.go @@ -98,7 +98,7 @@ func TestQueuedMessages(t *testing.T) { ClientConfig: paho.ClientConfig{ ClientID: "test", Session: session, - Router: paho.NewSingleHandlerRouter(func(publish *paho.Publish) {}), + Router: paho.NewStandardRouterWithDefault(func(publish *paho.Publish) {}), }, } diff --git a/autopaho/readme.md b/autopaho/readme.md index 1aaa4ca..3ea517d 100644 --- a/autopaho/readme.md +++ b/autopaho/readme.md @@ -49,8 +49,8 @@ cliCfg := autopaho.ClientConfig{ ClientConfig: paho.ClientConfig{ // If you are using QOS 1/2, then it's important to specify a client id (which must be unique) ClientID: clientID, - // The Router will receive any inbound messages (SingleHandlerRouter ignores the topic and always calls the function) - Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) { + // The Router will receive any inbound messages (the router can map for you or just pass messages to a single handler) + Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) { fmt.Printf("received message on topic %s; body: %s (retain: %t)\n", m.Topic, m.Payload, m.Retain) }), OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) }, diff --git a/paho/client_test.go b/paho/client_test.go index df7c4a2..cff0c10 100644 --- a/paho/client_test.go +++ b/paho/client_test.go @@ -311,7 +311,7 @@ func TestClientReceiveQoS0(t *testing.T) { c := NewClient(ClientConfig{ Conn: ts.ClientConn(), - Router: NewSingleHandlerRouter(func(p *Publish) { + Router: NewStandardRouterWithDefault(func(p *Publish) { assert.Equal(t, "test/0", p.Topic) assert.Equal(t, "test payload", string(p.Payload)) assert.Equal(t, byte(0), p.QoS) @@ -356,7 +356,7 @@ func TestClientReceiveQoS1(t *testing.T) { c := NewClient(ClientConfig{ Conn: ts.ClientConn(), - Router: NewSingleHandlerRouter(func(p *Publish) { + Router: NewStandardRouterWithDefault(func(p *Publish) { assert.Equal(t, "test/1", p.Topic) assert.Equal(t, "test payload", string(p.Payload)) assert.Equal(t, byte(1), p.QoS) @@ -402,7 +402,7 @@ func TestClientReceiveQoS2(t *testing.T) { c := NewClient(ClientConfig{ Conn: ts.ClientConn(), - Router: NewSingleHandlerRouter(func(p *Publish) { + Router: NewStandardRouterWithDefault(func(p *Publish) { assert.Equal(t, "test/2", p.Topic) assert.Equal(t, "test payload", string(p.Payload)) assert.Equal(t, byte(2), p.QoS) @@ -464,7 +464,7 @@ func TestClientReceiveAndAckInOrder(t *testing.T) { wg.Add(expectedPacketsCount) c := NewClient(ClientConfig{ Conn: ts.ClientConn(), - Router: NewSingleHandlerRouter(func(p *Publish) { + Router: NewStandardRouterWithDefault(func(p *Publish) { defer wg.Done() actualPublishPackets = append(actualPublishPackets, *p.Packet()) }), @@ -545,7 +545,7 @@ func TestManualAcksInOrder(t *testing.T) { Conn: ts.ClientConn(), EnableManualAcknowledgment: true, }) - c.Router = NewSingleHandlerRouter(func(p *Publish) { + c.Router = NewStandardRouterWithDefault(func(p *Publish) { defer wg.Done() actualPublishPackets = append(actualPublishPackets, *p.Packet()) require.NoError(t, c.Ack(p)) diff --git a/paho/cmd/chat/main.go b/paho/cmd/chat/main.go index bc313b9..8ec6d34 100644 --- a/paho/cmd/chat/main.go +++ b/paho/cmd/chat/main.go @@ -34,7 +34,7 @@ func main() { } c := paho.NewClient(paho.ClientConfig{ - Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) { + Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) { log.Printf("%s : %s", m.Properties.User.Get("chatname"), string(m.Payload)) }), Conn: conn, diff --git a/paho/cmd/rpc/main.go b/paho/cmd/rpc/main.go index 7b98050..5335067 100644 --- a/paho/cmd/rpc/main.go +++ b/paho/cmd/rpc/main.go @@ -51,7 +51,7 @@ func listener(server, rTopic, username, password string) { c := paho.NewClient(paho.ClientConfig{ Conn: conn, }) - c.Router = paho.NewSingleHandlerRouter(func(m *paho.Publish) { + c.Router = paho.NewStandardRouterWithDefault(func(m *paho.Publish) { if m.Properties != nil && m.Properties.CorrelationData != nil && m.Properties.ResponseTopic != "" { log.Printf("Received message with response topic %s and correl id %s\n%s", m.Properties.ResponseTopic, string(m.Properties.CorrelationData), string(m.Payload)) @@ -138,7 +138,7 @@ func main() { password := flag.String("password", "", "Password to match username") flag.Parse() - //paho.SetDebugLogger(log.New(os.Stderr, "RPC: ", log.LstdFlags)) + // paho.SetDebugLogger(log.New(os.Stderr, "RPC: ", log.LstdFlags)) listener(*server, *rTopic, *username, *password) @@ -151,7 +151,7 @@ func main() { defer cancel() c := paho.NewClient(paho.ClientConfig{ - Router: paho.NewSingleHandlerRouter(nil), + Router: paho.NewStandardRouterWithDefault(nil), Conn: conn, }) diff --git a/paho/cmd/stdoutsub/main.go b/paho/cmd/stdoutsub/main.go index da42901..b8f5dd6 100644 --- a/paho/cmd/stdoutsub/main.go +++ b/paho/cmd/stdoutsub/main.go @@ -32,7 +32,7 @@ func main() { } c := paho.NewClient(paho.ClientConfig{ - Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) { + Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) { msgChan <- m }), Conn: conn, diff --git a/paho/router.go b/paho/router.go index f6109f7..ce9fdda 100644 --- a/paho/router.go +++ b/paho/router.go @@ -50,6 +50,15 @@ func NewStandardRouter() *StandardRouter { } } +// NewStandardRouterWithDefault instantiates and returns an instance of a StandardRouter +// with the default handler set to the value passed in (for convenience when creating +// handler inline). +func NewStandardRouterWithDefault(h MessageHandler) *StandardRouter { + r := NewStandardRouter() + r.DefaultHandler(h) + return r +} + // RegisterHandler is the library provided StandardRouter's // implementation of the required interface function() func (r *StandardRouter) RegisterHandler(topic string, h MessageHandler) { @@ -173,60 +182,12 @@ func topicSplit(topic string) []string { return strings.Split(topic, "/") } -// SingleHandlerRouter is a library provided implementation of a Router -// that stores only a single MessageHandler and invokes this MessageHandler -// for all received Publishes -type SingleHandlerRouter struct { - sync.Mutex - aliases map[uint16]string - handler MessageHandler - debug log.Logger -} - -// NewSingleHandlerRouter instantiates and returns an instance of a SingleHandlerRouter -func NewSingleHandlerRouter(h MessageHandler) *SingleHandlerRouter { - return &SingleHandlerRouter{ - aliases: make(map[uint16]string), - handler: h, - debug: log.NOOPLogger{}, - } -} - -// RegisterHandler is the library provided SingleHandlerRouter's -// implementation of the required interface function() -func (s *SingleHandlerRouter) RegisterHandler(topic string, h MessageHandler) { - s.debug.Println("registering handler for:", topic) - s.handler = h -} - -// UnregisterHandler is the library provided SingleHandlerRouter's -// implementation of the required interface function() -func (s *SingleHandlerRouter) UnregisterHandler(topic string) {} - -// Route is the library provided SingleHandlerRouter's -// implementation of the required interface function() -func (s *SingleHandlerRouter) Route(pb *packets.Publish) { - m := PublishFromPacketPublish(pb) - - s.debug.Println("routing message for:", m.Topic) - - if pb.Properties.TopicAlias != nil { - s.debug.Println("message is using topic aliasing") - if pb.Topic != "" { - // Register new alias - s.debug.Printf("registering new topic alias '%d' for topic '%s'", *pb.Properties.TopicAlias, m.Topic) - s.aliases[*pb.Properties.TopicAlias] = pb.Topic - } - if t, ok := s.aliases[*pb.Properties.TopicAlias]; ok { - s.debug.Printf("aliased topic '%d' translates to '%s'", *pb.Properties.TopicAlias, m.Topic) - m.Topic = t - } - } - s.handler(m) -} - -// SetDebugLogger sets the logger l to be used for printing debug -// information for the router -func (s *SingleHandlerRouter) SetDebugLogger(l log.Logger) { - s.debug = l +// NewSingleHandlerRouter instantiates a router that will call the passed in message handler for all +// inbound messages (assuming `RegisterHandler` is never called). +// +// Deprecated: SingleHandlerRouter has been removed because it did not meet the requirements set out +// in the `Router` interface documentation. This function is only included to maintain compatibility, +// but there are limits (this version does not ignore calls to `RegisterHandler`). +func NewSingleHandlerRouter(h MessageHandler) *StandardRouter { + return NewStandardRouterWithDefault(h) }