diff --git a/autopaho/auto.go b/autopaho/auto.go index 8faf455..28bc9f2 100644 --- a/autopaho/auto.go +++ b/autopaho/auto.go @@ -464,7 +464,7 @@ func (c *ConnectionManager) PublishViaQueue(ctx context.Context, p *QueuePublish func (c *ConnectionManager) TerminateConnectionForTest() { c.mu.Lock() if c.cli != nil { - _ = c.cli.Conn.Close() + c.cli.TerminateConnectionForTest() } c.mu.Unlock() } diff --git a/paho/client.go b/paho/client.go index eee37e9..94b3ae9 100644 --- a/paho/client.go +++ b/paho/client.go @@ -94,8 +94,8 @@ type ( } // Client is the struct representing an MQTT client Client struct { - mu sync.Mutex - ClientConfig + mu sync.Mutex + config ClientConfig // OnPublishReceived copy of OnPublishReceived from ClientConfig (perhaps with added callback form Router) onPublishReceived []func(PublishReceived) (bool, error) @@ -155,26 +155,26 @@ func NewClient(conf ClientConfig) *Client { MaximumPacketSize: 0, TopicAliasMaximum: 0, }, - ClientConfig: conf, + config: conf, onPublishReceived: conf.OnPublishReceived, done: make(chan struct{}), errors: log.NOOPLogger{}, debug: log.NOOPLogger{}, } - if c.Session == nil { - c.Session = state.NewInMemory() - c.autoCloseSession = true // We created `Session`, so need to close it when done (so handlers all return) + if c.config.Session == nil { + c.config.Session = state.NewInMemory() + c.config.autoCloseSession = true // We created `Session`, so need to close it when done (so handlers all return) } - if c.PacketTimeout == 0 { - c.PacketTimeout = 10 * time.Second + if c.config.PacketTimeout == 0 { + c.config.PacketTimeout = 10 * time.Second } - if c.Router == nil && len(c.onPublishReceived) == 0 { - c.Router = NewStandardRouter() // Maintain backwards compatibility (for now!) + if c.config.Router == nil && len(c.onPublishReceived) == 0 { + c.config.Router = NewStandardRouter() // Maintain backwards compatibility (for now!) } - if c.Router != nil { - r := c.Router + if c.config.Router != nil { + r := c.config.Router c.onPublishReceived = append(c.onPublishReceived, func(p PublishReceived) (bool, error) { r.Route(p.Packet.Packet()) @@ -183,13 +183,13 @@ func NewClient(conf ClientConfig) *Client { } c.onPublishReceivedTracker = make([]int, len(c.onPublishReceived)) // Must have the same number of elements as onPublishReceived - if c.PingHandler == nil { - c.PingHandler = DefaultPingerWithCustomFailHandler(func(e error) { + if c.config.PingHandler == nil { + c.config.PingHandler = DefaultPingerWithCustomFailHandler(func(e error) { go c.error(e) }) } - if c.OnClientError == nil { - c.OnClientError = func(e error) {} + if c.config.OnClientError == nil { + c.config.OnClientError = func(e error) {} } return c @@ -203,14 +203,14 @@ func NewClient(conf ClientConfig) *Client { // returned. Otherwise, the failure Connack (if there is one) is returned // along with an error indicating the reason for the failure to connect. func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { - if c.Conn == nil { + if c.config.Conn == nil { return nil, fmt.Errorf("client connection is nil") } cleanup := func() { close(c.stop) close(c.publishPackets) - _ = c.Conn.Close() + _ = c.config.Conn.Close() close(c.done) c.mu.Unlock() } @@ -226,7 +226,7 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { c.publishPackets = make(chan *packets.Publish, publishPacketsSize) keepalive := cp.KeepAlive - c.ClientID = cp.ClientID + c.config.ClientID = cp.ClientID if cp.Properties != nil { if cp.Properties.MaximumPacketSize != nil { c.clientProps.MaximumPacketSize = *cp.Properties.MaximumPacketSize @@ -243,7 +243,7 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { } c.debug.Println("connecting") - connCtx, cf := context.WithTimeout(ctx, c.PacketTimeout) + connCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout) defer cf() ccp := cp.Packet() @@ -251,7 +251,7 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { ccp.ProtocolVersion = 5 c.debug.Println("sending CONNECT") - if _, err := ccp.WriteTo(c.Conn); err != nil { + if _, err := ccp.WriteTo(c.config.Conn); err != nil { cleanup() return nil, err } @@ -292,7 +292,7 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { return ca, fmt.Errorf("failed to connect to server: %s", reason) } - if err := c.Session.ConAckReceived(c.Conn, ccp, caPacket); err != nil { + if err := c.config.Session.ConAckReceived(c.config.Conn, ccp, caPacket); err != nil { cleanup() return ca, fmt.Errorf("session error: %w", err) } @@ -305,7 +305,7 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { keepalive = *ca.Properties.ServerKeepAlive } if ca.Properties.AssignedClientID != "" { - c.ClientID = ca.Properties.AssignedClientID + c.config.ClientID = ca.Properties.AssignedClientID } if ca.Properties.ReceiveMaximum != nil { c.serverProps.ReceiveMaximum = *ca.Properties.ReceiveMaximum @@ -331,7 +331,7 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { go func() { defer c.workers.Done() defer c.debug.Println("returning from ping handler worker") - c.PingHandler.Start(c.Conn, time.Duration(keepalive)*time.Second) + c.config.PingHandler.Start(c.config.Conn, time.Duration(keepalive)*time.Second) }() } @@ -353,13 +353,13 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { c.incoming() }() - if c.EnableManualAcknowledgment { + if c.config.EnableManualAcknowledgment { c.debug.Println("starting acking routine") c.acksTracker.reset() sendAcksInterval := defaultSendAckInterval - if c.SendAcksInterval > 0 { - sendAcksInterval = c.SendAcksInterval + if c.config.SendAcksInterval > 0 { + sendAcksInterval = c.config.SendAcksInterval } c.workers.Add(1) @@ -395,7 +395,7 @@ func (c *Client) Done() <-chan struct{} { // WARNING: Calling Ack after the connection is closed may have unpredictable results (particularly if the sessionState // is being accessed by a new connection). See issue #160. func (c *Client) Ack(pb *Publish) error { - if !c.EnableManualAcknowledgment { + if !c.config.EnableManualAcknowledgment { return ErrManualAcknowledgmentDisabled } if pb.QoS == 0 { @@ -406,7 +406,7 @@ func (c *Client) Ack(pb *Publish) error { // ack acknowledges a message (note: called by acksTracker to ensure these are sent in order) func (c *Client) ack(pb *packets.Publish) { - c.Session.Ack(pb) + c.config.Session.Ack(pb) } func (c *Client) routePublishPackets() { @@ -419,7 +419,7 @@ func (c *Client) routePublishPackets() { } c.onPublishReceivedMu.Unlock() - if c.ClientConfig.EnableManualAcknowledgment && pb.QoS != 0 { + if c.config.EnableManualAcknowledgment && pb.QoS != 0 { c.acksTracker.add(pb) } @@ -439,7 +439,7 @@ func (c *Client) routePublishPackets() { errs = append(errs, err) } - if !c.ClientConfig.EnableManualAcknowledgment { + if !c.config.EnableManualAcknowledgment { c.ack(pb) } } @@ -460,7 +460,7 @@ func (c *Client) incoming() { case <-c.stop: return default: - recv, err := packets.ReadPacket(c.Conn) + recv, err := packets.ReadPacket(c.config.Conn) if err != nil { go c.error(err) return @@ -475,15 +475,15 @@ func (c *Client) incoming() { ap := recv.Content.(*packets.Auth) switch ap.ReasonCode { case packets.AuthSuccess: - if c.AuthHandler != nil { - go c.AuthHandler.Authenticated() + if c.config.AuthHandler != nil { + go c.config.AuthHandler.Authenticated() } if c.authResponse != nil { c.authResponse <- *recv } case packets.AuthContinueAuthentication: - if c.AuthHandler != nil { - if _, err := c.AuthHandler.Authenticate(AuthFromPacketAuth(ap)).Packet().WriteTo(c.Conn); err != nil { + if c.config.AuthHandler != nil { + if _, err := c.config.AuthHandler.Authenticate(AuthFromPacketAuth(ap)).Packet().WriteTo(c.config.Conn); err != nil { go c.error(err) return } @@ -492,7 +492,7 @@ func (c *Client) incoming() { case packets.PUBLISH: pb := recv.Content.(*packets.Publish) if pb.QoS > 0 { // QOS1 or 2 need to be recorded in session state - c.Session.PacketReceived(recv, c.publishPackets) + c.config.Session.PacketReceived(recv, c.publishPackets) } else { c.debug.Printf("received QoS%d PUBLISH", pb.QoS) c.mu.Lock() @@ -506,16 +506,16 @@ func (c *Client) incoming() { } } case packets.PUBACK, packets.PUBCOMP, packets.SUBACK, packets.UNSUBACK, packets.PUBREC, packets.PUBREL: - c.Session.PacketReceived(recv, c.publishPackets) + c.config.Session.PacketReceived(recv, c.publishPackets) case packets.DISCONNECT: pd := recv.Content.(*packets.Disconnect) c.debug.Println("received DISCONNECT") if c.authResponse != nil { c.authResponse <- *recv } - c.Session.ConnectionLost(pd) // this may impact the session state + c.config.Session.ConnectionLost(pd) // this may impact the session state go func() { - if c.OnServerDisconnect != nil { + if c.config.OnServerDisconnect != nil { go c.serverDisconnect(DisconnectFromPacketDisconnect(pd)) } else { go c.error(fmt.Errorf("server initiated disconnect")) @@ -524,7 +524,7 @@ func (c *Client) incoming() { return case packets.PINGRESP: c.debug.Println("received PINGRESP") - c.PingHandler.PingResp() + c.config.PingHandler.PingResp() } } } @@ -545,15 +545,15 @@ func (c *Client) close() { close(c.stop) c.debug.Println("client stopped") - c.PingHandler.Stop() + c.config.PingHandler.Stop() c.debug.Println("ping stopped") - _ = c.Conn.Close() + _ = c.config.Conn.Close() c.debug.Println("conn closed") c.acksTracker.reset() c.debug.Println("acks tracker reset") - c.Session.ConnectionLost(nil) - if c.autoCloseSession { - if err := c.Session.Close(); err != nil { + c.config.Session.ConnectionLost(nil) + if c.config.autoCloseSession { + if err := c.config.Session.Close(); err != nil { c.errors.Println("error closing session", err) } } @@ -570,13 +570,13 @@ func (c *Client) close() { func (c *Client) error(e error) { c.debug.Println("error called:", e) c.close() - go c.OnClientError(e) + go c.config.OnClientError(e) } func (c *Client) serverDisconnect(d *Disconnect) { c.close() c.debug.Println("calling OnServerDisconnect") - go c.OnServerDisconnect(d) + go c.config.OnServerDisconnect(d) } // Authenticate is used to initiate a reauthentication of credentials with the @@ -601,7 +601,7 @@ func (c *Client) Authenticate(ctx context.Context, a *Auth) (*AuthResponse, erro }() c.debug.Println("sending AUTH") - if _, err := a.Packet().WriteTo(c.Conn); err != nil { + if _, err := a.Packet().WriteTo(c.config.Conn); err != nil { return nil, err } @@ -654,19 +654,19 @@ func (c *Client) Subscribe(ctx context.Context, s *Subscribe) (*Suback, error) { ret := make(chan packets.ControlPacket, 1) sp := s.Packet() - if err := c.Session.AddToSession(ctx, sp, ret); err != nil { + if err := c.config.Session.AddToSession(ctx, sp, ret); err != nil { return nil, err } // From this point on the message is in store, and ret will receive something regardless of whether we succeed in // writing the packet to the connection or not. - if _, err := sp.WriteTo(c.Conn); err != nil { + if _, err := sp.WriteTo(c.config.Conn); err != nil { // The packet will remain in the session state until `Session` is notified of the disconnection. return nil, err } c.debug.Println("waiting for SUBACK") - subCtx, cf := context.WithTimeout(ctx, c.PacketTimeout) + subCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout) defer cf() var sap packets.ControlPacket @@ -718,18 +718,18 @@ func (c *Client) Unsubscribe(ctx context.Context, u *Unsubscribe) (*Unsuback, er c.debug.Printf("unsubscribing from %+v", u.Topics) ret := make(chan packets.ControlPacket, 1) up := u.Packet() - if err := c.Session.AddToSession(ctx, up, ret); err != nil { + if err := c.config.Session.AddToSession(ctx, up, ret); err != nil { return nil, err } // From this point on the message is in store, and ret will receive something regardless of whether we succeed in // writing the packet to the connection or not - if _, err := up.WriteTo(c.Conn); err != nil { + if _, err := up.WriteTo(c.config.Conn); err != nil { // The packet will remain in the session state until `Session` is notified of the disconnection. return nil, err } - unsubCtx, cf := context.WithTimeout(ctx, c.PacketTimeout) + unsubCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout) defer cf() var uap packets.ControlPacket @@ -819,8 +819,8 @@ func (c *Client) PublishWithOptions(ctx context.Context, p *Publish, o PublishOp return nil, fmt.Errorf("cannot send a publish with no TopicAlias and no Topic set") } - if c.ClientConfig.PublishHook != nil { - c.ClientConfig.PublishHook(p) + if c.config.PublishHook != nil { + c.config.PublishHook(p) } c.debug.Printf("sending message to %s", p.Topic) @@ -830,7 +830,7 @@ func (c *Client) PublishWithOptions(ctx context.Context, p *Publish, o PublishOp switch p.QoS { case 0: c.debug.Println("sending QoS0 message") - if _, err := pb.WriteTo(c.Conn); err != nil { + if _, err := pb.WriteTo(c.config.Conn); err != nil { go c.error(err) return nil, err } @@ -844,17 +844,17 @@ func (c *Client) PublishWithOptions(ctx context.Context, p *Publish, o PublishOp func (c *Client) publishQoS12(ctx context.Context, pb *packets.Publish, o PublishOptions) (*PublishResponse, error) { c.debug.Println("sending QoS12 message") - pubCtx, cf := context.WithTimeout(ctx, c.PacketTimeout) + pubCtx, cf := context.WithTimeout(ctx, c.config.PacketTimeout) defer cf() ret := make(chan packets.ControlPacket, 1) - if err := c.Session.AddToSession(pubCtx, pb, ret); err != nil { + if err := c.config.Session.AddToSession(pubCtx, pb, ret); err != nil { return nil, err } // From this point on the message is in store, and ret will receive something regardless of whether we succeed in // writing the packet to the connection - if _, err := pb.WriteTo(c.Conn); err != nil { + if _, err := pb.WriteTo(c.config.Conn); err != nil { c.debug.Printf("failed to write packet %d to connection: %s", pb.PacketID, err) if o.Method == PublishMethod_AsyncSend { return nil, ErrNetworkErrorAfterStored // Async send, so we don't wait for the response (may add callbacks in the future to enable user to obtain status) @@ -909,7 +909,7 @@ func (c *Client) publishQoS12(ctx context.Context, pb *packets.Publish, o Publis } func (c *Client) expectConnack(packet chan<- *packets.Connack, errs chan<- error) { - recv, err := packets.ReadPacket(c.Conn) + recv, err := packets.ReadPacket(c.config.Conn) if err != nil { errs <- err return @@ -919,17 +919,17 @@ func (c *Client) expectConnack(packet chan<- *packets.Connack, errs chan<- error c.debug.Println("received CONNACK") if r.ReasonCode == packets.ConnackSuccess && r.Properties != nil && r.Properties.AuthMethod != "" { // Successful connack and AuthMethod is defined, must have successfully authed during connect - go c.AuthHandler.Authenticated() + go c.config.AuthHandler.Authenticated() } packet <- r case *packets.Auth: c.debug.Println("received AUTH") - if c.AuthHandler == nil { + if c.config.AuthHandler == nil { errs <- fmt.Errorf("enhanced authentication flow started but no AuthHandler configured") return } c.debug.Println("sending AUTH") - _, err := c.AuthHandler.Authenticate(AuthFromPacketAuth(r)).Packet().WriteTo(c.Conn) + _, err := c.config.AuthHandler.Authenticate(AuthFromPacketAuth(r)).Packet().WriteTo(c.config.Conn) if err != nil { errs <- fmt.Errorf("error sending authentication packet: %w", err) return @@ -948,7 +948,7 @@ func (c *Client) expectConnack(packet chan<- *packets.Connack, errs chan<- error // is closed. func (c *Client) Disconnect(d *Disconnect) error { c.debug.Println("disconnecting", d) - _, err := d.Packet().WriteTo(c.Conn) + _, err := d.Packet().WriteTo(c.config.Conn) c.close() @@ -990,12 +990,17 @@ idLoop: } } +// ClientID retrieves the client ID from the config (sometimes used in handlers that require the ID) +func (c *Client) ClientID() string { + return c.config.ClientID +} + // SetDebugLogger takes an instance of the paho Logger interface // and sets it to be used by the debug log endpoint func (c *Client) SetDebugLogger(l log.Logger) { c.debug = l - if c.autoCloseSession { // If we created the session store then it should use the same logger - c.Session.SetDebugLogger(l) + if c.config.autoCloseSession { // If we created the session store then it should use the same logger + c.config.Session.SetDebugLogger(l) } } @@ -1003,7 +1008,13 @@ func (c *Client) SetDebugLogger(l log.Logger) { // and sets it to be used by the error log endpoint func (c *Client) SetErrorLogger(l log.Logger) { c.errors = l - if c.autoCloseSession { // If we created the session store then it should use the same logger - c.Session.SetErrorLogger(l) + if c.config.autoCloseSession { // If we created the session store then it should use the same logger + c.config.Session.SetErrorLogger(l) } } + +// TerminateConnectionForTest closes the active connection (if any). This function is intended for testing only, it +// simulates connection loss which supports testing QOS1 and 2 message delivery. +func (c *Client) TerminateConnectionForTest() { + _ = c.config.Conn.Close() +} diff --git a/paho/client_test.go b/paho/client_test.go index 4888a2d..8043f25 100644 --- a/paho/client_test.go +++ b/paho/client_test.go @@ -23,9 +23,9 @@ func TestNewClient(t *testing.T) { c := NewClient(ClientConfig{}) require.NotNil(t, c) - require.NotNil(t, c.Session) - require.NotNil(t, c.Router) - require.NotNil(t, c.PingHandler) + require.NotNil(t, c.config.Session) + require.NotNil(t, c.config.Router) + require.NotNil(t, c.config.PingHandler) assert.Equal(t, uint16(65535), c.serverProps.ReceiveMaximum) assert.Equal(t, uint8(2), c.serverProps.MaximumQoS) @@ -41,7 +41,7 @@ func TestNewClient(t *testing.T) { assert.Equal(t, uint32(0), c.clientProps.MaximumPacketSize) assert.Equal(t, uint16(0), c.clientProps.TopicAliasMaximum) - assert.Equal(t, 10*time.Second, c.PacketTimeout) + assert.Equal(t, 10*time.Second, c.config.PacketTimeout) } func TestClientConnect(t *testing.T) { @@ -116,9 +116,9 @@ func TestClientSubscribe(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) s := &Subscribe{ Subscriptions: []SubscribeOptions{ @@ -161,9 +161,9 @@ func TestClientUnsubscribe(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) u := &Unsubscribe{ Topics: []string{ @@ -199,9 +199,9 @@ func TestClientPublishQoS0(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) p := &Publish{ Topic: "test/0", @@ -241,9 +241,9 @@ func TestClientPublishQoS1(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) p := &Publish{ Topic: "test/1", @@ -286,9 +286,9 @@ func TestClientPublishQoS2(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) p := &Publish{ Topic: "test/2", @@ -333,9 +333,9 @@ func TestClientReceiveQoS0(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) go c.routePublishPackets() err := ts.SendPacket(&packets.Publish{ @@ -380,9 +380,9 @@ func TestClientReceiveQoS1(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) go c.routePublishPackets() err := ts.SendPacket(&packets.Publish{ @@ -428,9 +428,9 @@ func TestClientReceiveQoS2(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) go c.routePublishPackets() err := ts.SendPacket(&packets.Publish{ @@ -645,9 +645,9 @@ func TestReceiveServerDisconnect(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) err := ts.SendPacket(&packets.Disconnect{ ReasonCode: packets.DisconnectServerShuttingDown, @@ -686,9 +686,9 @@ func TestAuthenticate(t *testing.T) { }() go func() { defer c.workers.Done() - c.PingHandler.Start(c.Conn, 30*time.Second) + c.config.PingHandler.Start(c.config.Conn, 30*time.Second) }() - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) ctx, cf := context.WithTimeout(context.Background(), 5*time.Second) defer cf() diff --git a/paho/extensions/rpc/rpc.go b/paho/extensions/rpc/rpc.go index 02c3acc..2ce0e7e 100644 --- a/paho/extensions/rpc/rpc.go +++ b/paho/extensions/rpc/rpc.go @@ -23,7 +23,7 @@ func NewHandler(ctx context.Context, c *paho.Client) (*Handler, error) { correlData: make(map[string]chan *paho.Publish), } - responseTopic := fmt.Sprintf("%s/responses", c.ClientID) + responseTopic := fmt.Sprintf("%s/responses", c.ClientID()) c.AddOnPublishReceived(func(pr paho.PublishReceived) (bool, error) { if pr.Packet.Topic == responseTopic { h.responseHandler(pr.Packet) @@ -34,7 +34,7 @@ func NewHandler(ctx context.Context, c *paho.Client) (*Handler, error) { _, err := c.Subscribe(ctx, &paho.Subscribe{ Subscriptions: []paho.SubscribeOptions{ - {Topic: fmt.Sprintf("%s/responses", c.ClientID), QoS: 1}, + {Topic: fmt.Sprintf("%s/responses", c.ClientID()), QoS: 1}, }, }) if err != nil { @@ -72,7 +72,7 @@ func (h *Handler) Request(ctx context.Context, pb *paho.Publish) (*paho.Publish, } pb.Properties.CorrelationData = []byte(cID) - pb.Properties.ResponseTopic = fmt.Sprintf("%s/responses", h.c.ClientID) + pb.Properties.ResponseTopic = fmt.Sprintf("%s/responses", h.c.ClientID()) pb.Retain = false _, err := h.c.Publish(ctx, pb) diff --git a/paho/packet_ids_test.go b/paho/packet_ids_test.go index 4b0dd76..91410e0 100644 --- a/paho/packet_ids_test.go +++ b/paho/packet_ids_test.go @@ -31,8 +31,8 @@ func TestPackedIdNoExhaustion(t *testing.T) { c.stop = make(chan struct{}) c.publishPackets = make(chan *packets.Publish) go c.incoming() - go c.PingHandler.Start(c.Conn, 30*time.Second) - c.Session.ConAckReceived(c.Conn, &packets.Connect{}, &packets.Connack{}) + go c.config.PingHandler.Start(c.config.Conn, 30*time.Second) + c.config.Session.ConAckReceived(c.config.Conn, &packets.Connect{}, &packets.Connack{}) for i := 0; i < 70000; i++ { p := &Publish{