A library to use Redis as the persistence store for Paho MQTT Rust clients.
MQTT is a light-weight, distributed, messaging system particularly tailored for devices that have unreliable network connections. Part of the way it achieves a higher quality of service is that clients can use a persistence store to keep messages tha are in-flight until they are confirmed as received by the remote system. This way, even if the client application crashes and restarts, the messages can be confirmed or re-sent.
Typically, when using persistence, messages are saved to disk. On some smaller flash-based devices such as Embedded Linux systems, this might not be the most efficient means to store the messages, as continuous writes could wear out the flash chips prematurely.
A Redis server, running on the local client device can serve as a convenient store for messages. The Paho library's user-defined persistence mechanism uses key/value access which maps perfectly to any similar API such as Redis. Using it in this context takes less than 100 lines of code.
Note that this only applies to a local Redis server running on the same host as the MQTT client application. It would not make sense to try to persist messages across the network as that would introduce network problems and latency at the persistence level. But a local Redis service has proven to work well in production.
The Paho MQTT Rust library is a wrapper around the Paho C library. It can be included in a project via crates.io, with the package name paho-mqtt
. Add this to the dependencies section of your Cargo.toml file:
[dependencies]
paho-mqtt = "0.12"
paho-mqtt-redis = "0.3"
The source repository lives on GitHub at:
https://github.com/eclipse/paho.mqtt.rust
This library uses the "redis" crate to communicate with the Redis server. It is listed as a dependency in the Cargo.toml
file. The project's home page can be found at:
https://github.com/mitsuhiko/redis-rs
Note that this client assumes that Redis is running on the local machine, bound to localhost using the default Redis port. It probably wouldn't make a lot of sense to use a remote service as a persistence store since its primary purpose is to protect from unreliable network connections. Thus, using a local service seems the proper choice. But it would be trivial to update the RedisClient constructor RedisPersistence::new()
to take a URI string to specify another server or port.
The Paho Rust library contains a trait that can be used to supply a user-defined persistence:
pub trait ClientPersistence {
fn open(&mut self, client_id: &str, server_uri: &str) -> MqttResult<()>;
fn close(&mut self) -> MqttResult<()>;
fn put(&mut self, key: &str, buffers: Vec<&[u8]>) -> MqttResult<()>;
fn get(&mut self, key: &str) -> MqttResult<Vec<u8>>;
fn remove(&mut self, key: &str) -> MqttResult<()>;
fn keys(&mut self) -> MqttResult<Vec<String>>;
fn clear(&mut self) -> MqttResult<()>;
fn contains_key(&mut self, key: &str) -> bool;
}
These operations map closely to those provided by key/value stores such as hash maps. Redis can implement these operations on a one-to-one basis using a hash type, with the operations hset
, hget
, hdel
, hkeys
, del
, and hexists
. The key
string sent to the functions can act as the Redis hash field.
The name (primary key) for the hash is generated by concatenating the client_id
and server_uri
as provided to the call to open()
.
The bulk of this library is dedicated to the implementation of a RedisPersistence
struct which implements the ClientPersistence
trait for use with a Redis server.
Using the Redis persisence is fairly trivial. There's an example application, redis_persist_pub.rs
demonstrating its use in the examples folder.
Do the following:
- Create an instance of a
RedisPersistence
struct. - Create an instance of an MQTT
CreateOptions
struct, specifying the RedisPersistence object as the user-defined persistence. - Create an MQTT client, using the options.
It can be done like this:
let persistence = RedisPersistence::new();
let opts = mqtt::CreateOptionsBuilder::new()
.server_uri("tcp://localhost:1883")
.user_persistence(persistence)
.finalize();
let cli = mqtt::AsyncClient::new(opts).unwrap_or_else(|e| {
println!("Error creating the client: {:?}", e);
process::exit(1);
});
The library will then automatically use the Redis persistence to save and restore messages and other data as needed.