Skip to content

Commit

Permalink
Add filter duration metrics to FilterChain
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed May 20, 2021
1 parent 0d65bf6 commit 57d5476
Show file tree
Hide file tree
Showing 21 changed files with 256 additions and 86 deletions.
25 changes: 22 additions & 3 deletions docs/extensions/filters/writing_custom_filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ To extend Quilkin's code with our own custom filter, we need to do the following

struct Greet;
impl Filter for Greet {
fn label(&self) -> &'static str {
"Greet"
}

fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
ctx.contents.splice(0..0, String::from("Hello ").into_bytes());
Some(ctx.into())
Expand All @@ -85,7 +89,11 @@ To extend Quilkin's code with our own custom filter, we need to do the following
```rust
// src/main.rs
# struct Greet;
# impl Filter for Greet {}
# impl Filter for Greet {
# fn label(&self) -> &'static str {
# "Greet"
# }
# }
# use quilkin::extensions::Filter;
use quilkin::extensions::{CreateFilterArgs, Error, FilterFactory};

Expand Down Expand Up @@ -204,6 +212,9 @@ The [Serde] crate is used to describe static YAML configuration in code while [P

struct Greet(String);
impl Filter for Greet {
fn label(&self) -> &'static str {
"Greet"
}
fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
ctx.contents
.splice(0..0, format!("{} ",self.0).into_bytes());
Expand All @@ -230,7 +241,11 @@ The [Serde] crate is used to describe static YAML configuration in code while [P
# use quilkin::extensions::{CreateFilterArgs, Error, FilterFactory};
# use quilkin::extensions::{Filter, ReadContext, ReadResponse, WriteContext, WriteResponse};
# struct Greet(String);
# impl Filter for Greet { }
# impl Filter for Greet {
# fn label(&self) -> &'static str {
# "Greet"
# }
# }

use quilkin::extensions::ConfigType;

Expand Down Expand Up @@ -361,7 +376,11 @@ However, it usually contains a Protobuf equivalent of the filter's static config
# }
# }
# struct Greet(String);
# impl Filter for Greet { }
# impl Filter for Greet {
# fn label(&self) -> &'static str {
# "Greet"
# }
# }
use bytes::Bytes;

struct GreetFilterFactory;
Expand Down
4 changes: 4 additions & 0 deletions examples/quilkin-filter-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ mod greet {

struct Greet(String);
impl Filter for Greet {
fn label(&self) -> &'static str {
"Greet"
}

fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
ctx.contents
.splice(0..0, format!("{} ", self.0).into_bytes());
Expand Down
151 changes: 104 additions & 47 deletions src/extensions/filter_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,85 @@

use std::fmt::{self, Formatter};

use prometheus::Registry;
use prometheus::{Error as PrometheusError, Histogram, HistogramOpts, Registry};

use crate::config::{Filter as FilterConfig, ValidationError};
use crate::extensions::{
CreateFilterArgs, Filter, FilterRegistry, ReadContext, ReadResponse, WriteContext,
WriteResponse,
};
use crate::metrics::CollectorExt;

/// FilterChain implements a chain of Filters amd the implementation
/// of passing the information between Filters for each filter function
const FILTER_WRITE_LABEL: &str = "filter";
const FILTER_READ_LABEL: &str = "filter";

/// A chain of [`Filter`]s to be executed in order.
///
/// Each filter implementation loops around all the filters stored in the FilterChain, passing the results of each filter to the next in the chain.
/// The filter implementation returns the results of data that has gone through each of the filters in the chain.
/// If any of the Filters in the chain return a None, then the chain is broken, and nothing is returned.
#[derive(Default)]
/// Executes each filter, passing the [`ReadContext`] and [`WriteContext`]
/// between each filter's execution, returning the result of data that has gone
/// through all of the filters in the chain. If any of the filters in the chain
/// return `None`, then the chain is broken, and `None` is returned.
pub struct FilterChain {
filters: Vec<Box<dyn Filter>>,
filter_read_time_elapsed_seconds: Vec<Histogram>,
filter_write_time_elapsed_seconds: Vec<Histogram>,
}

/// Represents an error while creating a `FilterChain`
#[derive(Debug)]
pub struct CreateFilterError {
filter_name: String,
error: ValidationError,
pub enum Error {
Prometheus(PrometheusError),
Filter {
filter_name: String,
error: ValidationError,
},
}

impl fmt::Display for CreateFilterError {
impl fmt::Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"failed to create filter {}: {}",
self.filter_name,
format!("{}", self.error)
)
match self {
Self::Prometheus(error) => f.write_str(&error.to_string()),
Self::Filter { filter_name, error } => {
write!(f, "failed to create filter {}: {}", filter_name, error,)
}
}
}
}

impl From<PrometheusError> for Error {
fn from(error: PrometheusError) -> Self {
Self::Prometheus(error)
}
}

impl FilterChain {
pub fn new(filters: Vec<Box<dyn Filter>>) -> Self {
FilterChain { filters }
pub fn new(filters: Vec<Box<dyn Filter>>, registry: &Registry) -> Result<Self, Error> {
Ok(Self {
filter_read_time_elapsed_seconds: filters
.iter()
.map(|f| {
Histogram::with_opts(
HistogramOpts::new(
"filter_read_time_elapsed_seconds",
"Seconds taken to execute a given filter's `read`.",
)
.const_label(FILTER_READ_LABEL, f.label()),
)
.and_then(|h| h.register_if_not_exists(&registry))
}).collect::<Result<_, prometheus::Error>>()?,
filter_write_time_elapsed_seconds: filters
.iter()
.map(|f| {
Histogram::with_opts(
HistogramOpts::new(
"filter_write_time_elapsed_seconds",
"Seconds taken to execute a given filter's `write`.",
)
.const_label(FILTER_WRITE_LABEL, f.label()),
)
.and_then(|h| h.register_if_not_exists(&registry))
}).collect::<Result<_, prometheus::Error>>()?,
filters,
})
}

/// Validates the filter configurations in the provided config and constructs
Expand All @@ -64,7 +103,7 @@ impl FilterChain {
filter_configs: Vec<FilterConfig>,
filter_registry: &FilterRegistry,
metrics_registry: &Registry,
) -> std::result::Result<FilterChain, CreateFilterError> {
) -> Result<Self, Error> {
let mut filters = Vec::<Box<dyn Filter>>::new();
for filter_config in filter_configs {
match filter_registry.get(
Expand All @@ -74,42 +113,50 @@ impl FilterChain {
) {
Ok(filter) => filters.push(filter),
Err(err) => {
return Err(CreateFilterError {
return Err(Error::Filter {
filter_name: filter_config.name.clone(),
error: err.into(),
});
}
}
}
Ok(FilterChain::new(filters))

FilterChain::new(filters, &metrics_registry)
}
}

impl Filter for FilterChain {
fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
let from = ctx.from;
for f in &self.filters {
match f.read(ctx) {
None => return None,
Some(response) => ctx = ReadContext::with_response(from, response),
}
}
Some(ctx.into())
fn label(&self) -> &'static str {
"FilterChain"
}

fn write(&self, mut ctx: WriteContext) -> Option<WriteResponse> {
let endpoint = ctx.endpoint;
let from = ctx.from;
let to = ctx.to;
for f in self.filters.iter().rev() {
match f.write(ctx) {
None => return None,
Some(response) => {
ctx = WriteContext::with_response(endpoint, from, to, response);
}
}
}
Some(ctx.into())
fn read(&self, ctx: ReadContext) -> Option<ReadResponse> {
self.filters
.iter()
.zip(self.filter_read_time_elapsed_seconds.iter())
.try_fold(ctx, |ctx, (filter, histogram)| {
Some(ReadContext::with_response(
ctx.from,
histogram.observe_closure_duration(|| filter.read(ctx))?,
))
})
.map(ReadResponse::from)
}

fn write(&self, ctx: WriteContext) -> Option<WriteResponse> {
self.filters
.iter()
.rev()
.zip(self.filter_write_time_elapsed_seconds.iter().rev())
.try_fold(ctx, |ctx, (filter, histogram)| {
Some(WriteContext::with_response(
ctx.endpoint,
ctx.from,
ctx.to,
histogram.observe_closure_duration(|| filter.write(ctx))?,
))
})
.map(WriteResponse::from)
}
}

Expand Down Expand Up @@ -164,7 +211,12 @@ mod tests {

#[test]
fn chain_single_test_filter() {
let chain = FilterChain::new(vec![Box::new(TestFilter {})]);
let registry = prometheus::Registry::default();
let chain = FilterChain::new(
vec![Box::new(TestFilter {}), Box::new(TestFilter {})],
&registry,
)
.unwrap();

let endpoints_fixture = endpoints();

Expand Down Expand Up @@ -215,7 +267,12 @@ mod tests {

#[test]
fn chain_double_test_filter() {
let chain = FilterChain::new(vec![Box::new(TestFilter {}), Box::new(TestFilter {})]);
let registry = prometheus::Registry::default();
let chain = FilterChain::new(
vec![Box::new(TestFilter {}), Box::new(TestFilter {})],
&registry,
)
.unwrap();

let endpoints_fixture = endpoints();

Expand Down
16 changes: 12 additions & 4 deletions src/extensions/filter_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ mod tests {

#[tokio::test]
async fn dynamic_filter_manager_update_filter_chain() {
let filter_manager = FilterManager::fixed(Arc::new(FilterChain::new(vec![])));
let registry = prometheus::Registry::default();
let filter_manager =
FilterManager::fixed(Arc::new(FilterChain::new(vec![], &registry).unwrap()));
let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10);
let (_shutdown_tx, shutdown_rx) = watch::channel(());

Expand Down Expand Up @@ -179,11 +181,15 @@ mod tests {
// A simple test filter that drops all packets flowing upstream.
struct Drop;
impl Filter for Drop {
fn label(&self) -> &'static str {
"Drop"
}

fn read(&self, _: ReadContext) -> Option<ReadResponse> {
None
}
}
let filter_chain = Arc::new(FilterChain::new(vec![Box::new(Drop)]));
let filter_chain = Arc::new(FilterChain::new(vec![Box::new(Drop)], &registry).unwrap());
assert!(filter_chain_updates_tx.send(filter_chain).await.is_ok());

let mut num_iterations = 0;
Expand Down Expand Up @@ -218,7 +224,9 @@ mod tests {
async fn dynamic_filter_manager_shutdown_task_on_shutdown_signal() {
// Test that we shut down the background task if we receive a shutdown signal.

let filter_manager = FilterManager::fixed(Arc::new(FilterChain::new(vec![])));
let registry = prometheus::Registry::default();
let filter_manager =
FilterManager::fixed(Arc::new(FilterChain::new(vec![], &registry).unwrap()));
let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10);
let (shutdown_tx, shutdown_rx) = watch::channel(());

Expand All @@ -237,7 +245,7 @@ mod tests {

// Send a filter chain update on the channel. This should fail
// since the listening task should have shut down.
let filter_chain = Arc::new(FilterChain::new(vec![]));
let filter_chain = Arc::new(FilterChain::new(vec![], &registry).unwrap());
assert!(filter_chain_updates_tx.send(filter_chain).await.is_err());
}
}
11 changes: 10 additions & 1 deletion src/extensions/filter_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ impl From<WriteContext<'_>> for WriteResponse {

/// Filter is a trait for routing and manipulating packets.
pub trait Filter: Send + Sync {
/// The ID of the filter to use as a prometheus label.
fn label(&self) -> &'static str;

/// Read is invoked when the proxy receives data from a downstream connection on the
/// listening port.
/// This function should return a [`ReadResponse`] containing the array of
Expand Down Expand Up @@ -341,7 +344,9 @@ pub trait FilterFactory: Sync + Send {
}
}

/// FilterRegistry is the registry of all Filters that can be applied in the system.
/// Registry of all [`Filter`]s that can be applied in the system.
///
/// **Note:** Cloning `FilterRegistry` is a shallow clone.
#[derive(Default)]
pub struct FilterRegistry {
registry: HashMap<String, Box<dyn FilterFactory>>,
Expand Down Expand Up @@ -386,6 +391,10 @@ mod tests {
struct TestFilter {}

impl Filter for TestFilter {
fn label(&self) -> &'static str {
"TestFilter"
}

fn read(&self, _: ReadContext) -> Option<ReadResponse> {
None
}
Expand Down
4 changes: 4 additions & 0 deletions src/extensions/filters/capture_bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ impl CaptureBytes {
}

impl Filter for CaptureBytes {
fn label(&self) -> &'static str {
"CaptureBytes"
}

fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
// if the capture size is bigger than the packet size, then we drop the packet,
// and occasionally warn
Expand Down
Loading

0 comments on commit 57d5476

Please sign in to comment.