Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix watch on clusters, by removing inner Arc #773

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ once_cell = "1.17.1"
tracing-test = "0.2.4"
pretty_assertions = "1.3.0"
tempfile = "3.5.0"
rand = "0.8.5"

[build-dependencies]
tonic-build = { version = "0.9.2", default_features = false, features = ["transport", "prost"] }
Expand Down
6 changes: 3 additions & 3 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ fn handle_request(
}

fn check_proxy_readiness(config: &Config) -> Response<Body> {
if config.clusters.value().endpoints().count() > 0 {
if config.clusters.read().endpoints().count() > 0 {
return Response::new("ok".into());
}

Expand Down Expand Up @@ -144,7 +144,7 @@ mod tests {
#[test]
fn check_proxy_readiness() {
let config = Config::default();
assert_eq!(config.clusters.value().endpoints().count(), 0);
assert_eq!(config.clusters.read().endpoints().count(), 0);

let response = super::check_proxy_readiness(&config);
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
Expand All @@ -154,7 +154,7 @@ mod tests {
)]
.into()]);

config.clusters.value().insert(cluster);
config.clusters.write().insert(cluster);

let response = super::check_proxy_readiness(&config);
assert_eq!(response.status(), StatusCode::OK);
Expand Down
92 changes: 64 additions & 28 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ mod tests {
};

use crate::{
cluster::ClusterMap,
config::{Filter, Providers},
endpoint::{Endpoint, LocalityEndpoints},
filters::{Capture, StaticFilter, TokenRouter},
Expand All @@ -242,9 +241,9 @@ mod tests {
.map(Arc::new)
.unwrap();
let filters_file = tempfile::NamedTempFile::new().unwrap();
let config = Config::default();

std::fs::write(filters_file.path(), {
let config = Config::default();
config.filters.store(
vec![
Filter {
Expand Down Expand Up @@ -272,16 +271,18 @@ mod tests {
.unwrap();

let endpoints_file = tempfile::NamedTempFile::new().unwrap();
let config = Config::default();
std::fs::write(endpoints_file.path(), {
let mut config = Config::default();
config.clusters = crate::config::Watch::new(ClusterMap::new_with_default_cluster(
LocalityEndpoints::from(vec![Endpoint::with_metadata(
config
.clusters
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
crate::endpoint::Metadata {
tokens: vec!["abc".into()].into_iter().collect(),
},
)]),
));
)]));
serde_yaml::to_string(&config).unwrap()
})
.unwrap();
Expand Down Expand Up @@ -335,36 +336,71 @@ mod tests {
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::spawn(proxy.drive());
tokio::time::sleep(Duration::from_millis(500)).await;

let local_addr = crate::test_utils::available_addr().await;
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, local_addr.port()))
.await
.map(Arc::new)
.unwrap();
let msg = b"helloabc";
socket
.send_to(msg, &(std::net::Ipv4Addr::LOCALHOST, 7777))
.await
let config = Config::default();

for _ in 0..5 {
let token = random_three_characters();

tracing::info!(?token, "writing new config");
std::fs::write(endpoints_file.path(), {
config
.clusters
.write()
.default_cluster_mut()
.insert(LocalityEndpoints::from(vec![Endpoint::with_metadata(
(std::net::Ipv4Addr::LOCALHOST, server_port).into(),
crate::endpoint::Metadata {
tokens: vec![token.clone()].into_iter().collect(),
},
)]));
serde_yaml::to_string(&config).unwrap()
})
.unwrap();
tokio::time::sleep(Duration::from_millis(80)).await;
let mut msg = Vec::from(*b"hello");
msg.extend_from_slice(&token);
tracing::info!(?token, "sending packet");
socket
.send_to(&msg, &(std::net::Ipv4Addr::LOCALHOST, 7777))
.await
.unwrap();

let recv = |socket: Arc<UdpSocket>| async move {
let mut buf = [0; u16::MAX as usize];
let length = socket.recv(&mut buf).await.unwrap();
buf[0..length].to_vec()
};
let recv = |socket: Arc<UdpSocket>| async move {
let mut buf = [0; u16::MAX as usize];
let length = socket.recv(&mut buf).await.unwrap();
buf[0..length].to_vec()
};

assert_eq!(
b"hello",
&&*timeout(Duration::from_secs(5), (recv)(server_socket.clone()))
.await
.expect("should have received a packet")
);
assert_eq!(
b"hello",
&&*timeout(Duration::from_secs(5), (recv)(server_socket.clone()))
.await
.expect("should have received a packet")
);

tracing::info!(?token, "received packet");

tracing::info!(?token, "sending bad packet");
// send an invalid packet
let msg = b"hello\xFF\xFF\xFF";
socket.send_to(msg, &local_addr).await.unwrap();

let result = timeout(Duration::from_secs(3), (recv)(server_socket.clone())).await;
assert!(result.is_err(), "should not have received a packet");
tracing::info!(?token, "didn't receive bad packet");
}
}

// send an invalid packet
let msg = b"helloxyz";
socket.send_to(msg, &local_addr).await.unwrap();
fn random_three_characters() -> Vec<u8> {
use rand::prelude::SliceRandom;
let chars: Vec<u8> = (*b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ").into();
let mut rng = rand::thread_rng();

let result = timeout(Duration::from_secs(3), (recv)(server_socket.clone())).await;
assert!(result.is_err(), "should not have received a packet");
(0..3).map(|_| *chars.choose(&mut rng).unwrap()).collect()
}
}
7 changes: 4 additions & 3 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ impl Proxy {
});

if !self.to.is_empty() {
config.clusters.value().default_cluster_mut().localities =
vec![self.to.clone().into()].into();
config.clusters.modify(|clusters| {
clusters.default_cluster_mut().localities = vec![self.to.clone().into()].into();
});
}

if config.clusters.value().endpoints().count() == 0 && self.management_server.is_empty() {
if config.clusters.read().endpoints().count() == 0 && self.management_server.is_empty() {
return Err(eyre::eyre!(
"`quilkin proxy` requires at least one `to` address or `management_server` endpoint."
));
Expand Down
14 changes: 7 additions & 7 deletions src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;

use dashmap::DashMap;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -107,7 +107,7 @@ fn default_cluster_name() -> String {

/// Represents a full snapshot of all clusters.
#[derive(Clone, Default, Debug, Serialize)]
pub struct ClusterMap(Arc<DashMap<String, Cluster>>);
pub struct ClusterMap(DashMap<String, Cluster>);

type DashMapRef<'inner> = dashmap::mapref::one::Ref<'inner, String, Cluster>;
type DashMapRefMut<'inner> = dashmap::mapref::one::RefMut<'inner, String, Cluster>;
Expand Down Expand Up @@ -262,13 +262,13 @@ impl<'de> Deserialize<'de> for ClusterMap {
entry.name = entry.key().clone();
}

Ok(Self(Arc::new(map)))
Ok(Self(map))
}
}

impl From<DashMap<String, Cluster>> for ClusterMap {
fn from(value: DashMap<String, Cluster>) -> Self {
Self(Arc::new(value))
Self(value)
}
}

Expand All @@ -289,11 +289,11 @@ impl FromIterator<Cluster> for ClusterMap {
where
T: IntoIterator<Item = Cluster>,
{
Self(Arc::new(
Self(
iter.into_iter()
.map(|cluster| (cluster.name.clone(), cluster))
.collect(),
))
)
}
}

Expand All @@ -308,7 +308,7 @@ impl FromIterator<(String, Cluster)> for ClusterMap {
where
T: IntoIterator<Item = (String, Cluster)>,
{
Self(Arc::new(iter.into_iter().collect()))
Self(iter.into_iter().collect())
}
}

Expand Down
37 changes: 19 additions & 18 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Config {
($($field:ident),+) => {
$(
if let Some(value) = map.get(stringify!($field)) {
tracing::trace!(%value, "replacing {}", stringify!($field));
tracing::debug!(%value, "replacing {}", stringify!($field));
self.$field.try_replace(serde_json::from_value(value.clone())?);
}
)+
Expand All @@ -89,17 +89,18 @@ impl Config {

replace_if_present!(filters, id);

if let Some(new_clusters) = map
.get("clusters")
.map(|value| serde_json::from_value(value.clone()))
.transpose()?
{
let clusters = self.clusters.value();

if let Some(value) = map.get("clusters") {
tracing::trace!(clusters=%value, "merging new clusters");
clusters.merge(serde_json::from_value(value.clone())?);
}

if let Some(locality) = locality {
clusters.update_unlocated_endpoints(&locality);
}
tracing::debug!(?new_clusters, old_clusters=?self.clusters, "merging new clusters");
self.clusters.modify(|clusters| {
clusters.merge(new_clusters);
if let Some(locality) = locality {
clusters.update_unlocated_endpoints(&locality);
}
});
}

self.apply_metrics();
Expand All @@ -116,7 +117,7 @@ impl Config {
let mut resources = Vec::new();
match resource_type {
ResourceType::Endpoint => {
for entry in self.clusters.value().iter() {
for entry in self.clusters.read().iter() {
resources.push(
resource_type
.encode_to_any(&ClusterLoadAssignment::try_from(entry.value())?)?,
Expand All @@ -132,7 +133,7 @@ impl Config {
ResourceType::Cluster => {
let clusters: Vec<_> = if names.is_empty() {
self.clusters
.value()
.read()
.iter()
.map(|entry| entry.value().clone())
.collect()
Expand All @@ -141,7 +142,7 @@ impl Config {
.iter()
.filter_map(|name| {
self.clusters
.value()
.read()
.get(name)
.map(|entry| entry.value().clone())
})
Expand Down Expand Up @@ -171,7 +172,7 @@ impl Config {
let apply_cluster = |cluster: Cluster| {
tracing::trace!(endpoints = %serde_json::to_value(&cluster).unwrap(), "applying new endpoints");
self.clusters
.value()
.write()
.default_entry(cluster.name.clone())
.merge(&cluster);
};
Expand Down Expand Up @@ -208,7 +209,7 @@ impl Config {
}

pub fn apply_metrics(&self) {
let clusters = self.clusters.value();
let clusters = self.clusters.read();
crate::cluster::active_clusters().set(clusters.localities().count() as i64);
crate::cluster::active_endpoints().set(clusters.endpoints().count() as i64);
}
Expand Down Expand Up @@ -396,7 +397,7 @@ id: server-proxy
}))
.unwrap();

let value = config.clusters.value();
let value = config.clusters.read();
assert_eq!(
&*value,
&ClusterMap::new_with_default_cluster(vec![Endpoint::new(
Expand Down Expand Up @@ -436,7 +437,7 @@ id: server-proxy
}))
.unwrap_or_default();

let value = config.clusters.value();
let value = config.clusters.read();
assert_eq!(
&*value,
&ClusterMap::new_with_default_cluster(vec![
Expand Down
10 changes: 5 additions & 5 deletions src/config/providers/k8s.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ pub fn update_endpoints_from_gameservers(
match &locality {
Some(locality) => config
.clusters
.value()
.write()
.default_cluster_mut()
.insert((endpoint, locality.clone())),
None => config
.clusters
.value()
.write()
.default_cluster_mut()
.insert(endpoint),
};
Expand All @@ -141,14 +141,14 @@ pub fn update_endpoints_from_gameservers(
.collect();
let endpoints = LocalityEndpoints::from((servers, locality.clone()));
tracing::trace!(?endpoints, "Restarting with endpoints");
config.clusters.value().insert_default(endpoints);
config.clusters.write().insert_default(endpoints);
}

Event::Deleted(server) => {
let found = if let Some(endpoint) = server.endpoint() {
config.clusters.value().remove_endpoint(&endpoint)
config.clusters.write().remove_endpoint(&endpoint)
} else {
config.clusters.value().remove_endpoint_if(|endpoint| {
config.clusters.write().remove_endpoint_if(|endpoint| {
endpoint.metadata.unknown.get("name") == server.metadata.name.clone().map(From::from).as_ref()
})
};
Expand Down
Loading