Skip to content

Commit

Permalink
Use ClusterMap for Filter::read
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Nov 17, 2023
1 parent 044de43 commit 4bd89a8
Show file tree
Hide file tree
Showing 17 changed files with 182 additions and 69 deletions.
19 changes: 11 additions & 8 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ impl Proxy {
});
}

if config.clusters.read().endpoints().count() == 0 && self.management_server.is_empty() {
if !config.clusters.read().has_endpoints() && self.management_server.is_empty() {
return Err(eyre::eyre!(
"`quilkin proxy` requires at least one `to` address or `management_server` endpoint."
));
"`quilkin proxy` requires at least one `to` address or `management_server` endpoint."
));
}

let id = config.id.load();
Expand Down Expand Up @@ -263,7 +263,7 @@ impl RuntimeConfig {
.read()
.as_ref()
.map_or(true, |health| health.load(Ordering::SeqCst))
&& config.clusters.read().endpoints().count() != 0
&& config.clusters.read().has_endpoints()
}
}

Expand Down Expand Up @@ -393,17 +393,20 @@ impl DownstreamReceiveWorkerConfig {
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
) -> Result<usize, PipelineError> {
let endpoints: Vec<_> = config.clusters.read().endpoints().collect();
if endpoints.is_empty() {
if !config.clusters.read().has_endpoints() {
return Err(PipelineError::NoUpstreamEndpoints);
}

let filters = config.filters.load();
let mut context = ReadContext::new(endpoints, packet.source.into(), packet.contents);
let mut context = ReadContext::new(
config.clusters.clone_value(),
packet.source.into(),
packet.contents,
);
filters.read(&mut context).await?;
let mut bytes_written = 0;

for endpoint in context.endpoints.iter() {
for endpoint in context.destinations.iter() {
let session_key = SessionKey {
source: packet.source,
dest: endpoint.address.to_socket_addr().await?,
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl Config {
pub fn apply_metrics(&self) {
let clusters = self.clusters.read();
crate::net::cluster::active_clusters().set(clusters.len() as i64);
crate::net::cluster::active_endpoints().set(clusters.endpoints().count() as i64);
crate::net::cluster::active_endpoints().set(clusters.num_of_endpoints() as i64);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/config/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ impl<T: Clone> Watch<T> {
pub fn watch(&self) -> watch::Receiver<T> {
self.watchers.subscribe()
}

pub fn clone_value(&self) -> std::sync::Arc<T> {
self.value.clone()
}
}

impl<T: Clone + PartialEq + std::fmt::Debug> Watch<T> {
Expand Down
12 changes: 8 additions & 4 deletions src/filters/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ mod tests {
}),
};
let filter = Capture::from_config(config.into());
let endpoints = vec![Endpoint::new("127.0.0.1:81".parse().unwrap())];
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
assert!(filter
.read(&mut ReadContext::new(
endpoints,
endpoints.into(),
(std::net::Ipv4Addr::LOCALHOST, 80).into(),
"abc".to_string().into_bytes(),
))
Expand Down Expand Up @@ -235,9 +237,11 @@ mod tests {
where
F: Filter + ?Sized,
{
let endpoints = vec![Endpoint::new("127.0.0.1:81".parse().unwrap())];
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut context = ReadContext::new(
endpoints,
endpoints.into(),
"127.0.0.1:80".parse().unwrap(),
"helloabc".to_string().into_bytes(),
);
Expand Down
49 changes: 40 additions & 9 deletions src/filters/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,17 @@ impl Filter for FilterChain {
}
}

// Special case to handle to allow for pass-through, if no filter
// has rejected, and the destinations is empty, we passthrough to all.
// Which mimics the old behaviour while avoid clones in most cases.
if ctx.destinations.is_empty() {
ctx.destinations = ctx
.endpoints
.iter()
.flat_map(|e| e.value().iter().cloned().collect::<Vec<_>>())
.collect();
}

Ok(())
}

Expand Down Expand Up @@ -340,11 +351,15 @@ mod tests {
assert!(result.is_err());
}

fn endpoints() -> Vec<Endpoint> {
vec![
Endpoint::new("127.0.0.1:80".parse().unwrap()),
Endpoint::new("127.0.0.1:90".parse().unwrap()),
]
fn endpoints() -> std::sync::Arc<crate::net::cluster::ClusterMap> {
crate::net::cluster::ClusterMap::new_default(
[
Endpoint::new("127.0.0.1:80".parse().unwrap()),
Endpoint::new("127.0.0.1:90".parse().unwrap()),
]
.into(),
)
.into()
}

#[tokio::test]
Expand All @@ -361,15 +376,23 @@ mod tests {
config.filters.read(&mut context).await.unwrap();
let expected = endpoints_fixture.clone();

assert_eq!(expected, &*context.endpoints);
assert_eq!(
&*expected.endpoints().collect::<Vec<_>>(),
&*context.destinations
);
assert_eq!(b"hello:odr:127.0.0.1:70", &*context.contents);
assert_eq!(
"receive",
context.metadata[&"downstream".into()].as_string().unwrap()
);

let mut context = WriteContext::new(
endpoints_fixture[0].address.clone(),
endpoints_fixture
.endpoints()
.next()
.unwrap()
.address
.clone(),
"127.0.0.1:70".parse().unwrap(),
b"hello".to_vec(),
);
Expand Down Expand Up @@ -405,7 +428,10 @@ mod tests {

chain.read(&mut context).await.unwrap();
let expected = endpoints_fixture.clone();
assert_eq!(expected, context.endpoints.to_vec());
assert_eq!(
expected.endpoints().collect::<Vec<_>>(),
context.destinations
);
assert_eq!(
b"hello:odr:127.0.0.1:70:odr:127.0.0.1:70",
&*context.contents
Expand All @@ -416,7 +442,12 @@ mod tests {
);

let mut context = WriteContext::new(
endpoints_fixture[0].address.clone(),
endpoints_fixture
.endpoints()
.next()
.unwrap()
.address
.clone(),
"127.0.0.1:70".parse().unwrap(),
b"hello".to_vec(),
);
Expand Down
20 changes: 16 additions & 4 deletions src/filters/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,11 @@ mod tests {
let expected = contents_fixture();

// read compress
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut read_context = ReadContext::new(
vec![Endpoint::new("127.0.0.1:80".parse().unwrap())],
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
expected.clone(),
);
Expand Down Expand Up @@ -238,9 +241,12 @@ mod tests {
Metrics::new(),
);

let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
assert!(compression
.read(&mut ReadContext::new(
vec![Endpoint::new("127.0.0.1:80".parse().unwrap())],
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
b"hello".to_vec(),
))
Expand All @@ -259,8 +265,11 @@ mod tests {
Metrics::new(),
);

let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut read_context = ReadContext::new(
vec![Endpoint::new("127.0.0.1:80".parse().unwrap())],
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
b"hello".to_vec(),
);
Expand Down Expand Up @@ -345,8 +354,11 @@ mod tests {
);

// read decompress
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut read_context = ReadContext::new(
vec![Endpoint::new("127.0.0.1:80".parse().unwrap())],
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
write_context.contents.clone(),
);
Expand Down
14 changes: 6 additions & 8 deletions src/filters/firewall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,16 @@ mod tests {
};

let local_ip = [192, 168, 75, 20];
let mut ctx = ReadContext::new(
vec![Endpoint::new((Ipv4Addr::LOCALHOST, 8080).into())],
(local_ip, 80).into(),
vec![],
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new((Ipv4Addr::LOCALHOST, 8080).into())].into(),
);
let mut ctx = ReadContext::new(endpoints.into(), (local_ip, 80).into(), vec![]);
assert!(firewall.read(&mut ctx).await.is_ok());

let mut ctx = ReadContext::new(
vec![Endpoint::new((Ipv4Addr::LOCALHOST, 8080).into())],
(local_ip, 2000).into(),
vec![],
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new((Ipv4Addr::LOCALHOST, 8080).into())].into(),
);
let mut ctx = ReadContext::new(endpoints.into(), (local_ip, 2000).into(), vec![]);
assert!(logs_contain("quilkin::filters::firewall")); // the given name to the the logger by tracing
assert!(logs_contain("Allow"));

Expand Down
14 changes: 8 additions & 6 deletions src/filters/load_balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,18 @@ mod tests {
input_addresses: &[EndpointAddress],
source: EndpointAddress,
) -> Vec<EndpointAddress> {
let mut context = ReadContext::new(
Vec::from_iter(input_addresses.iter().cloned().map(Endpoint::new)),
source,
vec![],
);
let endpoints = input_addresses
.iter()
.cloned()
.map(Endpoint::new)
.collect::<std::collections::BTreeSet<_>>();
let endpoints = crate::net::cluster::ClusterMap::new_default(endpoints);
let mut context = ReadContext::new(endpoints.into(), source, vec![]);

filter.read(&mut context).await.unwrap();

context
.endpoints
.destinations
.iter()
.map(|ep| ep.address.clone())
.collect::<Vec<_>>()
Expand Down
16 changes: 12 additions & 4 deletions src/filters/load_balancer/endpoint_chooser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ impl EndpointChooser for RoundRobinEndpointChooser {
fn choose_endpoints(&self, ctx: &mut ReadContext) {
let count = self.next_endpoint.fetch_add(1, Ordering::Relaxed);
// Note: The index is guaranteed to be in range.
ctx.endpoints = vec![ctx.endpoints[count % ctx.endpoints.len()].clone()];
ctx.destinations = vec![ctx
.endpoints
.nth_endpoint(count % ctx.endpoints.num_of_endpoints())
.unwrap()
.clone()];
}
}

Expand All @@ -58,8 +62,8 @@ pub struct RandomEndpointChooser;
impl EndpointChooser for RandomEndpointChooser {
fn choose_endpoints(&self, ctx: &mut ReadContext) {
// The index is guaranteed to be in range.
let index = thread_rng().gen_range(0..ctx.endpoints.len());
ctx.endpoints = vec![ctx.endpoints[index].clone()];
let index = thread_rng().gen_range(0..ctx.endpoints.num_of_endpoints());
ctx.destinations = vec![ctx.endpoints.nth_endpoint(index).unwrap().clone()];
}
}

Expand All @@ -70,6 +74,10 @@ impl EndpointChooser for HashEndpointChooser {
fn choose_endpoints(&self, ctx: &mut ReadContext) {
let mut hasher = DefaultHasher::new();
ctx.source.hash(&mut hasher);
ctx.endpoints = vec![ctx.endpoints[hasher.finish() as usize % ctx.endpoints.len()].clone()];
ctx.destinations = vec![ctx
.endpoints
.nth_endpoint(hasher.finish() as usize % ctx.endpoints.num_of_endpoints())
.unwrap()
.clone()];
}
}
13 changes: 8 additions & 5 deletions src/filters/local_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,14 @@ mod tests {

/// Send a packet to the filter and assert whether or not it was processed.
async fn read(r: &LocalRateLimit, address: &EndpointAddress, should_succeed: bool) {
let endpoints = vec![crate::net::endpoint::Endpoint::new(
(Ipv4Addr::LOCALHOST, 8089).into(),
)];

let mut context = ReadContext::new(endpoints, address.clone(), vec![9]);
let endpoints = crate::net::cluster::ClusterMap::new_default(
[crate::net::endpoint::Endpoint::new(
(Ipv4Addr::LOCALHOST, 8089).into(),
)]
.into(),
);

let mut context = ReadContext::new(endpoints.into(), address.clone(), vec![9]);
let result = r.read(&mut context).await;

if should_succeed {
Expand Down
12 changes: 7 additions & 5 deletions src/filters/match.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,11 @@ mod tests {
assert_eq!(0, filter.metrics.packets_matched_total.get());

// config so we can test match and fallthrough.
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut ctx = ReadContext::new(
vec![Default::default()],
endpoints.into(),
([127, 0, 0, 1], 7000).into(),
contents.clone(),
);
Expand All @@ -216,11 +219,10 @@ mod tests {
assert_eq!(1, filter.metrics.packets_matched_total.get());
assert_eq!(0, filter.metrics.packets_fallthrough_total.get());

let mut ctx = ReadContext::new(
vec![Default::default()],
([127, 0, 0, 1], 7000).into(),
contents,
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut ctx = ReadContext::new(endpoints.into(), ([127, 0, 0, 1], 7000).into(), contents);
ctx.metadata.insert(key, "xyz".into());

let result = filter.read(&mut ctx).await;
Expand Down
Loading

0 comments on commit 4bd89a8

Please sign in to comment.