Skip to content

Commit

Permalink
Basic Bridge controller (#3427)
Browse files Browse the repository at this point in the history
Starting bridge controller from mqttd, reading settings from env and config file for nested bridge and starting an empty bridge.
It's mostly a scaffolding to be able to start working in parallel on bridge tasks.

Todo: handle error when loading settings
  • Loading branch information
ancaantochi authored Aug 22, 2020
1 parent c151475 commit 5a79646
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 0 deletions.
13 changes: 13 additions & 0 deletions mqtt/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions mqtt/mqtt-bridge/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "mqtt-bridge"
version = "0.1.0"
authors = ["Azure IoT Edge Devs"]
edition = "2018"

[dependencies]
anyhow = "1.0"
config = { version = "0.10", features = ["json"], default-features = false }
serde = { version = "1.0", features = ["derive", "rc"] }
serial_test = "0.4"
tracing = "0.1"

edgelet-client = { path = "../edgelet-client"}
4 changes: 4 additions & 0 deletions mqtt/mqtt-bridge/config/default.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"subscriptions": [],
"forwards": []
}
30 changes: 30 additions & 0 deletions mqtt/mqtt-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use tracing::info;

use crate::settings::Settings;

/// Bridge implementation for nested scenario.
/// It is used when IOTEDGE_GATEWAYHOSTNAME env variable is set.
pub struct NestedBridge {
settings: Settings,
}

impl NestedBridge {
pub fn new(settings: Settings) -> Self {
NestedBridge { settings }
}

pub async fn start(&self) {
info!("Starting nested bridge...{:?}", self.settings);

self.connect_to_local().await;
self.connect_upstream().await;
}

async fn connect_upstream(&self) {
info!("connecting to upstream broker");
}

async fn connect_to_local(&self) {
info!("connecting to local broker");
}
}
32 changes: 32 additions & 0 deletions mqtt/mqtt-bridge/src/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use anyhow::Result;
use tracing::info;

use crate::bridge::NestedBridge;
use crate::settings::Settings;

/// Controller that handles the settings and monitors changes, spawns new Bridges and monitors shutdown signal.
#[derive(Default)]
pub struct BridgeController {
nested_bridge: Option<NestedBridge>,
}

impl BridgeController {
pub fn new() -> Self {
Self::default()
}

pub async fn start(&mut self) -> Result<()> {
let settings = Settings::new()?;
match settings.nested_bridge().gateway_hostname() {
Some(_) => {
let nested_bridge = NestedBridge::new(settings.clone());
nested_bridge.start().await;

self.nested_bridge = Option::Some(nested_bridge);
}
None => info!("No nested bridge found."),
};

Ok(())
}
}
5 changes: 5 additions & 0 deletions mqtt/mqtt-bridge/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod bridge;
pub mod controller;
mod settings;

pub use crate::controller::BridgeController;
215 changes: 215 additions & 0 deletions mqtt/mqtt-bridge/src/settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use std::{path::Path, vec::Vec};

use config::{Config, ConfigError, Environment, File, FileFormat};
use serde::Deserialize;

pub const DEFAULTS: &str = include_str!("../config/default.json");
pub const ENVIRONMENT_PREFIX: &str = "iotedge";

#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct Settings {
#[serde(flatten)]
nested_bridge: NestedBridgeSettings,

#[serde(flatten)]
subscriptions: Subscriptions,

#[serde(flatten)]
forwards: Forwards,
}

impl Settings {
pub fn new() -> Result<Self, ConfigError> {
let mut config = Config::new();

config.merge(File::from_str(DEFAULTS, FileFormat::Json))?;
config.merge(Environment::with_prefix(ENVIRONMENT_PREFIX))?;

config.try_into()
}

pub fn from_file<P>(path: P) -> Result<Self, ConfigError>
where
P: AsRef<Path>,
{
let mut config = Config::new();

config.merge(File::from_str(DEFAULTS, FileFormat::Json))?;
config.merge(File::from(path.as_ref()))?;
config.merge(Environment::with_prefix(ENVIRONMENT_PREFIX))?;

config.try_into()
}

pub fn nested_bridge(&self) -> &NestedBridgeSettings {
&self.nested_bridge
}

pub fn subscriptions(&self) -> &Subscriptions {
&self.subscriptions
}

pub fn forwards(&self) -> &Forwards {
&self.forwards()
}
}

#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct NestedBridgeSettings {
#[serde(rename = "gatewayhostname")]
gateway_hostname: Option<String>,

#[serde(rename = "deviceid")]
device_id: Option<String>,

#[serde(rename = "moduleid")]
module_id: Option<String>,

#[serde(rename = "modulegenerationid")]
generation_id: Option<String>,

#[serde(rename = "workloaduri")]
workload_uri: Option<String>,
}

impl NestedBridgeSettings {
pub fn gateway_hostname(&self) -> Option<&str> {
self.gateway_hostname.as_ref().map(AsRef::as_ref)
}

pub fn device_id(&self) -> Option<&str> {
self.device_id.as_ref().map(AsRef::as_ref)
}

pub fn module_id(&self) -> Option<&str> {
self.module_id.as_ref().map(AsRef::as_ref)
}

pub fn generation_id(&self) -> Option<&str> {
self.generation_id.as_ref().map(AsRef::as_ref)
}

pub fn workload_uri(&self) -> Option<&str> {
self.workload_uri.as_ref().map(AsRef::as_ref)
}
}

#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct Subscriptions {
subscriptions: Vec<Subscription>,
}

impl Subscriptions {
pub fn subscriptions(self) -> Vec<Subscription> {
self.subscriptions
}
}

#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct Subscription {
pattern: String,

remote: String,
}

impl Subscription {
pub fn pattern(&self) -> &str {
&self.pattern
}

pub fn remote(&self) -> &str {
&self.remote
}
}

#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct Forwards {
forwards: Vec<Forward>,
}

impl Forwards {
pub fn forwards(self) -> Vec<Forward> {
self.forwards
}
}

#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct Forward {
pattern: String,

remote: String,
}

impl Forward {
pub fn pattern(&self) -> &str {
&self.pattern
}

pub fn remote(&self) -> &str {
&self.remote
}
}

#[cfg(test)]
mod tests {
use serial_test::serial;

use super::Settings;

use config::ConfigError;

#[test]
#[serial(env_settings)]
fn new_overrides_settings_from_env() {
it_overrides_settings_from_env(Settings::new);
}

#[test]
#[serial(env_settings)]
fn from_file_reads_nested_bridge_settings() {
let settings = Settings::from_file("tests/config.json").unwrap();

assert_eq!(
settings.nested_bridge().gateway_hostname().unwrap(),
"edge1"
);
assert_eq!(settings.nested_bridge().device_id().unwrap(), "d1");
assert_eq!(settings.nested_bridge().module_id().unwrap(), "mymodule");
assert_eq!(settings.nested_bridge().generation_id().unwrap(), "321");
assert_eq!(settings.nested_bridge().workload_uri().unwrap(), "uri");
}

#[test]
#[serial(env_settings)]
fn from_file_overrides_settings_from_env() {
it_overrides_settings_from_env(|| Settings::from_file("tests/config.json"));
}

fn it_overrides_settings_from_env<F>(make_settings: F)
where
F: FnOnce() -> Result<Settings, ConfigError>,
{
let _gateway_hostname = std::env::set_var("IOTEDGE_GATEWAYHOSTNAME", "upstream");
let _device_id = std::env::set_var("IOTEDGE_DEVICEID", "device1");
let _module_id = std::env::set_var("IOTEDGE_MODULEID", "m1");
let _generation_id = std::env::set_var("IOTEDGE_MODULEGENERATIONID", "123");
let _workload_uri = std::env::set_var("IOTEDGE_WORKLOADURI", "workload");

let settings = make_settings().unwrap();

assert_eq!(
settings.nested_bridge().gateway_hostname().unwrap(),
"upstream"
);
assert_eq!(settings.nested_bridge().device_id().unwrap(), "device1");
assert_eq!(settings.nested_bridge().module_id().unwrap(), "m1");
assert_eq!(settings.nested_bridge().generation_id().unwrap(), "123");
assert_eq!(settings.nested_bridge().workload_uri().unwrap(), "workload");

std::env::remove_var("IOTEDGE_GATEWAYHOSTNAME");
std::env::remove_var("IOTEDGE_DEVICEID");
std::env::remove_var("IOTEDGE_MODULEID");
std::env::remove_var("IOTEDGE_MODULEGENERATIONID");
std::env::remove_var("IOTEDGE_WORKLOADURI");
}
}
19 changes: 19 additions & 0 deletions mqtt/mqtt-bridge/tests/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"gatewayhostname": "edge1",
"deviceid": "d1",
"moduleid": "mymodule",
"modulegenerationid": "321",
"workloaduri": "uri",
"subscriptions": [
{
"pattern": "temp/#",
"remote": "floor/kitchen/"
}
],
"forwards": [
{
"pattern": "some",
"remote": "remote"
}
]
}
1 change: 1 addition & 0 deletions mqtt/mqttd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ edgelet-client = { path = "../edgelet-client", optional = true }
mqtt-broker = { path = "../mqtt-broker" }
mqtt-edgehub = { path = "../mqtt-edgehub", optional = true }
mqtt-generic = { path = "../mqtt-generic", optional = true }
mqtt-bridge = { path = "../mqtt-bridge" }

[dev-dependencies]
mockito = "0.25"
Expand Down
12 changes: 12 additions & 0 deletions mqtt/mqttd/src/broker/bootstrap/edgehub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::{
};
use tracing::{info, warn};

use mqtt_bridge::BridgeController;
use mqtt_broker::BrokerHandle;
use mqtt_broker::{
auth::Authorizer, Broker, BrokerBuilder, BrokerConfig, BrokerSnapshot, Server,
Expand Down Expand Up @@ -175,6 +176,8 @@ async fn start_sidecars(
let (mut command_handler_shutdown_handle, command_handler_join_handle) =
start_command_handler(broker_handle, system_address).await?;

start_bridge().await?;

tx.await?;

command_handler_shutdown_handle.shutdown().await?;
Expand Down Expand Up @@ -202,6 +205,15 @@ async fn start_command_handler(
Ok((shutdown_handle, join_handle))
}

// TODO: allow for bridge shutdown
async fn start_bridge() -> Result<()> {
info!("starting bridge...");
let mut bridge_controller = BridgeController::new();
bridge_controller.start().await?;

Ok(())
}

#[derive(Debug, thiserror::Error)]
pub enum ServerCertificateLoadError {
#[error("unable to load server certificate from file {0} and private key {1}")]
Expand Down

0 comments on commit 5a79646

Please sign in to comment.