Skip to content

Commit

Permalink
Add filter duration metrics to FilterChain (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Jun 4, 2021
1 parent c2ba704 commit b98ac93
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 94 deletions.
12 changes: 12 additions & 0 deletions docs/extensions/filters/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ There are a few things we note here:

* Exactly one filter chain is specified and used to process all packets that flow through Quilkin.

**Metrics**

* `filter_read_duration_seconds` The duration it took for a `filter`'s
`read` implementation to execute.
* Labels
* `filter` The name of the filter being executed.

* `filter_write_duration_seconds` The duration it took for a `filter`'s
`write` implementation to execute.
* Labels
* `filter` The name of the filter being executed.

### Configuration Examples ###

```rust
Expand Down
2 changes: 1 addition & 1 deletion src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use slog::Logger;

pub(crate) use filter_chain::CreateFilterError;
pub(crate) use filter_chain::Error as FilterChainError;
pub use filter_chain::FilterChain;
pub use filter_registry::{
ConfigType, CreateFilterArgs, Error, Filter, FilterFactory, FilterRegistry, ReadContext,
Expand Down
160 changes: 108 additions & 52 deletions src/extensions/filter_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,89 @@

use std::fmt::{self, Formatter};

use prometheus::Registry;
use prometheus::{Error as PrometheusError, Histogram, HistogramOpts, Registry};

use crate::config::{Filter as FilterConfig, ValidationError};
use crate::extensions::{
CreateFilterArgs, Filter, FilterRegistry, ReadContext, ReadResponse, WriteContext,
WriteResponse,
};
use crate::metrics::CollectorExt;

/// FilterChain implements a chain of Filters amd the implementation
/// of passing the information between Filters for each filter function
const FILTER_LABEL: &str = "filter";

/// A chain of [`Filter`]s to be executed in order.
///
/// Each filter implementation loops around all the filters stored in the FilterChain, passing the results of each filter to the next in the chain.
/// The filter implementation returns the results of data that has gone through each of the filters in the chain.
/// If any of the Filters in the chain return a None, then the chain is broken, and nothing is returned.
#[derive(Default)]
/// Executes each filter, passing the [`ReadContext`] and [`WriteContext`]
/// between each filter's execution, returning the result of data that has gone
/// through all of the filters in the chain. If any of the filters in the chain
/// return `None`, then the chain is broken, and `None` is returned.
pub struct FilterChain {
filters: Vec<Box<dyn Filter>>,
filters: Vec<(String, Box<dyn Filter>)>,
filter_read_duration_seconds: Vec<Histogram>,
filter_write_duration_seconds: Vec<Histogram>,
}

/// Represents an error while creating a `FilterChain`
#[derive(Debug)]
pub struct CreateFilterError {
filter_name: String,
error: ValidationError,
pub enum Error {
Prometheus(PrometheusError),
Filter {
filter_name: String,
error: ValidationError,
},
}

impl fmt::Display for CreateFilterError {
impl fmt::Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"failed to create filter {}: {}",
self.filter_name,
format!("{}", self.error)
)
match self {
Self::Prometheus(error) => f.write_str(&error.to_string()),
Self::Filter { filter_name, error } => {
write!(f, "failed to create filter {}: {}", filter_name, error,)
}
}
}
}

impl From<PrometheusError> for Error {
fn from(error: PrometheusError) -> Self {
Self::Prometheus(error)
}
}

impl FilterChain {
pub fn new(filters: Vec<Box<dyn Filter>>) -> Self {
FilterChain { filters }
pub fn new(
filters: Vec<(String, Box<dyn Filter>)>,
registry: &Registry,
) -> Result<Self, Error> {
Ok(Self {
filter_read_duration_seconds: filters
.iter()
.map(|(name, _)| {
Histogram::with_opts(
HistogramOpts::new(
"filter_read_duration_seconds",
"Seconds taken to execute a given filter's `read`.",
)
.const_label(FILTER_LABEL, name),
)
.and_then(|histogram| histogram.register_if_not_exists(&registry))
})
.collect::<Result<_, prometheus::Error>>()?,
filter_write_duration_seconds: filters
.iter()
.map(|(name, _)| {
Histogram::with_opts(
HistogramOpts::new(
"filter_write_duration_seconds",
"Seconds taken to execute a given filter's `write`.",
)
.const_label(FILTER_LABEL, name),
)
.and_then(|histogram| histogram.register_if_not_exists(&registry))
})
.collect::<Result<_, prometheus::Error>>()?,
filters,
})
}

/// Validates the filter configurations in the provided config and constructs
Expand All @@ -64,52 +107,57 @@ impl FilterChain {
filter_configs: Vec<FilterConfig>,
filter_registry: &FilterRegistry,
metrics_registry: &Registry,
) -> std::result::Result<FilterChain, CreateFilterError> {
let mut filters = Vec::<Box<dyn Filter>>::new();
) -> Result<Self, Error> {
let mut filters = Vec::new();

for filter_config in filter_configs {
match filter_registry.get(
&filter_config.name,
CreateFilterArgs::fixed(metrics_registry.clone(), filter_config.config.as_ref())
.with_metrics_registry(metrics_registry.clone()),
) {
Ok(filter) => filters.push(filter),
Ok(filter) => filters.push((filter_config.name, filter)),
Err(err) => {
return Err(CreateFilterError {
return Err(Error::Filter {
filter_name: filter_config.name.clone(),
error: err.into(),
});
}
}
}
Ok(FilterChain::new(filters))

FilterChain::new(filters, &metrics_registry)
}
}

impl Filter for FilterChain {
fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
let from = ctx.from;
for f in &self.filters {
match f.read(ctx) {
None => return None,
Some(response) => ctx = ReadContext::with_response(from, response),
}
}
Some(ctx.into())
fn read(&self, ctx: ReadContext) -> Option<ReadResponse> {
self.filters
.iter()
.zip(self.filter_read_duration_seconds.iter())
.try_fold(ctx, |ctx, ((_, filter), histogram)| {
Some(ReadContext::with_response(
ctx.from,
histogram.observe_closure_duration(|| filter.read(ctx))?,
))
})
.map(ReadResponse::from)
}

fn write(&self, mut ctx: WriteContext) -> Option<WriteResponse> {
let endpoint = ctx.endpoint;
let from = ctx.from;
let to = ctx.to;
for f in self.filters.iter().rev() {
match f.write(ctx) {
None => return None,
Some(response) => {
ctx = WriteContext::with_response(endpoint, from, to, response);
}
}
}
Some(ctx.into())
fn write(&self, ctx: WriteContext) -> Option<WriteResponse> {
self.filters
.iter()
.rev()
.zip(self.filter_write_duration_seconds.iter().rev())
.try_fold(ctx, |ctx, ((_, filter), histogram)| {
Some(WriteContext::with_response(
ctx.endpoint,
ctx.from,
ctx.to,
histogram.observe_closure_duration(|| filter.write(ctx))?,
))
})
.map(WriteResponse::from)
}
}

Expand All @@ -121,7 +169,7 @@ mod tests {
use crate::config::{Endpoints, UpstreamEndpoints};
use crate::extensions::filters::DebugFactory;
use crate::extensions::{default_registry, FilterFactory};
use crate::test_utils::{logger, TestFilter};
use crate::test_utils::{logger, new_test_chain, TestFilter};

use super::*;
use crate::cluster::Endpoint;
Expand Down Expand Up @@ -164,8 +212,8 @@ mod tests {

#[test]
fn chain_single_test_filter() {
let chain = FilterChain::new(vec![Box::new(TestFilter {})]);

let registry = prometheus::Registry::default();
let chain = new_test_chain(&registry);
let endpoints_fixture = endpoints();

let response = chain
Expand Down Expand Up @@ -215,7 +263,15 @@ mod tests {

#[test]
fn chain_double_test_filter() {
let chain = FilterChain::new(vec![Box::new(TestFilter {}), Box::new(TestFilter {})]);
let registry = prometheus::Registry::default();
let chain = FilterChain::new(
vec![
("TestFilter".into(), Box::new(TestFilter {})),
("TestFilter".into(), Box::new(TestFilter {})),
],
&registry,
)
.unwrap();

let endpoints_fixture = endpoints();

Expand Down
13 changes: 9 additions & 4 deletions src/extensions/filter_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ mod tests {

#[tokio::test]
async fn dynamic_filter_manager_update_filter_chain() {
let filter_manager = FilterManager::fixed(Arc::new(FilterChain::new(vec![])));
let registry = prometheus::Registry::default();
let filter_manager =
FilterManager::fixed(Arc::new(FilterChain::new(vec![], &registry).unwrap()));
let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10);
let (_shutdown_tx, shutdown_rx) = watch::channel(());

Expand Down Expand Up @@ -183,7 +185,8 @@ mod tests {
None
}
}
let filter_chain = Arc::new(FilterChain::new(vec![Box::new(Drop)]));
let filter_chain =
Arc::new(FilterChain::new(vec![("Drop".into(), Box::new(Drop))], &registry).unwrap());
assert!(filter_chain_updates_tx.send(filter_chain).await.is_ok());

let mut num_iterations = 0;
Expand Down Expand Up @@ -218,7 +221,9 @@ mod tests {
async fn dynamic_filter_manager_shutdown_task_on_shutdown_signal() {
// Test that we shut down the background task if we receive a shutdown signal.

let filter_manager = FilterManager::fixed(Arc::new(FilterChain::new(vec![])));
let registry = prometheus::Registry::default();
let filter_manager =
FilterManager::fixed(Arc::new(FilterChain::new(vec![], &registry).unwrap()));
let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10);
let (shutdown_tx, shutdown_rx) = watch::channel(());

Expand All @@ -237,7 +242,7 @@ mod tests {

// Send a filter chain update on the channel. This should fail
// since the listening task should have shut down.
let filter_chain = Arc::new(FilterChain::new(vec![]));
let filter_chain = Arc::new(FilterChain::new(vec![], &registry).unwrap());
assert!(filter_chain_updates_tx.send(filter_chain).await.is_err());
}
}
7 changes: 5 additions & 2 deletions src/extensions/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ impl fmt::Display for ConvertProtoConfigError {
}

impl ConvertProtoConfigError {
pub fn new(reason: String, field: Option<String>) -> Self {
Self { reason, field }
pub fn new(reason: impl Into<String>, field: Option<String>) -> Self {
Self {
reason: reason.into(),
field,
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/proxy/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::config::{
parse_endpoint_metadata_from_yaml, Config, Endpoints, ManagementServer, Proxy, Source,
ValidationError, ValueInvalidArgs,
};
use crate::extensions::{default_registry, CreateFilterError, FilterChain, FilterRegistry};
use crate::extensions::{default_registry, FilterChain, FilterChainError, FilterRegistry};
use crate::proxy::server::metrics::Metrics as ProxyMetrics;
use crate::proxy::sessions::metrics::Metrics as SessionMetrics;
use crate::proxy::{Admin as ProxyAdmin, Health, Metrics, Server};
Expand All @@ -57,7 +57,7 @@ pub(super) struct ValidatedConfig {
#[derive(Debug)]
pub enum Error {
InvalidConfig(ValidationError),
CreateFilterChain(CreateFilterError),
CreateFilterChain(FilterChainError),
}

impl std::error::Error for Error {}
Expand All @@ -68,8 +68,8 @@ impl From<ValidationError> for Error {
}
}

impl From<CreateFilterError> for Error {
fn from(err: CreateFilterError) -> Self {
impl From<FilterChainError> for Error {
fn from(err: FilterChainError) -> Self {
Error::CreateFilterChain(err)
}
}
Expand Down
Loading

0 comments on commit b98ac93

Please sign in to comment.