Skip to content

Commit

Permalink
Lazy instantiation of Filters
Browse files Browse the repository at this point in the history
This PR implements lazy instantiation of filters through a closure -
which also leads to the ability to pass the server_yaml::Value for the
config of the filter, which was a nice side effect.

To prove the lazy eval and config passing was working, this also added
the ability to pass an "id" value to the DebugFilter, such that you
could implement several of them in a Filter configuration in between
other filters, and be able to identify which log statement is which
step.

This also led to finding multiple places in which an Arc was being used,
but only the guarantees of a Box where actually needed, so this was also
switched out.

An integration test for the DebugFilter is still incoming.

Work on #1
  • Loading branch information
markmandel committed Jul 11, 2020
1 parent 5f0db6e commit 6d1f372
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 62 deletions.
18 changes: 9 additions & 9 deletions src/extensions/filter_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ use crate::extensions::{Filter, FilterRegistry};
/// The filter implementation returns the results of data that has gone through each of the filters in the chain.
/// If any of the Filters in the chain return a None, then the chain is broken, and nothing is returned.
pub struct FilterChain {
filters: Vec<Arc<dyn Filter>>,
filters: Vec<Box<dyn Filter>>,
}

impl FilterChain {
pub fn new(filters: Vec<Arc<dyn Filter>>) -> Self {
pub fn new(filters: Vec<Box<dyn Filter>>) -> Self {
FilterChain { filters }
}

Expand All @@ -42,17 +42,17 @@ impl FilterChain {
config: Arc<Config>,
filter_registry: &FilterRegistry,
) -> Result<FilterChain> {
let mut filters = Vec::<Arc<dyn Filter>>::new();
let mut filters = Vec::<Box<dyn Filter>>::new();
for filter_config in &config.filters {
match filter_registry.get(&filter_config.name) {
match filter_registry.get(&filter_config.name, &filter_config.config) {
None => {
return Err(Error::new(
ErrorKind::InvalidInput,
format!("Filter '{}' not found", filter_config.name),
));
}
Some(filter) => {
filters.push(filter.clone());
filters.push(filter);
}
}
}
Expand Down Expand Up @@ -137,7 +137,7 @@ mod tests {

use crate::config;
use crate::config::{ConnectionConfig, Local};
use crate::extensions::default_filters;
use crate::extensions::default_registry;
use crate::extensions::filters::DebugFilter;
use crate::test_utils::{logger, noop_endpoint, TestFilter};

Expand All @@ -161,7 +161,7 @@ mod tests {
},
});

let registry = default_filters(&log);
let registry = default_registry(&log);
let chain = FilterChain::from_config(config, &registry).unwrap();
assert_eq!(1, chain.filters.len());

Expand Down Expand Up @@ -199,7 +199,7 @@ mod tests {

#[test]
fn chain_single_test_filter() {
let chain = FilterChain::new(vec![Arc::new(TestFilter {})]);
let chain = FilterChain::new(vec![Box::new(TestFilter {})]);

let endpoints_fixture = endpoints();

Expand Down Expand Up @@ -256,7 +256,7 @@ mod tests {

#[test]
fn chain_double_test_filter() {
let chain = FilterChain::new(vec![Arc::new(TestFilter {}), Arc::new(TestFilter {})]);
let chain = FilterChain::new(vec![Box::new(TestFilter {}), Box::new(TestFilter {})]);

let endpoints_fixture = endpoints();

Expand Down
42 changes: 27 additions & 15 deletions src/extensions/filter_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/

use std::collections::HashMap;
use std::sync::Arc;

use crate::config::EndPoint;
use slog::Logger;
use std::net::SocketAddr;

/// Filter is a trait for routing and manipulating packets.
Expand Down Expand Up @@ -62,26 +62,32 @@ pub trait Filter: Send + Sync {
) -> Option<Vec<u8>>;
}

pub type BoxFilter = Box<dyn Filter>;
/// Function that returns a filter
type FnFilter = Box<dyn Fn(&Logger, &serde_yaml::Value) -> BoxFilter + Send>;

/// FilterRegistry is the registry of all Filters that can be applied in the system.
pub struct FilterRegistry {
registry: HashMap<String, Arc<dyn Filter>>,
log: Logger,
registry: HashMap<String, FnFilter>,
}

impl FilterRegistry {
pub fn new() -> FilterRegistry {
pub fn new(base: &Logger) -> FilterRegistry {
FilterRegistry {
log: base.clone(),
registry: Default::default(),
}
}

/// insert inserts a Filter into the registry.
pub fn insert(&mut self, key: String, filter: impl Filter + 'static) {
self.registry.insert(key, Arc::new(filter));
/// insert inserts the fucntion that will create a Filter into the registry.
pub fn insert(&mut self, key: String, filter: FnFilter) {
self.registry.insert(key, filter);
}

/// get returns the filter for a given Key. Returns None if not found.
pub fn get(&self, key: &String) -> Option<&Arc<dyn Filter>> {
self.registry.get(key)
/// get returns an instance of a filter for a given Key. Returns None if not found.
pub fn get(&self, key: &String, config: &serde_yaml::Value) -> Option<Box<dyn Filter>> {
self.registry.get(key).map(|f| f(&self.log, &config))
}
}

Expand All @@ -90,6 +96,7 @@ mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use super::*;
use crate::test_utils::logger;

struct TestFilter {}

Expand Down Expand Up @@ -123,12 +130,17 @@ mod tests {

#[test]
fn insert_and_get() {
let mut reg = FilterRegistry::new();
reg.insert(String::from("test.filter"), TestFilter {});
assert!(reg.get(&String::from("not.found")).is_none());
assert!(reg.get(&String::from("test.filter")).is_some());

let filter = reg.get(&String::from("test.filter")).unwrap();
let logger = logger();
let mut reg = FilterRegistry::new(&logger);
reg.insert(
String::from("test.filter"),
Box::new(|_, _| Box::new(TestFilter {})),
);
let config = serde_yaml::Value::Null;
assert!(reg.get(&String::from("not.found"), &config).is_none());
assert!(reg.get(&String::from("test.filter"), &config).is_some());

let filter = reg.get(&String::from("test.filter"), &config).unwrap();

let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let endpoint = EndPoint {
Expand Down
91 changes: 81 additions & 10 deletions src/extensions/filters/debug_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,64 @@ use std::net::SocketAddr;
use slog::{info, o, Logger};

use crate::config::EndPoint;
use crate::extensions::Filter;
use crate::extensions::{BoxFilter, Filter};

/// Debug Filter logs all incoming and outgoing packets
///
/// # Configuration
///
/// ```yaml
/// local:
/// port: 7000 # the port to receive traffic to locally
/// filters:
/// - name: quilkin.core.v1alpaha1.debug
/// config:
/// id: "debug-1"
/// client:
/// addresses:
/// - 127.0.0.1:7001
/// connection_id: 1x7ijy6
/// ```
/// `config.id` (optional) adds a "id" field with a given value to each log line.
/// This can be useful to identify debug log positioning within a filter config if you have
/// multiple DebugFilters configured.
///
pub struct DebugFilter {
log: Logger,
}

impl DebugFilter {
pub fn new(base: &Logger) -> Self {
DebugFilter {
log: base.new(o!("source" => "extensions::DebugFilter")),
}
/// Constructor for the DebugFilter. Pass in a "id" to append a string to your log messages from this
/// Filter.
pub fn new(base: &Logger, id: Option<String>) -> Self {
let log = match id {
None => base.new(o!("source" => "extensions::DebugFilter")),
Some(id) => base.new(o!("source" => "extensions::DebugFilter", "id" => id)),
};

DebugFilter { log }
}

/// name returns the configuration name for the DebugFilter
pub fn name() -> String {
return String::from("quilkin.core.alpahav1.debug");
return String::from("quilkin.core.v1alpaha1.debug");
}

/// config configures the DebugFilter from yaml and returns it ready for the FilterRegistry
pub fn config(base: &Logger, config: &serde_yaml::Value) -> BoxFilter {
let prefix = match config {
serde_yaml::Value::Mapping(map) => {
map.get(&serde_yaml::Value::from("id")).map(|value| {
value
.as_str()
.expect("DebugFilter.config.id should have a string value")
.to_string()
})
}
_ => None,
};

Box::new(DebugFilter::new(base, prefix))
}
}

Expand Down Expand Up @@ -95,10 +136,12 @@ mod tests {
use crate::test_utils::logger;

use super::*;
use serde_yaml::Mapping;
use serde_yaml::Value;

#[test]
fn local_receive_filter() {
let df = DebugFilter::new(&logger());
let df = DebugFilter::new(&logger(), None);
let endpoints = vec![EndPoint {
name: "e1".to_string(),
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 12357),
Expand All @@ -118,7 +161,7 @@ mod tests {

#[test]
fn local_send_filter() {
let df = DebugFilter::new(&logger());
let df = DebugFilter::new(&logger(), None);
let to = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 12358);
let contents = "hello".to_string().into_bytes();

Expand All @@ -130,7 +173,7 @@ mod tests {

#[test]
fn endpoint_receive_filter() {
let df = DebugFilter::new(&logger());
let df = DebugFilter::new(&logger(), None);
let endpoint = EndPoint {
name: "e1".to_string(),
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 12357),
Expand All @@ -146,7 +189,7 @@ mod tests {

#[test]
fn endpoint_send_filter() {
let df = DebugFilter::new(&logger());
let df = DebugFilter::new(&logger(), None);
let endpoint = EndPoint {
name: "e1".to_string(),
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 12357),
Expand All @@ -160,4 +203,32 @@ mod tests {
Some(result_contents) => assert_eq!(contents, result_contents),
}
}

#[test]
fn config_with_id() {
let log = logger();
let mut map = Mapping::new();
map.insert(Value::from("id"), Value::from("name"));

DebugFilter::config(&log, &Value::Mapping(map));
}

#[test]
fn config_without_id() {
let log = logger();
let mut map = Mapping::new();
map.insert(Value::from("id"), Value::from("name"));

DebugFilter::config(&log, &Value::Mapping(map));
}

#[test]
#[should_panic(expected = "DebugFilter.config.id should have a string value")]
fn config_should_panic() {
let log = logger();
let mut map = Mapping::new();
map.insert(Value::from("id"), Value::from(false));

DebugFilter::config(&log, &Value::Mapping(map));
}
}
12 changes: 6 additions & 6 deletions src/extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use slog::Logger;

pub use filter_chain::FilterChain;
pub use filter_registry::{Filter, FilterRegistry};
pub use filter_registry::{BoxFilter, Filter, FilterRegistry};

use crate::extensions::filters::DebugFilter;

Expand All @@ -26,10 +26,10 @@ pub mod filters;

mod filter_chain;

/// default_filters returns a FilterRegistry with the default
/// set of filters registered to it
pub fn default_filters(base: &Logger) -> FilterRegistry {
let mut fr = FilterRegistry::new();
fr.insert(DebugFilter::name(), DebugFilter::new(base));
/// default_registry returns a FilterRegistry with the default
/// set of filters that are user configurable registered to it
pub fn default_registry(base: &Logger) -> FilterRegistry {
let mut fr = FilterRegistry::new(base);
fr.insert(DebugFilter::name(), Box::new(DebugFilter::config));
fr
}
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use clap::App;
use slog::{info, o, Drain, Logger};

use quilkin::config::Config;
use quilkin::extensions::default_filters;
use quilkin::extensions::default_registry;
use quilkin::server::Server;
use tokio::signal;
use tokio::sync::oneshot;
Expand All @@ -32,7 +32,7 @@ const VERSION: &'static str = env!("CARGO_PKG_VERSION");
async fn main() {
let base_logger = logger();
let log = base_logger.new(o!("source" => "main"));
let filter_registry = default_filters(&log);
let filter_registry = default_registry(&log);

let matches = App::new("Quilkin Proxy")
.version("0.1.0")
Expand Down
Loading

0 comments on commit 6d1f372

Please sign in to comment.