From 45740e834359a50d05c5d285bf42c62b134fa968 Mon Sep 17 00:00:00 2001 From: fpagliughi Date: Wed, 9 Aug 2023 20:46:22 -0400 Subject: [PATCH] Minor cleanup of subscriber examples. --- examples/async_publish.rs | 3 ++- examples/async_subscribe.rs | 14 +++++++++++--- examples/async_subscribe_v5.rs | 13 +++++++++++-- examples/dyn_subscribe.rs | 2 +- examples/legacy_async_subscribe.rs | 2 +- examples/sync_consume.rs | 22 +++++++++++++++------- examples/sync_consume_v5.rs | 17 +++++++++-------- 7 files changed, 50 insertions(+), 23 deletions(-) diff --git a/examples/async_publish.rs b/examples/async_publish.rs index 64ab6bb..e7e4cbb 100644 --- a/examples/async_publish.rs +++ b/examples/async_publish.rs @@ -41,6 +41,8 @@ fn main() { .nth(1) .unwrap_or_else(|| "mqtt://localhost:1883".to_string()); + println!("Connecting to the MQTT server at '{}'", host); + // Create the client let cli = mqtt::AsyncClient::new(host).unwrap_or_else(|err| { println!("Error creating the client: {}", err); @@ -50,7 +52,6 @@ fn main() { if let Err(err) = block_on(async { // Connect with default options and wait for it to complete or fail // The default is an MQTT v3.x connection. - println!("Connecting to the MQTT server"); cli.connect(None).await?; // Create a message and publish it diff --git a/examples/async_subscribe.rs b/examples/async_subscribe.rs index 9894982..a1c8844 100644 --- a/examples/async_subscribe.rs +++ b/examples/async_subscribe.rs @@ -9,13 +9,16 @@ //! The sample demonstrates: //! - An async/await subscriber //! - Connecting to an MQTT server/broker. -//! - Subscribing to a topic +//! - Subscribing to topics //! - Receiving messages from an async stream. //! - Handling disconnects and attempting manual reconnects. //! - Using a "persistent" (non-clean) session so the broker keeps //! subscriptions and messages through reconnects. //! - Last will and testament //! +//! Note that this example specifically does *not* handle a ^C, so breaking +//! out of the app will always result in an un-clean disconnect causing the +//! broker to emit the LWT message. /******************************************************************************* * Copyright (c) 2017-2023 Frank Pagliughi @@ -51,6 +54,8 @@ fn main() { .nth(1) .unwrap_or_else(|| "mqtt://localhost:1883".to_string()); + println!("Connecting to the MQTT server at '{}'...", host); + // Create the client. Use a Client ID for a persistent session. // A real system should try harder to use a unique ID. let create_opts = mqtt::CreateOptionsBuilder::new_v3() @@ -69,7 +74,11 @@ fn main() { let mut strm = cli.get_stream(25); // Define the set of options for the connection - let lwt = mqtt::Message::new("test", "Async subscriber lost connection", mqtt::QOS_1); + let lwt = mqtt::Message::new( + "test/lwt", + "[LWT] Async subscriber lost connection", + mqtt::QOS_1, + ); // Create the connect options, explicitly requesting MQTT v3.x let conn_opts = mqtt::ConnectOptionsBuilder::new_v3() @@ -79,7 +88,6 @@ fn main() { .finalize(); // Make the connection to the broker - println!("Connecting to the MQTT server..."); cli.connect(conn_opts).await?; println!("Subscribing to topics: {:?}", TOPICS); diff --git a/examples/async_subscribe_v5.rs b/examples/async_subscribe_v5.rs index 6578869..295c0eb 100644 --- a/examples/async_subscribe_v5.rs +++ b/examples/async_subscribe_v5.rs @@ -4,6 +4,7 @@ // //! This application is an MQTT subscriber using the asynchronous client //! interface of the Paho Rust client library. +//! //! It also monitors for disconnects and performs manual re-connections. //! //! The sample demonstrates: @@ -17,6 +18,9 @@ //! subscriptions and messages through reconnects. //! - Last will and testament //! +//! Note that this example specifically does *not* handle a ^C, so breaking +//! out of the app will always result in an un-clean disconnect causing the +//! broker to emit the LWT message. /******************************************************************************* * Copyright (c) 2017-2023 Frank Pagliughi @@ -52,6 +56,8 @@ fn main() { .nth(1) .unwrap_or_else(|| "mqtt://localhost:1883".to_string()); + println!("Connecting to the MQTT server at '{}'...", host); + // Create the client. Use an ID for a persistent session. // A real system should try harder to use a unique ID. let create_opts = mqtt::CreateOptionsBuilder::new() @@ -70,7 +76,11 @@ fn main() { let mut strm = cli.get_stream(25); // Define the set of options for the connection - let lwt = mqtt::Message::new("test", "Async subscriber lost connection", mqtt::QOS_1); + let lwt = mqtt::Message::new( + "test/lwt", + "[LWT] Async subscriber v5 lost connection", + mqtt::QOS_1, + ); // Connect with MQTT v5 and a persistent server session (no clean start). // For a persistent v5 session, we must set the Session Expiry Interval @@ -83,7 +93,6 @@ fn main() { .finalize(); // Make the connection to the broker - println!("Connecting to the MQTT server..."); cli.connect(conn_opts).await?; println!("Subscribing to topics: {:?}", TOPICS); diff --git a/examples/dyn_subscribe.rs b/examples/dyn_subscribe.rs index 7db7138..0b1f5dc 100644 --- a/examples/dyn_subscribe.rs +++ b/examples/dyn_subscribe.rs @@ -155,7 +155,7 @@ fn main() { }); // Define the set of options for the connection - let lwt = mqtt::Message::new("test", "Dynamic subscriber lost connection", 1); + let lwt = mqtt::Message::new("test/lwt", "[LWT] Dynamic subscriber lost connection", 1); // The connect options. Defaults to an MQTT v3.x connection. let conn_opts = mqtt::ConnectOptionsBuilder::new() diff --git a/examples/legacy_async_subscribe.rs b/examples/legacy_async_subscribe.rs index 17097f4..6d786d0 100644 --- a/examples/legacy_async_subscribe.rs +++ b/examples/legacy_async_subscribe.rs @@ -115,7 +115,7 @@ fn main() { }); // Define the set of options for the connection - let lwt = mqtt::Message::new("test", "Async subscriber lost connection", 1); + let lwt = mqtt::Message::new("test/lwt", "[LWT] Async subscriber lost connection", 1); let conn_opts = mqtt::ConnectOptionsBuilder::new_v3() .keep_alive_interval(Duration::from_secs(20)) diff --git a/examples/sync_consume.rs b/examples/sync_consume.rs index a218ee0..e80b234 100644 --- a/examples/sync_consume.rs +++ b/examples/sync_consume.rs @@ -45,11 +45,11 @@ use std::{env, process, thread, time::Duration}; // trying indefinitely, with a backoff, or something like that. fn try_reconnect(cli: &mqtt::Client) -> bool { - println!("Connection lost. Waiting to retry connection"); - for _ in 0..12 { - thread::sleep(Duration::from_millis(5000)); + println!("Connection lost. Reconnecting..."); + for _ in 0..60 { + thread::sleep(Duration::from_secs(1)); if cli.reconnect().is_ok() { - println!("Successfully reconnected"); + println!(" Successfully reconnected"); return true; } } @@ -67,6 +67,8 @@ fn main() { .nth(1) .unwrap_or_else(|| "mqtt://localhost:1883".to_string()); + println!("Connecting to the MQTT broker at '{}'...", host); + // Create the client. Use an ID for a persistent session. // A real system should try harder to use a unique ID. let create_opts = mqtt::CreateOptionsBuilder::new() @@ -98,7 +100,6 @@ fn main() { let qos = [1, 1]; // Make the connection to the broker - println!("Connecting to the MQTT broker..."); match cli.connect(conn_opts) { Ok(rsp) => { if let Some(conn_rsp) = rsp.connect_response() { @@ -107,11 +108,18 @@ fn main() { conn_rsp.server_uri, conn_rsp.mqtt_version ); if conn_rsp.session_present { + // Since our persistent session is already on the broker + // we don't need to subscribe to the topics. println!(" w/ client session already present on broker."); } else { - // Register subscriptions on the server - println!("Subscribing to topics with requested QoS: {:?}...", qos); + // The server doesn't have a persistent session already + // stored for us (1st connection?), so we need to subscribe + // to the topics we want to receive. + println!( + "Subscribing to topics {:?} with requested QoS: {:?}...", + subscriptions, qos + ); cli.subscribe_many(&subscriptions, &qos) .and_then(|rsp| { diff --git a/examples/sync_consume_v5.rs b/examples/sync_consume_v5.rs index f17d691..5895e82 100644 --- a/examples/sync_consume_v5.rs +++ b/examples/sync_consume_v5.rs @@ -74,11 +74,11 @@ fn command_handler(msg: mqtt::Message) -> bool { // trying indefinitely, with a backoff, or something like that. fn try_reconnect(cli: &mqtt::Client) -> bool { - println!("Connection lost. Waiting to retry connection"); - for _ in 0..12 { - thread::sleep(Duration::from_millis(5000)); + println!("Connection lost. Attempting to reconnect..."); + for _ in 0..60 { + thread::sleep(Duration::from_secs(1)); if cli.reconnect().is_ok() { - println!("Successfully reconnected"); + println!(" Successfully reconnected"); return true; } } @@ -103,6 +103,8 @@ fn main() -> mqtt::Result<()> { .nth(1) .unwrap_or_else(|| "mqtt://localhost:1883".to_string()); + println!("Connecting to the MQTT broker at '{}'...", host); + // Create the client. Use an ID for a persistent session. // A real system should try harder to use a unique ID. let create_opts = mqtt::CreateOptionsBuilder::new() @@ -136,12 +138,11 @@ fn main() -> mqtt::Result<()> { let handler: Vec bool> = vec![data_handler, command_handler]; // Make the connection to the broker - let rsp = cli.connect(conn_opts)?; // We're connecting with a persistent session. So we check if - // the server already knows about us and rembers about out - // subscription(s). If not, we subscribe for incoming requests. + // the server already knows about us and rembers our subscription(s). + // If not, we subscribe for incoming requests. if let Some(conn_rsp) = rsp.connect_response() { println!( @@ -154,7 +155,7 @@ fn main() -> mqtt::Result<()> { } else { // Register subscriptions on the server, using Subscription ID's. - println!("Subscribing to topics..."); + println!(r#"Subscribing to topics ["data/#", "command"]..."#); cli.subscribe_with_options("data/#", 0, None, sub_id(1))?; cli.subscribe_with_options("command", 1, None, sub_id(2))?; }