Skip to content

Commit

Permalink
Config maps are now ignored for agents (#957)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle authored May 16, 2024
1 parent cbd7f3d commit 69fb369
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 51 deletions.
3 changes: 2 additions & 1 deletion src/components/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ impl Agent {
}

let _mds_task = if !self.relay_servers.is_empty() {
let _provider_task = match self.provider.as_ref() {
let _provider_task = match self.provider {
Some(provider) => Some(provider.spawn(
config.clone(),
ready.provider_is_healthy.clone(),
self.locality,
self.address_selector,
true,
)),
None => return Err(eyre::eyre!("no configuration provider given")),
};
Expand Down
1 change: 1 addition & 0 deletions src/components/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl Manage {
ready.provider_is_healthy.clone(),
self.locality,
None,
false,
);

let _relay_stream = if !self.relay_servers.is_empty() {
Expand Down
3 changes: 2 additions & 1 deletion src/components/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl Relay {
Providers::Agones {
config_namespace, ..
} => {
let config_namespace = config_namespace.unwrap_or_else(|| "default".into());
let fut = Providers::task(provider_is_healthy.clone(), move || {
let config = config.clone();
let config_namespace = config_namespace.clone();
Expand All @@ -69,7 +70,7 @@ impl Relay {
let configmap_reflector =
crate::config::providers::k8s::update_filters_from_configmap(
client.clone(),
config_namespace,
&config_namespace,
config.clone(),
);

Expand Down
55 changes: 31 additions & 24 deletions src/config/providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@ pub enum Providers {
/// and for a `ConfigMap` that specifies the filter configuration.
Agones {
/// The namespace under which the configmap is stored.
#[clap(
short,
long,
env = "QUILKIN_AGONES_CONFIG_NAMESPACE",
default_value = "default"
)]
config_namespace: String,
#[clap(short, long, env = "QUILKIN_AGONES_CONFIG_NAMESPACE")]
config_namespace: Option<String>,
/// The namespace under which the game servers run.
#[clap(
short,
Expand All @@ -43,31 +38,43 @@ pub enum Providers {
impl Providers {
#[tracing::instrument(level = "trace", skip_all)]
pub fn spawn(
&self,
self,
config: std::sync::Arc<crate::Config>,
health_check: Arc<AtomicBool>,
locality: Option<crate::net::endpoint::Locality>,
address_selector: Option<crate::config::AddressSelector>,
is_agent: bool,
) -> tokio::task::JoinHandle<crate::Result<()>> {
match &self {
match self {
Self::Agones {
gameservers_namespace,
config_namespace,
} => tokio::spawn(Self::task(health_check.clone(), {
let gameservers_namespace = gameservers_namespace.clone();
let config_namespace = config_namespace.clone();
let health_check = health_check.clone();
move || {
crate::config::watch::agones(
gameservers_namespace.clone(),
config_namespace.clone(),
health_check.clone(),
locality.clone(),
config.clone(),
address_selector.clone(),
)
}
})),
} => tokio::spawn(async move {
let config_namespace = match (config_namespace, is_agent) {
(Some(cns), false) => Some(cns),
(None, true) => None,
(None, false) => Some("default".into()),
(Some(cns), true) => {
tracing::warn!("'{cns}' via --config-namespace, -c, or QUILKIN_AGONES_CONFIG_NAMESPACE is ignored for agents and should not be set");
None
}
};

Self::task(health_check.clone(), {
let health_check = health_check.clone();
move || {
crate::config::watch::agones(
gameservers_namespace.clone(),
config_namespace.clone(),
health_check.clone(),
locality.clone(),
config.clone(),
address_selector.clone(),
)
}
})
.await
}),
Self::File { path } => tokio::spawn(Self::task(health_check.clone(), {
let path = path.clone();
let health_check = health_check.clone();
Expand Down
70 changes: 45 additions & 25 deletions src/config/watch/agones.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::sync::{
use crate::{net::endpoint::Locality, Config};

pub async fn watch(
gameservers_namespace: impl AsRef<str>,
config_namespace: impl AsRef<str>,
gameservers_namespace: String,
config_namespace: Option<String>,
health_check: Arc<AtomicBool>,
locality: Option<Locality>,
config: Arc<Config>,
Expand All @@ -35,31 +35,51 @@ pub async fn watch(
kube::Client::try_default(),
)
.await??;
let configmap_reflector = crate::config::providers::k8s::update_filters_from_configmap(
client.clone(),
config_namespace,
config.clone(),
);
let gameserver_reflector = crate::config::providers::k8s::update_endpoints_from_gameservers(
client,
gameservers_namespace,
config.clone(),
locality,
address_selector,
);
tokio::pin!(configmap_reflector);
tokio::pin!(gameserver_reflector);

loop {
let result = tokio::select! {
result = configmap_reflector.try_next() => result,
result = gameserver_reflector.try_next() => result,
};
if let Some(cns) = config_namespace {
let configmap_reflector = crate::config::providers::k8s::update_filters_from_configmap(
client.clone(),
cns,
config.clone(),
);
let gameserver_reflector = crate::config::providers::k8s::update_endpoints_from_gameservers(
client,
gameservers_namespace,
config.clone(),
locality,
address_selector,
);
tokio::pin!(configmap_reflector);
tokio::pin!(gameserver_reflector);

match result {
Ok(Some(_)) => health_check.store(true, Ordering::SeqCst),
Ok(None) => break Err(eyre::eyre!("kubernetes watch stream terminated")),
Err(error) => break Err(error),
loop {
let result = tokio::select! {
result = configmap_reflector.try_next() => result,
result = gameserver_reflector.try_next() => result,
};

match result {
Ok(Some(_)) => health_check.store(true, Ordering::SeqCst),
Ok(None) => break Err(eyre::eyre!("kubernetes watch stream terminated")),
Err(error) => break Err(error),
}
}
} else {
let gameserver_reflector = crate::config::providers::k8s::update_endpoints_from_gameservers(
client,
gameservers_namespace,
config.clone(),
locality,
address_selector,
);
tokio::pin!(gameserver_reflector);

loop {
match gameserver_reflector.try_next().await {
Ok(Some(_)) => health_check.store(true, Ordering::SeqCst),
Ok(None) => break Err(eyre::eyre!("kubernetes watch stream terminated")),
Err(error) => break Err(error),
}
}
}
}

0 comments on commit 69fb369

Please sign in to comment.