Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove SingleHandlerRouter #186

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion autopaho/auto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestBasicPubSub(t *testing.T) {
PahoErrors: logger,
ClientConfig: paho.ClientConfig{
ClientID: "test",
Router: paho.NewSingleHandlerRouter(func(publish *paho.Publish) {
Router: paho.NewStandardRouterWithDefault(func(publish *paho.Publish) {
mrMu.Lock()
defer mrMu.Unlock()
messagesReceived = append(messagesReceived, publish)
Expand Down
2 changes: 1 addition & 1 deletion autopaho/cmd/rpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func getCmConfig(cfg config) autopaho.ClientConfig {
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
ClientConfig: paho.ClientConfig{
ClientID: cfg.clientID,
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) {
log.Printf("%v+", m)
}),
OnClientError: func(err error) { fmt.Printf("%s requested disconnect: %s\n", cfg.clientID, err) },
Expand Down
4 changes: 2 additions & 2 deletions autopaho/examples/basics/basics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func main() {
ClientConfig: paho.ClientConfig{
// If you are using QOS 1/2, then it's important to specify a client id (which must be unique)
ClientID: clientID,
// The Router will receive any inbound messages (SingleHandlerRouter ignores the topic and always calls the function)
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
// The Router will receive any inbound messages (the router can map for you or just pass messages to a single handler)
Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) {
fmt.Printf("received message on topic %s; body: %s (retain: %t)\n", m.Topic, m.Payload, m.Retain)
}),
OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) },
Expand Down
2 changes: 1 addition & 1 deletion autopaho/examples/docker/subscriber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func main() {
ClientConfig: paho.ClientConfig{
ClientID: cfg.clientID,
Session: sessionState,
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) {
h.handle(m)
}),
OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) },
Expand Down
2 changes: 1 addition & 1 deletion autopaho/examples/queue/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func subscribe(ctx context.Context, serverURL *url.URL, msgCount uint64, ready c
// eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection
ClientConfig: paho.ClientConfig{
ClientID: "TestSub",
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) {
msgNo, err := binary.ReadUvarint(bytes.NewReader(m.Payload))
if err != nil {
panic(err) // Message corruption or something else is using our topic!
Expand Down
4 changes: 2 additions & 2 deletions autopaho/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestDisconnectAfterOutgoingPublish(t *testing.T) {
ClientConfig: paho.ClientConfig{
ClientID: "test",
Session: session,
Router: paho.NewSingleHandlerRouter(func(publish *paho.Publish) {}),
Router: paho.NewStandardRouterWithDefault(func(publish *paho.Publish) {}),
},
}

Expand Down Expand Up @@ -255,7 +255,7 @@ func TestQueueResume(t *testing.T) {
ClientConfig: paho.ClientConfig{
ClientID: "test",
Session: session,
Router: paho.NewSingleHandlerRouter(func(publish *paho.Publish) {}),
Router: paho.NewStandardRouterWithDefault(func(publish *paho.Publish) {}),
},
}

Expand Down
2 changes: 1 addition & 1 deletion autopaho/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestQueuedMessages(t *testing.T) {
ClientConfig: paho.ClientConfig{
ClientID: "test",
Session: session,
Router: paho.NewSingleHandlerRouter(func(publish *paho.Publish) {}),
Router: paho.NewStandardRouterWithDefault(func(publish *paho.Publish) {}),
},
}

Expand Down
4 changes: 2 additions & 2 deletions autopaho/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ cliCfg := autopaho.ClientConfig{
ClientConfig: paho.ClientConfig{
// If you are using QOS 1/2, then it's important to specify a client id (which must be unique)
ClientID: clientID,
// The Router will receive any inbound messages (SingleHandlerRouter ignores the topic and always calls the function)
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
// The Router will receive any inbound messages (the router can map for you or just pass messages to a single handler)
Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) {
fmt.Printf("received message on topic %s; body: %s (retain: %t)\n", m.Topic, m.Payload, m.Retain)
}),
OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) },
Expand Down
10 changes: 5 additions & 5 deletions paho/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func TestClientReceiveQoS0(t *testing.T) {

c := NewClient(ClientConfig{
Conn: ts.ClientConn(),
Router: NewSingleHandlerRouter(func(p *Publish) {
Router: NewStandardRouterWithDefault(func(p *Publish) {
assert.Equal(t, "test/0", p.Topic)
assert.Equal(t, "test payload", string(p.Payload))
assert.Equal(t, byte(0), p.QoS)
Expand Down Expand Up @@ -356,7 +356,7 @@ func TestClientReceiveQoS1(t *testing.T) {

c := NewClient(ClientConfig{
Conn: ts.ClientConn(),
Router: NewSingleHandlerRouter(func(p *Publish) {
Router: NewStandardRouterWithDefault(func(p *Publish) {
assert.Equal(t, "test/1", p.Topic)
assert.Equal(t, "test payload", string(p.Payload))
assert.Equal(t, byte(1), p.QoS)
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestClientReceiveQoS2(t *testing.T) {

c := NewClient(ClientConfig{
Conn: ts.ClientConn(),
Router: NewSingleHandlerRouter(func(p *Publish) {
Router: NewStandardRouterWithDefault(func(p *Publish) {
assert.Equal(t, "test/2", p.Topic)
assert.Equal(t, "test payload", string(p.Payload))
assert.Equal(t, byte(2), p.QoS)
Expand Down Expand Up @@ -464,7 +464,7 @@ func TestClientReceiveAndAckInOrder(t *testing.T) {
wg.Add(expectedPacketsCount)
c := NewClient(ClientConfig{
Conn: ts.ClientConn(),
Router: NewSingleHandlerRouter(func(p *Publish) {
Router: NewStandardRouterWithDefault(func(p *Publish) {
defer wg.Done()
actualPublishPackets = append(actualPublishPackets, *p.Packet())
}),
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestManualAcksInOrder(t *testing.T) {
Conn: ts.ClientConn(),
EnableManualAcknowledgment: true,
})
c.Router = NewSingleHandlerRouter(func(p *Publish) {
c.Router = NewStandardRouterWithDefault(func(p *Publish) {
defer wg.Done()
actualPublishPackets = append(actualPublishPackets, *p.Packet())
require.NoError(t, c.Ack(p))
Expand Down
2 changes: 1 addition & 1 deletion paho/cmd/chat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
}

c := paho.NewClient(paho.ClientConfig{
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) {
log.Printf("%s : %s", m.Properties.User.Get("chatname"), string(m.Payload))
}),
Conn: conn,
Expand Down
6 changes: 3 additions & 3 deletions paho/cmd/rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func listener(server, rTopic, username, password string) {
c := paho.NewClient(paho.ClientConfig{
Conn: conn,
})
c.Router = paho.NewSingleHandlerRouter(func(m *paho.Publish) {
c.Router = paho.NewStandardRouterWithDefault(func(m *paho.Publish) {
if m.Properties != nil && m.Properties.CorrelationData != nil && m.Properties.ResponseTopic != "" {
log.Printf("Received message with response topic %s and correl id %s\n%s", m.Properties.ResponseTopic, string(m.Properties.CorrelationData), string(m.Payload))

Expand Down Expand Up @@ -138,7 +138,7 @@ func main() {
password := flag.String("password", "", "Password to match username")
flag.Parse()

//paho.SetDebugLogger(log.New(os.Stderr, "RPC: ", log.LstdFlags))
// paho.SetDebugLogger(log.New(os.Stderr, "RPC: ", log.LstdFlags))

listener(*server, *rTopic, *username, *password)

Expand All @@ -151,7 +151,7 @@ func main() {
defer cancel()

c := paho.NewClient(paho.ClientConfig{
Router: paho.NewSingleHandlerRouter(nil),
Router: paho.NewStandardRouterWithDefault(nil),
Conn: conn,
})

Expand Down
2 changes: 1 addition & 1 deletion paho/cmd/stdoutsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
}

c := paho.NewClient(paho.ClientConfig{
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
Router: paho.NewStandardRouterWithDefault(func(m *paho.Publish) {
msgChan <- m
}),
Conn: conn,
Expand Down
73 changes: 17 additions & 56 deletions paho/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ func NewStandardRouter() *StandardRouter {
}
}

// NewStandardRouterWithDefault instantiates and returns an instance of a StandardRouter
// with the default handler set to the value passed in (for convenience when creating
// handler inline).
func NewStandardRouterWithDefault(h MessageHandler) *StandardRouter {
r := NewStandardRouter()
r.DefaultHandler(h)
return r
}

// RegisterHandler is the library provided StandardRouter's
// implementation of the required interface function()
func (r *StandardRouter) RegisterHandler(topic string, h MessageHandler) {
Expand Down Expand Up @@ -173,60 +182,12 @@ func topicSplit(topic string) []string {
return strings.Split(topic, "/")
}

// SingleHandlerRouter is a library provided implementation of a Router
// that stores only a single MessageHandler and invokes this MessageHandler
// for all received Publishes
type SingleHandlerRouter struct {
sync.Mutex
aliases map[uint16]string
handler MessageHandler
debug log.Logger
}

// NewSingleHandlerRouter instantiates and returns an instance of a SingleHandlerRouter
func NewSingleHandlerRouter(h MessageHandler) *SingleHandlerRouter {
return &SingleHandlerRouter{
aliases: make(map[uint16]string),
handler: h,
debug: log.NOOPLogger{},
}
}

// RegisterHandler is the library provided SingleHandlerRouter's
// implementation of the required interface function()
func (s *SingleHandlerRouter) RegisterHandler(topic string, h MessageHandler) {
s.debug.Println("registering handler for:", topic)
s.handler = h
}

// UnregisterHandler is the library provided SingleHandlerRouter's
// implementation of the required interface function()
func (s *SingleHandlerRouter) UnregisterHandler(topic string) {}

// Route is the library provided SingleHandlerRouter's
// implementation of the required interface function()
func (s *SingleHandlerRouter) Route(pb *packets.Publish) {
m := PublishFromPacketPublish(pb)

s.debug.Println("routing message for:", m.Topic)

if pb.Properties.TopicAlias != nil {
s.debug.Println("message is using topic aliasing")
if pb.Topic != "" {
// Register new alias
s.debug.Printf("registering new topic alias '%d' for topic '%s'", *pb.Properties.TopicAlias, m.Topic)
s.aliases[*pb.Properties.TopicAlias] = pb.Topic
}
if t, ok := s.aliases[*pb.Properties.TopicAlias]; ok {
s.debug.Printf("aliased topic '%d' translates to '%s'", *pb.Properties.TopicAlias, m.Topic)
m.Topic = t
}
}
s.handler(m)
}

// SetDebugLogger sets the logger l to be used for printing debug
// information for the router
func (s *SingleHandlerRouter) SetDebugLogger(l log.Logger) {
s.debug = l
// NewSingleHandlerRouter instantiates a router that will call the passed in message handler for all
// inbound messages (assuming `RegisterHandler` is never called).
//
// Deprecated: SingleHandlerRouter has been removed because it did not meet the requirements set out
// in the `Router` interface documentation. This function is only included to maintain compatibility,
// but there are limits (this version does not ignore calls to `RegisterHandler`).
func NewSingleHandlerRouter(h MessageHandler) *StandardRouter {
return NewStandardRouterWithDefault(h)
}