Skip to content

Commit

Permalink
Minor cleanup of subscriber examples.
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Aug 10, 2023
1 parent 058c4f8 commit 45740e8
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 23 deletions.
3 changes: 2 additions & 1 deletion examples/async_publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ fn main() {
.nth(1)
.unwrap_or_else(|| "mqtt://localhost:1883".to_string());

println!("Connecting to the MQTT server at '{}'", host);

// Create the client
let cli = mqtt::AsyncClient::new(host).unwrap_or_else(|err| {
println!("Error creating the client: {}", err);
Expand All @@ -50,7 +52,6 @@ fn main() {
if let Err(err) = block_on(async {
// Connect with default options and wait for it to complete or fail
// The default is an MQTT v3.x connection.
println!("Connecting to the MQTT server");
cli.connect(None).await?;

// Create a message and publish it
Expand Down
14 changes: 11 additions & 3 deletions examples/async_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
//! The sample demonstrates:
//! - An async/await subscriber
//! - Connecting to an MQTT server/broker.
//! - Subscribing to a topic
//! - Subscribing to topics
//! - Receiving messages from an async stream.
//! - Handling disconnects and attempting manual reconnects.
//! - Using a "persistent" (non-clean) session so the broker keeps
//! subscriptions and messages through reconnects.
//! - Last will and testament
//!
//! Note that this example specifically does *not* handle a ^C, so breaking
//! out of the app will always result in an un-clean disconnect causing the
//! broker to emit the LWT message.
/*******************************************************************************
* Copyright (c) 2017-2023 Frank Pagliughi <fpagliughi@mindspring.com>
Expand Down Expand Up @@ -51,6 +54,8 @@ fn main() {
.nth(1)
.unwrap_or_else(|| "mqtt://localhost:1883".to_string());

println!("Connecting to the MQTT server at '{}'...", host);

// Create the client. Use a Client ID for a persistent session.
// A real system should try harder to use a unique ID.
let create_opts = mqtt::CreateOptionsBuilder::new_v3()
Expand All @@ -69,7 +74,11 @@ fn main() {
let mut strm = cli.get_stream(25);

// Define the set of options for the connection
let lwt = mqtt::Message::new("test", "Async subscriber lost connection", mqtt::QOS_1);
let lwt = mqtt::Message::new(
"test/lwt",
"[LWT] Async subscriber lost connection",
mqtt::QOS_1,
);

// Create the connect options, explicitly requesting MQTT v3.x
let conn_opts = mqtt::ConnectOptionsBuilder::new_v3()
Expand All @@ -79,7 +88,6 @@ fn main() {
.finalize();

// Make the connection to the broker
println!("Connecting to the MQTT server...");
cli.connect(conn_opts).await?;

println!("Subscribing to topics: {:?}", TOPICS);
Expand Down
13 changes: 11 additions & 2 deletions examples/async_subscribe_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//
//! This application is an MQTT subscriber using the asynchronous client
//! interface of the Paho Rust client library.
//!
//! It also monitors for disconnects and performs manual re-connections.
//!
//! The sample demonstrates:
Expand All @@ -17,6 +18,9 @@
//! subscriptions and messages through reconnects.
//! - Last will and testament
//!
//! Note that this example specifically does *not* handle a ^C, so breaking
//! out of the app will always result in an un-clean disconnect causing the
//! broker to emit the LWT message.
/*******************************************************************************
* Copyright (c) 2017-2023 Frank Pagliughi <fpagliughi@mindspring.com>
Expand Down Expand Up @@ -52,6 +56,8 @@ fn main() {
.nth(1)
.unwrap_or_else(|| "mqtt://localhost:1883".to_string());

println!("Connecting to the MQTT server at '{}'...", host);

// Create the client. Use an ID for a persistent session.
// A real system should try harder to use a unique ID.
let create_opts = mqtt::CreateOptionsBuilder::new()
Expand All @@ -70,7 +76,11 @@ fn main() {
let mut strm = cli.get_stream(25);

// Define the set of options for the connection
let lwt = mqtt::Message::new("test", "Async subscriber lost connection", mqtt::QOS_1);
let lwt = mqtt::Message::new(
"test/lwt",
"[LWT] Async subscriber v5 lost connection",
mqtt::QOS_1,
);

// Connect with MQTT v5 and a persistent server session (no clean start).
// For a persistent v5 session, we must set the Session Expiry Interval
Expand All @@ -83,7 +93,6 @@ fn main() {
.finalize();

// Make the connection to the broker
println!("Connecting to the MQTT server...");
cli.connect(conn_opts).await?;

println!("Subscribing to topics: {:?}", TOPICS);
Expand Down
2 changes: 1 addition & 1 deletion examples/dyn_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ fn main() {
});

// Define the set of options for the connection
let lwt = mqtt::Message::new("test", "Dynamic subscriber lost connection", 1);
let lwt = mqtt::Message::new("test/lwt", "[LWT] Dynamic subscriber lost connection", 1);

// The connect options. Defaults to an MQTT v3.x connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
Expand Down
2 changes: 1 addition & 1 deletion examples/legacy_async_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ fn main() {
});

// Define the set of options for the connection
let lwt = mqtt::Message::new("test", "Async subscriber lost connection", 1);
let lwt = mqtt::Message::new("test/lwt", "[LWT] Async subscriber lost connection", 1);

let conn_opts = mqtt::ConnectOptionsBuilder::new_v3()
.keep_alive_interval(Duration::from_secs(20))
Expand Down
22 changes: 15 additions & 7 deletions examples/sync_consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ use std::{env, process, thread, time::Duration};
// trying indefinitely, with a backoff, or something like that.

fn try_reconnect(cli: &mqtt::Client) -> bool {
println!("Connection lost. Waiting to retry connection");
for _ in 0..12 {
thread::sleep(Duration::from_millis(5000));
println!("Connection lost. Reconnecting...");
for _ in 0..60 {
thread::sleep(Duration::from_secs(1));
if cli.reconnect().is_ok() {
println!("Successfully reconnected");
println!(" Successfully reconnected");
return true;
}
}
Expand All @@ -67,6 +67,8 @@ fn main() {
.nth(1)
.unwrap_or_else(|| "mqtt://localhost:1883".to_string());

println!("Connecting to the MQTT broker at '{}'...", host);

// Create the client. Use an ID for a persistent session.
// A real system should try harder to use a unique ID.
let create_opts = mqtt::CreateOptionsBuilder::new()
Expand Down Expand Up @@ -98,7 +100,6 @@ fn main() {
let qos = [1, 1];

// Make the connection to the broker
println!("Connecting to the MQTT broker...");
match cli.connect(conn_opts) {
Ok(rsp) => {
if let Some(conn_rsp) = rsp.connect_response() {
Expand All @@ -107,11 +108,18 @@ fn main() {
conn_rsp.server_uri, conn_rsp.mqtt_version
);
if conn_rsp.session_present {
// Since our persistent session is already on the broker
// we don't need to subscribe to the topics.
println!(" w/ client session already present on broker.");
}
else {
// Register subscriptions on the server
println!("Subscribing to topics with requested QoS: {:?}...", qos);
// The server doesn't have a persistent session already
// stored for us (1st connection?), so we need to subscribe
// to the topics we want to receive.
println!(
"Subscribing to topics {:?} with requested QoS: {:?}...",
subscriptions, qos
);

cli.subscribe_many(&subscriptions, &qos)
.and_then(|rsp| {
Expand Down
17 changes: 9 additions & 8 deletions examples/sync_consume_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ fn command_handler(msg: mqtt::Message) -> bool {
// trying indefinitely, with a backoff, or something like that.

fn try_reconnect(cli: &mqtt::Client) -> bool {
println!("Connection lost. Waiting to retry connection");
for _ in 0..12 {
thread::sleep(Duration::from_millis(5000));
println!("Connection lost. Attempting to reconnect...");
for _ in 0..60 {
thread::sleep(Duration::from_secs(1));
if cli.reconnect().is_ok() {
println!("Successfully reconnected");
println!(" Successfully reconnected");
return true;
}
}
Expand All @@ -103,6 +103,8 @@ fn main() -> mqtt::Result<()> {
.nth(1)
.unwrap_or_else(|| "mqtt://localhost:1883".to_string());

println!("Connecting to the MQTT broker at '{}'...", host);

// Create the client. Use an ID for a persistent session.
// A real system should try harder to use a unique ID.
let create_opts = mqtt::CreateOptionsBuilder::new()
Expand Down Expand Up @@ -136,12 +138,11 @@ fn main() -> mqtt::Result<()> {
let handler: Vec<fn(mqtt::Message) -> bool> = vec![data_handler, command_handler];

// Make the connection to the broker

let rsp = cli.connect(conn_opts)?;

// We're connecting with a persistent session. So we check if
// the server already knows about us and rembers about out
// subscription(s). If not, we subscribe for incoming requests.
// the server already knows about us and rembers our subscription(s).
// If not, we subscribe for incoming requests.

if let Some(conn_rsp) = rsp.connect_response() {
println!(
Expand All @@ -154,7 +155,7 @@ fn main() -> mqtt::Result<()> {
}
else {
// Register subscriptions on the server, using Subscription ID's.
println!("Subscribing to topics...");
println!(r#"Subscribing to topics ["data/#", "command"]..."#);
cli.subscribe_with_options("data/#", 0, None, sub_id(1))?;
cli.subscribe_with_options("command", 1, None, sub_id(2))?;
}
Expand Down

0 comments on commit 45740e8

Please sign in to comment.