Skip to content

Commit

Permalink
Added mqtt-policy crate (#3593)
Browse files Browse the repository at this point in the history
This is a second to last PR in a series of PRs to integrate Policy Engine into the Broker.

In this PR I've added a crate that contains MQTT specific implementation of Policy engine plugins.
  • Loading branch information
vadim-kovalyov authored Sep 23, 2020
1 parent c186d4c commit 5effde9
Show file tree
Hide file tree
Showing 12 changed files with 871 additions and 7 deletions.
15 changes: 15 additions & 0 deletions mqtt/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mqtt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"mqtt-broker-tests-util",
"mqtt-edgehub",
"mqtt-generic",
"mqtt-policy",
"mqttd",
"policy"
]
Expand Down
2 changes: 1 addition & 1 deletion mqtt/mqtt-broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ regex = "1"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "0.2", features = ["blocking", "stream", "sync", "tcp", "dns"] }
tokio = { version = "0.2", features = ["blocking", "stream", "sync", "tcp", "dns", "io-util"] }
tokio-io-timeout = "0.4"
tokio-openssl = "0.4"
tokio-util = { version = "0.2", features = ["codec"] }
Expand Down
12 changes: 11 additions & 1 deletion mqtt/mqtt-broker/src/auth/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub trait Authorizer {
/// Authorizes a MQTT client to perform some action.
fn authorize(&self, activity: Activity) -> Result<Authorization, Self::Error>;

fn update(&self, _update: Box<dyn Any>) -> Result<(), Self::Error> {
fn update(&mut self, _update: Box<dyn Any>) -> Result<(), Self::Error> {
Ok(())
}
}
Expand Down Expand Up @@ -72,6 +72,7 @@ impl Authorizer for AllowAll {
}

/// Describes a client activity to authorized.
#[derive(Debug)]
pub struct Activity {
client_id: ClientId,
client_info: ClientInfo,
Expand All @@ -91,6 +92,10 @@ impl Activity {
}
}

pub fn client_id(&self) -> &ClientId {
&self.client_id
}

pub fn client_info(&self) -> &ClientInfo {
&self.client_info
}
Expand All @@ -105,6 +110,7 @@ impl Activity {
}

/// Describes a client operation to be authorized.
#[derive(Debug)]
pub enum Operation {
Connect(Connect),
Publish(Publish),
Expand All @@ -129,6 +135,7 @@ impl Operation {
}

/// Represents a client attempt to connect to the broker.
#[derive(Debug)]
pub struct Connect {
will: Option<Publication>,
}
Expand All @@ -148,6 +155,7 @@ impl From<proto::Connect> for Connect {
}

/// Represents a publication description without payload to be used for authorization.
#[derive(Debug)]
pub struct Publication {
topic_name: String,
qos: proto::QoS,
Expand Down Expand Up @@ -179,6 +187,7 @@ impl From<proto::Publication> for Publication {
}

/// Represents a client attempt to publish a new message on a specified MQTT topic.
#[derive(Debug)]
pub struct Publish {
publication: Publication,
}
Expand Down Expand Up @@ -206,6 +215,7 @@ impl From<proto::Publish> for Publish {
}

/// Represents a client attempt to subscribe to a specified MQTT topic in order to received messages.
#[derive(Clone, Debug)]
pub struct Subscribe {
topic_filter: String,
qos: proto::QoS,
Expand Down
14 changes: 10 additions & 4 deletions mqtt/mqtt-broker/src/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@ pub enum AuthId {
Identity(Identity),
}

impl Display for AuthId {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
impl AuthId {
pub fn as_str(&self) -> &str {
match self {
Self::Anonymous => write!(f, "*"),
Self::Identity(identity) => write!(f, "{}", identity),
AuthId::Anonymous => "*",
AuthId::Identity(identity) => identity.as_str(),
}
}
}

impl Display for AuthId {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", self.as_str())
}
}

impl AuthId {
/// Creates a MQTT identity for known client.
pub fn from_identity<T: Into<Identity>>(identity: T) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion mqtt/mqtt-edgehub/src/auth/authorization/edgehub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Authorizer for EdgeHubAuthorizer {
Ok(auth)
}

fn update(&self, update: Box<dyn Any>) -> Result<(), Self::Error> {
fn update(&mut self, update: Box<dyn Any>) -> Result<(), Self::Error> {
let identities = update.as_ref();
if let Some(service_identities) = identities.downcast_ref::<ServiceIdentity>() {
debug!("{:?}", service_identities);
Expand Down
20 changes: 20 additions & 0 deletions mqtt/mqtt-policy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "mqtt-policy"
version = "0.1.0"
authors = ["Azure IoT Edge Devs"]
edition = "2018"
description = "This crate contains MQTT specific plugins for authorization policy engine. See 'policy' crate."

[dependencies]
lazy_static = "1.4"
tracing = "0.1"
thiserror = "1.0"

mqtt-broker = { path = "../mqtt-broker" }
mqtt3 = { path = "../mqtt3" }
policy = { path = "../policy" }

[dev-dependencies]
assert_matches = "1.3"
bytes = "0.5"
test-case = "1.0"
34 changes: 34 additions & 0 deletions mqtt/mqtt-policy/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use thiserror::Error;

#[derive(Debug, Error, PartialEq)]
pub enum Error {
#[error("One or several errors occurred validating MQTT broker policy definition: {0:?}.")]
ValidationSummary(Vec<Error>),

#[error("Unsupported schema version: {0}")]
UnsupportedVersion(String),

#[error("Identities list must not be empty")]
EmptyIdentities,

#[error("Operations list must not be empty")]
EmptyOperations,

#[error("Resources list must not be empty")]
EmptyResources,

#[error("Identity name is invalid: {0}")]
InvalidIdentity(String),

#[error("Resource (topic filter) is invalid: {0}")]
InvalidResource(String),

#[error("Unknown mqtt operation: {0}. List of supported operations: mqtt:publish, mqtt:subscribe, mqtt:connect")]
InvalidOperation(String),

#[error("Invalid identity variable name: {0}")]
InvalidIdentityVariable(String),

#[error("Invalid resource variable name: {0}")]
InvalidResourceVariable(String),
}
76 changes: 76 additions & 0 deletions mqtt/mqtt-policy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#![deny(rust_2018_idioms, warnings)]
#![deny(clippy::all, clippy::pedantic)]
#![allow(
clippy::cognitive_complexity,
clippy::large_enum_variant,
clippy::similar_names,
clippy::module_name_repetitions,
clippy::use_self,
clippy::match_same_arms,
clippy::must_use_candidate,
clippy::missing_errors_doc
)]

mod errors;
mod matcher;
mod substituter;
mod validator;

pub use crate::matcher::MqttTopicFilterMatcher;
pub use crate::substituter::MqttSubstituter;
pub use crate::validator::MqttValidator;

pub(crate) const IDENTITY_VAR: &str = "{{iot:identity}}";
pub(crate) const DEVICE_ID_VAR: &str = "{{iot:device_id}}";
pub(crate) const MODULE_ID_VAR: &str = "{{iot:module_id}}";
pub(crate) const CLIENT_ID_VAR: &str = "{{mqtt:client_id}}";
pub(crate) const EDGEHUB_ID_VAR: &str = "{{iot:this_device_id}}";
pub(crate) const TOPIC_VAR: &str = "{{mqtt:topic}}";

#[cfg(test)]
mod tests {
use std::time::Duration;

use bytes::Bytes;
use mqtt3::proto;
use mqtt_broker::{auth::Activity, auth::Operation, AuthId, ClientId, ClientInfo};

pub(crate) fn create_connect_activity(
client_id: impl Into<ClientId>,
auth_id: impl Into<AuthId>,
) -> Activity {
let client_id = client_id.into();
Activity::new(
client_id.clone(),
ClientInfo::new("127.0.0.1:80".parse().unwrap(), auth_id),
Operation::new_connect(proto::Connect {
username: None,
password: None,
will: None,
client_id: proto::ClientId::IdWithExistingSession(client_id.to_string()),
keep_alive: Duration::default(),
protocol_name: mqtt3::PROTOCOL_NAME.into(),
protocol_level: mqtt3::PROTOCOL_LEVEL,
}),
)
}

pub(crate) fn create_publish_activity(
client_id: impl Into<ClientId>,
auth_id: impl Into<AuthId>,
) -> Activity {
Activity::new(
client_id.into(),
ClientInfo::new("127.0.0.1:80".parse().unwrap(), auth_id),
Operation::new_publish(proto::Publish {
packet_identifier_dup_qos: proto::PacketIdentifierDupQoS::AtLeastOnce(
proto::PacketIdentifier::new(1).unwrap(),
false,
),
retain: true,
topic_name: "/foo/bar".to_string(),
payload: Bytes::new(),
}),
)
}
}
96 changes: 96 additions & 0 deletions mqtt/mqtt-policy/src/matcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::str::FromStr;

use mqtt_broker::{
auth::{Activity, Operation},
TopicFilter,
};
use policy::{Request, ResourceMatcher};

/// This is MQTT-specific resource matcher that matches topics and topic filters
/// according to MQTT spec.
///
/// # Example:
/// ```json
/// {
/// "effect": "allow",
/// "identities": ["client_1"],
/// "operations": ["mqtt:publish"],
/// "resources": ["floor1/#"]
/// }
/// ```
/// The policy statement above will allow `client_1` to publish to any topic
/// that matches "floor1/#" topic filter (like "floor1/station1/events")
#[derive(Debug)]
pub struct MqttTopicFilterMatcher;

impl ResourceMatcher for MqttTopicFilterMatcher {
type Context = Activity;

fn do_match(&self, context: &Request<Activity>, input: &str, policy: &str) -> bool {
match context.context() {
Some(context) => {
match context.operation() {
// special case for Connect operation, since it doesn't really have a "resource".
Operation::Connect(_) => true,
// for pub or sub just match the topic filter.
_ => {
if let Ok(filter) = TopicFilter::from_str(policy) {
filter.matches(input)
} else {
false
}
}
}
}
None => false,
}
}
}

#[cfg(test)]
mod tests {
use test_case::test_case;

use policy::Request;

use crate::tests;

use super::*;

#[test_case("/foo", "/foo", true; "simple topic matches")]
#[test_case("/bar", "/foo", false; "simple topic doesn't match")]
#[test_case("/foo/bar", "/foo/#", true; "wildcard 1")]
#[test_case("/foo/bar", "/foo/+", true; "wildcard 2")]
#[test_case("#invalid", "/foo/+", false; "invalid topic")]
#[test_case("/foo", "#invalid", false; "invalid topic filter")]
fn do_match_test(input: &str, policy: &str, result: bool) {
let request = Request::with_context(
"some_identity",
"some_operation",
"some_resource",
tests::create_publish_activity("client_id", "auth_id"),
)
.unwrap();

// connect operation should match any input value.
assert_eq!(
result,
MqttTopicFilterMatcher.do_match(&request, input, policy)
);
}

#[test]
fn do_match_connect_activity_test() {
let request = Request::with_context(
"some_identity",
"some_operation",
"some_resource",
tests::create_connect_activity("client_id", "auth_id"),
)
.unwrap();

// connect operation should match any input value.
assert!(MqttTopicFilterMatcher.do_match(&request, "any_value", "ignored_value1"));
assert!(MqttTopicFilterMatcher.do_match(&request, "some_value", "ignored_value2"));
}
}
Loading

0 comments on commit 5effde9

Please sign in to comment.