This repository has been archived by the owner on Jan 17, 2020. It is now read-only.

Re-subscribe after reconnect #85

Open
dbrgn opened this issue Apr 10, 2018 · 28 comments

@dbrgn

dbrgn commented Apr 10, 2018

I've experienced the situation a few times now that the connection would reconnect, but no messages would be received afterwards.

(reverse log)

Apr 10 01:50:26 sepp smartmail[8637]: INFO 2018-04-09T23:50:26Z: rumqtt::client::connection: mqtt connection successful
Apr 10 01:50:26 sepp smartmail[8637]: INFO 2018-04-09T23:50:26Z: rumqtt::client::connection: Address resolved to Some(V4(52.169.76.255:1883))
Apr 10 01:50:23 sepp smartmail[8637]: INFO 2018-04-09T23:50:23Z: rumqtt::client: Will sleep for Duration { secs: 3, nanos: 0 } seconds before reconnecting
Apr 10 01:50:23 sepp smartmail[8637]: ERROR 2018-04-09T23:50:23Z: rumqtt::client: Network connection failed. Error = Outgoing, Connection count = ConnectedBefo
Apr 10 01:50:23 sepp smartmail[8637]: ERROR 2018-04-09T23:50:23Z: rumqtt::client::connection: Reactor stopped. e = Outgoing
Apr 10 01:50:23 sepp smartmail[8637]: ERROR 2018-04-09T23:50:23Z: rumqtt::client::connection: Handling outgoing packet failed. Error = AwaitPingResp
Apr 10 01:50:23 sepp smartmail[8637]: ERROR 2018-04-09T23:50:23Z: rumqtt::client::state: Error awaiting for last ping response
Apr 10 01:50:23 sepp smartmail[8637]: DEBUG 2018-04-09T23:50:23Z: rumqtt::client::state: Last outgoing before 9 seconds. Last incoming packet before 69 seconds
Apr 10 01:50:13 sepp smartmail[8637]: DEBUG 2018-04-09T23:50:13Z: rumqtt::client::connection: Sending packet. Pingreq

Afterwards, ping messages are sent and received, but no messages from the server arrive. This would suggest that the subscriptions are not re-established.

@tekjar

tekjar commented Apr 10, 2018

@dbrgn Yeah. Resubscribe is not implemented for clean sessions. I'll add that feature when I start working on rumqtt again.

@dbrgn
Author

dbrgn commented Apr 10, 2018

Cool! Is there a workaround I can implement in the meantime?

(I guess I'll just restart the service every 15 min 🙂)

By the way, that's probably an important limitation that should be mentioned in the README.

@tekjar

tekjar commented Apr 10, 2018

How about using clean_session=false?

@dbrgn
Author

dbrgn commented Apr 11, 2018

Ah, I didn't realize that this was a configurable option. Thanks!

dbrgn added a commit to dbrgn/smartmail that referenced this issue Apr 11, 2018
@hmvp

hmvp commented Jun 1, 2018

I am not sure if this feature request is actually wanted. I would say that documenting this behavior and giving a way to detect reconnects should be enough. If it would be implemented it should be configurable.

This is also because it gives a false sense of security: when using a clean session, the subscribe will always take some time. Any message that the broker receives between the disconnect and the subscribe is lost for the client. Thus, if you need to be sure that you receive everything, you need clean_session=false. If you need clean_session=true, there is a reasonable chance that you want to change your subscriptions anyway.
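To make the loss window concrete, here is a small toy model in Rust. It is only a sketch and not rumqtt code: the `Published` type and the `received` function are hypothetical, and it assumes QoS 1 subscriptions and a broker that actually queues messages for a persistent session while the client is offline.

```rust
/// A message published to the broker at a given time (seconds, arbitrary epoch).
/// Hypothetical type used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct Published {
    pub at: u64,
    pub payload: &'static str,
}

/// Returns the payloads the client ends up receiving.
///
/// Assumptions of this toy model (not rumqtt behavior): the subscription is
/// QoS 1, a persistent session makes the broker queue messages while the
/// client is offline, and a clean session delivers nothing until the new
/// SUBSCRIBE has been processed at `resubscribed_at`.
pub fn received(
    messages: &[Published],
    disconnected_at: u64,
    resubscribed_at: u64,
    persistent_session: bool,
) -> Vec<&'static str> {
    messages
        .iter()
        .filter(|m| {
            // Messages in [disconnected_at, resubscribed_at) fall into the gap.
            let in_gap = m.at >= disconnected_at && m.at < resubscribed_at;
            persistent_session || !in_gap
        })
        .map(|m| m.payload)
        .collect()
}
```

With messages at t=1, t=5 and t=9, a disconnect at t=3 and a resubscribe at t=7, the clean-session case drops the t=5 message, while the persistent-session case keeps all three.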

@tekjar

tekjar commented Nov 16, 2018

@dbrgn I kinda agree with @hmvp here. Automatic resubscription during reconnections might create confusion when people want their subscriptions to be reset during reconnections. Having the user resubscribe manually on receiving a Disconnect notification might be a better idea here.

@fooker

fooker commented Dec 28, 2018

Usually, libraries provide some kind of connected-callback which can be used to (re)subscribe. AFAIK currently there is no way of getting those events or am I missing something?

@tekjar

tekjar commented Dec 31, 2018

@fooker I'll make changes to enable disconnect and reconnect event notifications. Next week most probably

@bbigras
Contributor

bbigras commented Feb 5, 2019

I'm using set_reconnect_opts(ReconnectOptions::Always(10)) and set_clean_session(false).

When I kill the broker (mosquitto), it reconnects and I see that the keepalives are working but I don't receive data anymore.

I'm using 3ecd3fe

[2019-02-05T18:09:12Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 9 secs,
            last outgoing packet before 15 secs
[2019-02-05T18:09:12Z DEBUG tokio_reactor] dropping I/O source: 0
[2019-02-05T18:09:12Z ERROR rumqtt::client::connection] Event loop returned. Error = NetworkStreamClosed

[2019-02-05T18:09:22Z INFO  rumqtt::client::connection] Mqtt connect response = Some(Connack(Connack { session_present: false, code: Accepted }))
[2019-02-05T18:09:22Z INFO  rumqtt::client::connection] Mqtt connection successful!!
[2019-02-05T18:09:32Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 10 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:09:32Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"
[2019-02-05T18:09:42Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 10 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:09:42Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"
[2019-02-05T18:09:52Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 9 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:09:52Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"
[2019-02-05T18:10:02Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 9 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:10:02Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"
[2019-02-05T18:10:12Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 10 secs,
            last outgoing packet before 10 secs
[2019-02-05T18:10:12Z DEBUG rumqtt::client::connection] Incoming packet = "Pingresp"

@tekjar

tekjar commented Feb 5, 2019

I'm assuming you are restarting the broker. Unless the broker writes session subscriptions to disk and reuses them after the restart, you won't see data. clean_session is about network disconnections, not broker crashes.

@bbigras
Contributor

bbigras commented Feb 5, 2019

You mean for the retain stuff right?

I mean that after I restart the broker, I publish again but I don't receive it on the subscribed side.

@tekjar

tekjar commented Feb 5, 2019

Unless you resubscribe after the broker restart, it won't work.

@bbigras
Contributor

bbigras commented Feb 5, 2019

Oh. I totally misread your last comment. Sorry. and thanks!

I guess it's a bad idea for me to use clean_session=false then.

@tekjar

tekjar commented Feb 5, 2019

clean_session=false is the way to make the broker remember subscriptions across network reconnections. Otherwise, the broker will clean all the client state.

@bbigras
Contributor

bbigras commented Feb 5, 2019

Is there a way to ensure being resubscribed after a broker crash? I think maybe ReconnectOptions::never and opening a new connection in a loop.

@tekjar

tekjar commented Feb 5, 2019

This isn't usually a problem with gcloud/aws/azure SaaS brokers.

I'm not sure if rumqtt is sending Disconnect/Reconnect events on the notification channel. The idea is to leverage those.

@opensourcegeek

Just wanted to check, is this behavior only applicable for subscribe/resubscribe? I've noticed that in poor connections publish doesn't work until the process is restarted but it could be anything in our network. Since this issue is so close to what I'm seeing just want to make sure that it's not related to publish. I only publish data from clients at the moment.

@tekjar

tekjar commented May 22, 2019

@opensourcegeek Yeah. Just applicable to resubscriptions. Can you open a new issue and provide more details on the problem? I might not be able to work on it immediately, but I'll be spending a lot more time on this crate from June.

@TotalKrill
Contributor

I will base my opinion on the assumption that MQTT systems are long running message brokers/notifications systems, thus from a user perspective, when subscribed to a topic, I will be getting messages on that topic until such time I manually unsubscribe from them.

Being able to react to a network disconnect manually and then resubscribe seems like a workaround. Personally, since the default for clean_session is "false" (which is fine, otherwise it would have an effect and a memory footprint on the broker side), I think an option for automatic resubscription should be included and should default to true.

That said, being able to react to disconnects would be great anyway, since there are brokers that can communicate permission errors by disconnecting the client. This is application specific, I would say, so if the library allows for a reaction, that would be fine.

@opensourcegeek

@tekjar Thanks for getting back. I don't think it's a problem with this library, or at least I haven't got enough evidence to say this library is the issue. In my case I don't think the broker restarted; instead, the connectivity from client to server is pretty bad, and once connectivity is re-established at the network level the library doesn't seem to publish data. However, there are various other parts to that system, and any of the other moving parts could be the reason the data isn't getting pushed. All I wanted to check was whether the conversation in this thread about bad connectivity would apply to publishing data too. If so, I'll give it more attention and perhaps raise another ticket; otherwise I'll rule this out for now in my investigation.

@michaelmarconi

I'm struggling with the issue of resubscription on network disconnects. I've read through this issue and issue 143 and I'm none the wiser!

In my case, after an initial successful connection, I see this message:

[2019-09-05T07:56:18Z ERROR rumqtt::client::connection] Event loop returned. Error = NetworkStreamClosed
[2019-09-05T07:56:28Z INFO  rumqtt::client::connection] Mqtt connect response = Some(Connack(Connack { session_present: false, code: Accepted }))
[2019-09-05T07:56:28Z INFO  rumqtt::client::connection] Mqtt connection successful!!

And that appears to drop all of my subscriptions, as I never hear from the MQTT broker again (Google IOT Core in my case). Note that my client is setting .set_clean_session(false).

@tekjar says:

Reconnection notifications and notification drop handling are part of master now.

I can't see any documentation about how to receive these however, so I don't know how to handle the disconnects myself.

Can anyone point me in the right direction please?

@TotalKrill
Contributor

Which version are you using?

@michaelmarconi

@TotalKrill I'm using 0.30.1, which is what cargo search turned up.

@tekjar

tekjar commented Sep 5, 2019

@michaelmarconi Automatic resubscription isn't implemented yet. But I'm not sure if that's the problem you are facing. Are you connecting with clean_session = false so that broker remembers the client after disconnection?

@tekjar tekjar closed this as completed Sep 5, 2019
@tekjar tekjar reopened this Sep 5, 2019
@michaelmarconi

michaelmarconi commented Sep 5, 2019

Hi @tekjar, I believe I'm setting clean_session to false:

    // MQTT options
    let mqtt_broker_host = env::var("MQTT_BROKER_HOST").expect("The MQTT broker host environment setting is missing!");
    let mqtt_broker_port: u16 = env::var("MQTT_BROKER_PORT").expect("The MQTT broker port environment setting is missing!").parse().unwrap();
    let mqtt_heartbeat_interval: u16 = env::var("MQTT_HEARTBEAT_INTERVAL").expect("The MQTT heartbeat interval environment setting is missing!").parse().unwrap();
    let mqtt_reconnect_interval: u64 = env::var("MQTT_RECONNECT_INTERVAL").expect("The MQTT reconnect interval environment setting is missing!").parse().unwrap();
    let mqtt_options = MqttOptions::new(client_id, mqtt_broker_host, mqtt_broker_port)
        .set_connection_method(ConnectionMethod::Tls(root_cert, None))
        .set_keep_alive(mqtt_heartbeat_interval)
        .set_clean_session(false) // <-- IS THIS THE CORRECT SETTING?
        .set_reconnect_opts(ReconnectOptions::Always(mqtt_reconnect_interval))
        .set_security_opts(SecurityOptions::GcloudIot(
            String::from(project_id),
            private_key,
            60,
        ));

    // Start MQTT client
    let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).expect("Failed to connect to MQTT broker!");

Here's a recent set of logs:

[2019-09-05T12:16:52Z INFO  rumqtt::client::connection] Mqtt connect response = Some(Connack(Connack { session_present: false, code: Accepted }))
[2019-09-05T12:16:52Z INFO  rumqtt::client::connection] Mqtt connection successful!!
[2019-09-05T12:16:52Z INFO  rumqtt::client::mqttstate] Subscribe. Topics = [SubscribeTopic { topic_path: "/devices/a7137d512884f41c/config", qos: AtLeastOnce }], Pkid = PacketIdentifier(1)
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Outgoing request = "Subscribe(Subscribe { pkid: PacketIdentifier(1), topics: [SubscribeTopic { topic_path: \"/devices/a7137d512884f41c/config\", qos: AtLeastOnce }] })"
[2019-09-05T12:16:52Z INFO  rumqtt::client::mqttstate] Subscribe. Topics = [SubscribeTopic { topic_path: "/devices/a7137d512884f41c/commands/#", qos: AtLeastOnce }], Pkid = PacketIdentifier(2)
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Outgoing request = "Subscribe(Subscribe { pkid: PacketIdentifier(2), topics: [SubscribeTopic { topic_path: \"/devices/a7137d512884f41c/commands/#\", qos: AtLeastOnce }] })"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "Suback(Suback { pkid: PacketIdentifier(1), return_codes: [Success(AtLeastOnce)] })"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "Suback(Suback { pkid: PacketIdentifier(2), return_codes: [Success(AtLeastOnce)] })"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "topic = /devices/a7137d512884f41c/config, qos = AtLeastOnce, pkid = Some(PacketIdentifier(1)), payload size = 137 bytes"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "topic = /devices/a7137d512884f41c/config, qos = AtLeastOnce, pkid = Some(PacketIdentifier(2)), payload size = 137 bytes"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Outgoing request = "topic = /devices/a7137d512884f41c/state, qos = AtLeastOnce, pkid = Some(PacketIdentifier(3)), payload size = 170 bytes"
[2019-09-05T12:16:52Z DEBUG rumqtt::client::connection] Incoming packet = "Puback(PacketIdentifier(3))"
[2019-09-05T12:17:02Z DEBUG rumqtt::client::mqttstate] Ping = None. keep alive = 10,
            last incoming packet before 9 secs,
            last outgoing packet before 9 secs
[2019-09-05T12:17:08Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 15 secs,
            last outgoing packet before 5 secs
[2019-09-05T12:17:08Z ERROR rumqtt::client::connection] Event loop returned. Error = NetworkStreamClosed
[2019-09-05T12:17:18Z INFO  rumqtt::client::connection] Mqtt connect response = Some(Connack(Connack { session_present: false, code: Accepted }))
[2019-09-05T12:17:18Z INFO  rumqtt::client::connection] Mqtt connection successful!!
[2019-09-05T12:17:28Z DEBUG rumqtt::client::mqttstate] Ping = Ping. keep alive = 10,
            last incoming packet before 10 secs,
            last outgoing packet before 10 secs

I'm cutting the network connection to my development machine (see the error at 2019-09-05T12:17:08Z) and then plugging it in again. The client reconnects to the Google IOT Core MQTT broker but the state doesn't appear to be restored, as I never receive any further notifications from the broker, which I am sending manually to be sure.

Looking at the logs at the reconnect event (Some(Connack(Connack { session_present: false, code: Accepted }))), it looks to me like the Google MQTT broker isn't persisting session data. Indeed, it seems that their documentation indicates that persistent sessions are not supported.

Automatic resubscription isn't implemented yet.

Is there a way I can register for disconnect notifications so I can resubscribe to the topics manually instead?
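The `session_present: false` in the CONNACK log lines above is the signal that the broker has no stored session for this client, so earlier subscriptions are gone. A minimal sketch of that decision in Rust follows. The `Connack` struct, the `AfterConnect` enum and the `after_connect` function are hypothetical stand-ins written for this illustration and are not rumqtt's types.

```rust
/// Minimal stand-in for the CONNACK fields that matter here.
/// Hypothetical type, not the one used by rumqtt.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Connack {
    pub session_present: bool,
    pub accepted: bool,
}

/// What the application should do after a (re)connect.
#[derive(Debug, PartialEq)]
pub enum AfterConnect {
    /// The broker restored the session; existing subscriptions are still active.
    KeepSubscriptions,
    /// The broker has no stored session; every topic must be subscribed again.
    Resubscribe,
    /// The broker refused the connection.
    Failed,
}

/// Decides the follow-up action from the broker's CONNACK.
pub fn after_connect(connack: Connack) -> AfterConnect {
    if !connack.accepted {
        AfterConnect::Failed
    } else if connack.session_present {
        AfterConnect::KeepSubscriptions
    } else {
        AfterConnect::Resubscribe
    }
}
```

Under this model, a broker that does not persist sessions always answers with `session_present: false`, so the client has to resubscribe after every reconnect regardless of its clean_session setting.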

@michaelmarconi

michaelmarconi commented Sep 5, 2019

@tekjar, I just noticed your comment that states that the disconnect handling is implemented on master.

I've just tried pulling the master branch into my cargo dependencies but it won't compile:

 Compiling rumqtt v0.31.0 (https://github.com/AtherEnergy/rumqtt#eadd783a)
error[E0308]: mismatched types
   --> .../.cargo/git/checkouts/rumqtt-97ac1717aa98c207/eadd783/src/client/network.rs:130:34
    |
130 |             config.set_protocols(&self.alpn_protocols);
    |                                  ^^^^^^^^^^^^^^^^^^^^ expected slice, found struct `std::vec::Vec`
    |
    = note: expected type `&[std::string::String]`
               found type `&std::vec::Vec<std::vec::Vec<u8>>`

Any chance you could release a new version with an example of how to register for disconnection notices?

@tekjar

tekjar commented Sep 6, 2019

Looking at the logs at the reconnect event (Some(Connack(Connack { session_present: false, code: Accepted }))), it looks to me like the Google MQTT broker isn't persisting session data. Indeed, it seems that their documentation indicates that persistent sessions are not supported.

Oh wow. I didn't know of this

I've just tried pulling the master branch into my cargo dependencies but it won't compile:

Remove your target and Cargo.lock and rebuild

@michaelmarconi

Thanks @tekjar, that did the trick. I'm receiving the Reconnection notification now and resubscribing to topics when that happens.
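For readers looking for the shape of that workaround, here is a rough, self-contained sketch of resubscribing on a reconnection notification. It does not use rumqtt itself: the `Notification` enum, the `Subscribe` trait and the `resubscribe_on_reconnect` function are hypothetical stand-ins, and a `std::sync::mpsc` channel stands in for the library's notification channel. The real variant names and the subscribe signature should be checked against the version of the crate in use.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical stand-in for the library's notification type.
#[derive(Debug)]
pub enum Notification {
    Publish { topic: String, payload: Vec<u8> },
    Disconnection,
    Reconnection,
}

/// Hypothetical stand-in for the client handle; only `subscribe` is needed here.
pub trait Subscribe {
    fn subscribe(&mut self, topic: &str);
}

/// Drains the notification channel and resubscribes to every topic after each
/// reconnection. Returns the number of resubscription rounds performed.
pub fn resubscribe_on_reconnect<C: Subscribe>(
    notifications: Receiver<Notification>,
    client: &mut C,
    topics: &[&str],
) -> usize {
    let mut rounds = 0;
    // `iter()` blocks until a notification arrives and ends once all senders are dropped.
    for notification in notifications.iter() {
        match notification {
            Notification::Reconnection => {
                // The broker may have dropped the session, so subscribe to everything again.
                for topic in topics {
                    client.subscribe(topic);
                }
                rounds += 1;
            }
            Notification::Disconnection => {
                eprintln!("disconnected, waiting for reconnect");
            }
            Notification::Publish { topic, payload } => {
                println!("{}: {} bytes", topic, payload.len());
            }
        }
    }
    rounds
}
```

Keeping the topic list in the application, rather than relying on broker-side session state, means the same loop works whether or not the broker persists sessions. Messages published between the disconnect and the new subscribe are still lost, as discussed earlier in the thread.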

8 participants