Skip to content

Commit

Permalink
Add FilterFactory::measure
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed May 19, 2021
1 parent 34fd6bc commit 2be1362
Show file tree
Hide file tree
Showing 16 changed files with 145 additions and 44 deletions.
25 changes: 22 additions & 3 deletions docs/extensions/filters/writing_custom_filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ To extend Quilkin's code with our own custom filter, we need to do the following

struct Greet;
impl Filter for Greet {
fn label(&self) -> &'static str {
"Greet"
}

fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
ctx.contents.splice(0..0, String::from("Hello ").into_bytes());
Some(ctx.into())
Expand All @@ -85,7 +89,11 @@ To extend Quilkin's code with our own custom filter, we need to do the following
```rust
// src/main.rs
# struct Greet;
# impl Filter for Greet {}
# impl Filter for Greet {
# fn label(&self) -> &'static str {
# "Greet"
# }
# }
# use quilkin::extensions::Filter;
use quilkin::extensions::{CreateFilterArgs, Error, FilterFactory};

Expand Down Expand Up @@ -204,6 +212,9 @@ The [Serde] crate is used to describe static YAML configuration in code while [P

struct Greet(String);
impl Filter for Greet {
fn label(&self) -> &'static str {
"Greet"
}
fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
ctx.contents
.splice(0..0, format!("{} ",self.0).into_bytes());
Expand All @@ -230,7 +241,11 @@ The [Serde] crate is used to describe static YAML configuration in code while [P
# use quilkin::extensions::{CreateFilterArgs, Error, FilterFactory};
# use quilkin::extensions::{Filter, ReadContext, ReadResponse, WriteContext, WriteResponse};
# struct Greet(String);
# impl Filter for Greet { }
# impl Filter for Greet {
# fn label(&self) -> &'static str {
# "Greet"
# }
# }

use quilkin::extensions::ConfigType;

Expand Down Expand Up @@ -361,7 +376,11 @@ However, it usually contains a Protobuf equivalent of the filter's static config
# }
# }
# struct Greet(String);
# impl Filter for Greet { }
# impl Filter for Greet {
# fn label(&self) -> &'static str {
# "Greet"
# }
# }
use bytes::Bytes;

struct GreetFilterFactory;
Expand Down
4 changes: 4 additions & 0 deletions examples/quilkin-filter-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ mod greet {

struct Greet(String);
impl Filter for Greet {
fn label(&self) -> &'static str {
"Greet"
}

fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
ctx.contents
.splice(0..0, format!("{} ", self.0).into_bytes());
Expand Down
4 changes: 4 additions & 0 deletions src/extensions/filter_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ impl FilterChain {
}

impl Filter for FilterChain {
fn label(&self) -> &'static str {
"FilterChain"
}

fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
let from = ctx.from;
for f in &self.filters {
Expand Down
4 changes: 4 additions & 0 deletions src/extensions/filter_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ mod tests {
// A simple test filter that drops all packets flowing upstream.
struct Drop;
impl Filter for Drop {
fn label(&self) -> &'static str {
"Drop"
}

fn read(&self, _: ReadContext) -> Option<ReadResponse> {
None
}
Expand Down
18 changes: 18 additions & 0 deletions src/extensions/filter_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ impl From<WriteContext<'_>> for WriteResponse {

/// Filter is a trait for routing and manipulating packets.
pub trait Filter: Send + Sync {
/// The ID of the filter to use as a prometheus label.
fn label(&self) -> &'static str;

/// Read is invoked when the proxy receives data from a downstream connection on the
/// listening port.
/// This function should return a [`ReadResponse`] containing the array of
Expand Down Expand Up @@ -341,6 +344,17 @@ pub trait FilterFactory: Sync + Send {
) -> Result<ConfigType<'b>, Error> {
config.ok_or_else(|| Error::MissingConfig(self.name()))
}

/// Wraps a given `Filter` in a [`Measure`][super::filters::Measure] filter, providing metrics over the
/// filter's execution time of it's [`Filter::read`] and
/// [`Filter::write`] methods.
fn measure(
&self,
metrics: &prometheus::Registry,
filter: Box<dyn Filter>,
) -> Result<Box<dyn Filter>, Error> {
Ok(Box::new(super::filters::Measure::new(filter, metrics)?))
}
}

/// Registry of all [`Filter`]s that can be applied in the system.
Expand Down Expand Up @@ -400,6 +414,10 @@ mod tests {
struct TestFilter {}

impl Filter for TestFilter {
fn label(&self) -> &'static str {
"TestFilter"
}

fn read(&self, _: ReadContext) -> Option<ReadResponse> {
None
}
Expand Down
19 changes: 13 additions & 6 deletions src/extensions/filters/capture_bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,15 @@ impl FilterFactory for CaptureBytesFactory {
}

fn create_filter(&self, args: CreateFilterArgs) -> Result<Box<dyn Filter>, Error> {
Ok(Box::new(CaptureBytes::new(
&self.log,
self.require_config(args.config)?
.deserialize::<Config, ProtoConfig>(self.name().as_str())?,
Metrics::new(&args.metrics_registry)?,
)))
self.measure(
&args.metrics_registry,
Box::new(CaptureBytes::new(
&self.log,
self.require_config(args.config)?
.deserialize::<Config, ProtoConfig>(self.name().as_str())?,
Metrics::new(&args.metrics_registry)?,
)),
)
}
}

Expand Down Expand Up @@ -159,6 +162,10 @@ impl CaptureBytes {
}

impl Filter for CaptureBytes {
fn label(&self) -> &'static str {
"CaptureBytes"
}

fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
// if the capture size is bigger than the packet size, then we drop the packet,
// and occasionally warn
Expand Down
6 changes: 5 additions & 1 deletion src/extensions/filters/compress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl FilterFactory for CompressFactory {
&self,
args: CreateFilterArgs,
) -> std::result::Result<Box<dyn Filter>, RegistryError> {
Ok(Box::new(Compress::new(
self.measure(&args.metrics_registry, Box::new(Compress::new(
&self.log,
self.require_config(args.config)?
.deserialize::<Config, ProtoConfig>(self.name().as_str())?,
Expand Down Expand Up @@ -208,6 +208,10 @@ impl Compress {
}

impl Filter for Compress {
fn label(&self) -> &'static str {
"Compress"
}

fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
let original_size = ctx.contents.len();

Expand Down
16 changes: 11 additions & 5 deletions src/extensions/filters/concatenate_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,19 @@ impl Default for ConcatBytesFactory {
Self {}
}
}

impl FilterFactory for ConcatBytesFactory {
fn name(&self) -> String {
"quilkin.extensions.filters.concatenate_bytes.v1alpha1.ConcatenateBytes".into()
}

fn create_filter(&self, args: CreateFilterArgs) -> Result<Box<dyn Filter>, Error> {
Ok(Box::new(ConcatenateBytes::new(
self.require_config(args.config)?
.deserialize::<Config, ProtoConfig>(self.name().as_str())?,
)))
self.measure(
&args.metrics_registry,
Box::new(ConcatenateBytes::new(
self.require_config(args.config)?
.deserialize::<Config, ProtoConfig>(self.name().as_str())?,
)),
)
}
}

Expand All @@ -155,6 +157,10 @@ impl ConcatenateBytes {
}

impl Filter for ConcatenateBytes {
fn label(&self) -> &'static str {
"ConcatenateBytes"
}

fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
match self.on_read {
Strategy::Append => {
Expand Down
4 changes: 4 additions & 0 deletions src/extensions/filters/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ impl FilterFactory for DebugFactory {
}

impl Filter for Debug {
fn label(&self) -> &'static str {
"Debug"
}

fn read(&self, ctx: ReadContext) -> Option<ReadResponse> {
info!(self.log, "Read filter event"; "from" => ctx.from, "contents" => packet_to_string(ctx.contents.clone()));
Some(ctx.into())
Expand Down
8 changes: 7 additions & 1 deletion src/extensions/filters/load_balancer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,17 @@ impl FilterFactory for LoadBalancerFilterFactory {
Policy::Random => Box::new(RandomEndpointChooser),
};

Ok(Box::new(LoadBalancerFilter { endpoint_chooser }))
self.measure(
&args.metrics_registry,
Box::new(LoadBalancerFilter { endpoint_chooser }),
)
}
}

impl Filter for LoadBalancerFilter {
fn label(&self) -> &'static str {
"LoadBalancer"
}
fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
self.endpoint_chooser.choose_endpoints(&mut ctx.endpoints);
Some(ctx.into())
Expand Down
19 changes: 13 additions & 6 deletions src/extensions/filters/local_rate_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ struct RateLimitFilter {
/// shutdown_tx signals the spawned token refill future to exit.
shutdown_tx: Option<Sender<()>>,
}

const ID: &str = "quilkin.extensions.filters.local_rate_limit.v1alpha1.LocalRateLimit";
impl FilterFactory for RateLimitFilterFactory {
fn name(&self) -> String {
"quilkin.extensions.filters.local_rate_limit.v1alpha1.LocalRateLimit".into()
ID.into()
}

fn create_filter(&self, args: CreateFilterArgs) -> Result<Box<dyn Filter>, Error> {
Expand All @@ -106,10 +106,13 @@ impl FilterFactory for RateLimitFilterFactory {
reason: "value must be at least 100ms".into(),
})
} else {
Ok(Box::new(RateLimitFilter::new(
config,
Metrics::new(&args.metrics_registry)?,
)))
self.measure(
&args.metrics_registry,
Box::new(RateLimitFilter::new(
config,
Metrics::new(&args.metrics_registry)?,
)),
)
}
}
}
Expand Down Expand Up @@ -194,6 +197,10 @@ impl Drop for RateLimitFilter {
}

impl Filter for RateLimitFilter {
fn label(&self) -> &'static str {
"RateLimit"
}

fn read(&self, ctx: ReadContext) -> Option<ReadResponse> {
self.acquire_token().map(|()| ctx.into()).or_else(|| {
self.metrics.packets_dropped_total.inc();
Expand Down
29 changes: 12 additions & 17 deletions src/extensions/filters/measure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,34 @@ pub struct Measure {
}

impl Measure {
pub fn new(
name: impl std::fmt::Display,
filter: Box<dyn Filter>,
registry: &Registry,
) -> Result<Self, Error> {
pub fn new(filter: Box<dyn Filter>, registry: &Registry) -> Result<Self, Error> {
Ok(Self {
filter,
read_time_elapsed_seconds: Histogram::with_opts(
HistogramOpts::new(
"read_time_elapsed_seconds",
"Seconds taken to execute a given filter's `read`.",
)
.variable_label(name.to_string()),
.const_label("filter", filter.label()),
)?
.register_if_not_exists(registry)?,
write_time_elapsed_seconds: Histogram::with_opts(
HistogramOpts::new(
"write_time_elapsed_seconds",
"Seconds taken to execute a given filter's `write`.",
)
.variable_label(name.to_string()),
.const_label("filter", filter.label()),
)?
.register_if_not_exists(registry)?,
filter,
})
}
}

impl Filter for Measure {
fn label(&self) -> &'static str {
"Measure"
}

fn read(&self, ctx: ReadContext) -> Option<ReadResponse> {
self.read_time_elapsed_seconds
.observe_closure_duration(|| self.filter.read(ctx))
Expand Down Expand Up @@ -89,7 +89,6 @@ impl FilterFactory for MeasureFactory {
.require_config(args.config)?
.deserialize::<Config, DynamicConfig>(self.name().as_str())?;
Ok(Box::new(Measure::new(
&config.name,
self.0.get(
&config.name,
CreateFilterArgs::fixed(args.metrics_registry.clone(), Some(&config.config)),
Expand Down Expand Up @@ -161,10 +160,6 @@ mod tests {
assert_filter_read_no_change, assert_write_no_change, assert_write_with_filter,
};

fn filter_name() -> String {
filter::ConcatBytesFactory.name()
}

#[test]
fn write_append() {
let config = filter::Config {
Expand All @@ -173,7 +168,7 @@ mod tests {
bytes: b"hello".to_vec(),
};
let filter = filter::ConcatenateBytes::new(config);
let measure = Measure::new(filter_name(), Box::new(filter), &Registry::default()).unwrap();
let measure = Measure::new(Box::new(filter), &Registry::default()).unwrap();
assert_write_with_filter(&measure, "abchello");
}

Expand All @@ -185,7 +180,7 @@ mod tests {
bytes: b"hello".to_vec(),
};
let filter = filter::ConcatenateBytes::new(config);
let measure = Measure::new(filter_name(), Box::new(filter), &Registry::default()).unwrap();
let measure = Measure::new(Box::new(filter), &Registry::default()).unwrap();
assert_write_with_filter(&measure, "helloabc");
}

Expand All @@ -197,7 +192,7 @@ mod tests {
bytes: vec![],
};
let filter = filter::ConcatenateBytes::new(config);
let measure = Measure::new(filter_name(), Box::new(filter), &Registry::default()).unwrap();
let measure = Measure::new(Box::new(filter), &Registry::default()).unwrap();
assert_filter_read_no_change(&measure);
}

Expand All @@ -209,7 +204,7 @@ mod tests {
bytes: vec![],
};
let filter = filter::ConcatenateBytes::new(config);
let measure = Measure::new(filter_name(), Box::new(filter), &Registry::default()).unwrap();
let measure = Measure::new(Box::new(filter), &Registry::default()).unwrap();
assert_write_no_change(&measure);
}
}
Loading

0 comments on commit 2be1362

Please sign in to comment.