From 2134a2e37e899fd883da5a8551544f0bc0dcc541 Mon Sep 17 00:00:00 2001 From: Erin Power Date: Tue, 14 Feb 2023 18:17:46 +0100 Subject: [PATCH] Add filter chain discovery to the relay --- agones/src/lib.rs | 2 +- agones/src/sidecar.rs | 2 +- agones/src/xds.rs | 2 +- src/cli/relay.rs | 49 +++++- src/config.rs | 4 +- src/config/{provider.rs => providers.rs} | 4 +- src/config/providers/k8s.rs | 147 ++++++++++++++++ .../agones/crd.rs => providers/k8s/agones.rs} | 0 src/config/watch/agones.rs | 159 ++---------------- 9 files changed, 218 insertions(+), 151 deletions(-) rename src/config/{provider.rs => providers.rs} (96%) create mode 100644 src/config/providers/k8s.rs rename src/config/{watch/agones/crd.rs => providers/k8s/agones.rs} (100%) diff --git a/agones/src/lib.rs b/agones/src/lib.rs index 9bcdab615f..1b112e3747 100644 --- a/agones/src/lib.rs +++ b/agones/src/lib.rs @@ -41,7 +41,7 @@ use kube::{ }; use tokio::sync::OnceCell; -use quilkin::config::watch::agones::crd::{ +use quilkin::config::providers::k8s::agones::{ Fleet, FleetSpec, GameServer, GameServerPort, GameServerSpec, GameServerState, GameServerTemplateSpec, }; diff --git a/agones/src/sidecar.rs b/agones/src/sidecar.rs index eb5f1cab27..7ecb60dc70 100644 --- a/agones/src/sidecar.rs +++ b/agones/src/sidecar.rs @@ -19,7 +19,7 @@ mod tests { use crate::{game_server, is_gameserver_ready, quilkin_config_map, quilkin_container, Client}; use k8s_openapi::api::core::v1::{ConfigMap, ConfigMapVolumeSource, Volume}; use kube::{api::PostParams, runtime::wait::await_condition, Api, ResourceExt}; - use quilkin::{config::watch::agones::crd::GameServer, test_utils::TestHelper}; + use quilkin::{config::providers::k8s::agones::GameServer, test_utils::TestHelper}; use std::time::Duration; use tokio::time::timeout; diff --git a/agones/src/xds.rs b/agones/src/xds.rs index 7dd962bae5..f71ed29697 100644 --- a/agones/src/xds.rs +++ b/agones/src/xds.rs @@ -41,7 +41,7 @@ mod tests { use tokio::time::timeout; use quilkin::{ - config::watch::agones::crd::{Fleet, GameServer}, + config::providers::k8s::agones::{Fleet, GameServer}, test_utils::TestHelper, }; diff --git a/src/cli/relay.rs b/src/cli/relay.rs index 555f681600..930e284a55 100644 --- a/src/cli/relay.rs +++ b/src/cli/relay.rs @@ -16,7 +16,9 @@ use std::sync::Arc; -use crate::config::Config; +use futures::StreamExt; + +use crate::config::{Config, Providers}; pub const PORT: u16 = 7900; @@ -32,6 +34,8 @@ pub struct Relay { /// Port for xDS management_server service #[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = super::manage::PORT)] xds_port: u16, + #[clap(subcommand)] + providers: Option, } impl Relay { @@ -39,9 +43,50 @@ impl Relay { let xds_server = crate::xds::server::spawn(self.xds_port, config.clone()); let mds_server = tokio::spawn(crate::xds::server::control_plane_discovery_server( self.mds_port, - config, + config.clone(), )); + let _provider_task = if let Some(Providers::Agones { + config_namespace, .. + }) = &self.providers + { + let config = config.clone(); + let config_namespace = config_namespace.clone(); + Some(tokio::spawn(Providers::task(move || { + let config = config.clone(); + let config_namespace = config_namespace.clone(); + async move { + let client = tokio::time::timeout( + std::time::Duration::from_secs(5), + kube::Client::try_default(), + ) + .await??; + + let configmap_reflector = + crate::config::providers::k8s::update_filters_from_configmap( + client.clone(), + config_namespace, + config.clone(), + ); + + tokio::pin!(configmap_reflector); + + loop { + match configmap_reflector.next().await { + Some(Ok(_)) => (), + Some(Err(error)) => return Err(error), + None => break, + } + } + + tracing::info!("configmap stream ending"); + Ok(()) + } + }))) + } else { + None + }; + tokio::select! { result = xds_server => { result diff --git a/src/config.rs b/src/config.rs index 6d991d9f7f..460f16045d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,7 +25,7 @@ use uuid::Uuid; mod config_type; mod error; -pub mod provider; +pub mod providers; mod slot; pub mod watch; @@ -40,7 +40,7 @@ use crate::{ }; pub use self::{ - config_type::ConfigType, error::ValidationError, provider::Providers, slot::Slot, watch::Watch, + config_type::ConfigType, error::ValidationError, providers::Providers, slot::Slot, watch::Watch, }; base64_serde_type!(pub Base64Standard, base64::STANDARD); diff --git a/src/config/provider.rs b/src/config/providers.rs similarity index 96% rename from src/config/provider.rs rename to src/config/providers.rs index 730fc841a1..537d22e280 100644 --- a/src/config/provider.rs +++ b/src/config/providers.rs @@ -1,3 +1,5 @@ +pub mod k8s; + const RETRIES: u32 = 25; const BACKOFF_STEP: std::time::Duration = std::time::Duration::from_millis(250); @@ -53,7 +55,7 @@ impl Providers { } #[tracing::instrument(level = "trace", skip_all)] - async fn task(task: impl FnMut() -> F) -> crate::Result<()> + pub async fn task(task: impl FnMut() -> F) -> crate::Result<()> where F: std::future::Future>, { diff --git a/src/config/providers/k8s.rs b/src/config/providers/k8s.rs new file mode 100644 index 0000000000..eda52fb3d7 --- /dev/null +++ b/src/config/providers/k8s.rs @@ -0,0 +1,147 @@ +pub mod agones; + +use std::sync::Arc; + +use futures::Stream; +use k8s_openapi::api::core::v1::ConfigMap; +use kube::runtime::watcher::Event; + +use agones::GameServer; + +use crate::endpoint::{Endpoint, Locality, LocalityEndpoints}; + +pub fn update_filters_from_configmap( + client: kube::Client, + namespace: impl AsRef, + config: Arc, +) -> impl Stream> { + async_stream::stream! { + for await event in configmap_events(client, namespace) { + tracing::trace!("new configmap event"); + + let event = match event { + Ok(event) => event, + Err(error) => { + yield Err(error.into()); + continue; + } + }; + + let configmap = match event { + Event::Applied(configmap) => configmap, + Event::Restarted(configmaps) => match configmaps.get(0) { + Some(configmap) => configmap.clone(), + None => { + yield Ok(()); + continue; + }, + }, + Event::Deleted(_) => { + config.filters.remove(); + yield Ok(()); + continue; + } + }; + + let data = configmap.data.ok_or_else(|| eyre::eyre!("configmap data missing"))?; + let data = data.get("quilkin.yaml").ok_or_else(|| eyre::eyre!("quilkin.yaml property not found"))?; + let data: serde_json::Map = serde_yaml::from_str(data)?; + + if let Some(filters) = data + .get("filters") + .cloned() + .map(serde_json::from_value) + .transpose()? + { + config.filters.store(Arc::new(filters)); + } + + yield Ok(()); + } + } +} + +fn configmap_events( + client: kube::Client, + namespace: impl AsRef, +) -> impl Stream, kube::runtime::watcher::Error>> { + let config_namespace = namespace.as_ref(); + let configmap: kube::Api = kube::Api::namespaced(client, config_namespace); + let config_writer = kube::runtime::reflector::store::Writer::::default(); + let configmap_stream = kube::runtime::watcher( + configmap, + kube::api::ListParams::default().labels("quilkin.dev/configmap=true"), + ); + kube::runtime::reflector(config_writer, configmap_stream) +} + +fn gameserver_events( + client: kube::Client, + namespace: impl AsRef, +) -> impl Stream, kube::runtime::watcher::Error>> { + let gameservers_namespace = namespace.as_ref(); + let gameservers: kube::Api = kube::Api::namespaced(client, gameservers_namespace); + let gs_writer = kube::runtime::reflector::store::Writer::::default(); + let gameserver_stream = kube::runtime::watcher(gameservers, kube::api::ListParams::default()); + kube::runtime::reflector(gs_writer, gameserver_stream) +} + +pub fn update_endpoints_from_gameservers( + client: kube::Client, + namespace: impl AsRef, + config: Arc, + locality: Option, +) -> impl Stream> { + async_stream::stream! { + for await event in gameserver_events(client, namespace) { + match event? { + Event::Applied(server) => { + if !server.is_allocated() { + yield Ok(()); + continue; + } + + let endpoint = Endpoint::try_from(server)?; + tracing::trace!(endpoint=%serde_json::to_value(&endpoint).unwrap(), "Adding endpoint"); + match &locality { + Some(locality) => config + .clusters + .value() + .default_cluster_mut() + .insert((endpoint, locality.clone())), + None => config + .clusters + .value() + .default_cluster_mut() + .insert(endpoint), + }; + tracing::trace!(clusters=%serde_json::to_value(&config.clusters).unwrap(), "current clusters"); + } + + Event::Restarted(servers) => { + let servers: Vec<_> = servers + .into_iter() + .filter(GameServer::is_allocated) + .map(Endpoint::try_from) + .collect::>()?; + let endpoints = LocalityEndpoints::from((servers, locality.clone())); + tracing::trace!(?endpoints, "Restarting with endpoints"); + config.clusters.value().insert_default(endpoints); + } + + Event::Deleted(server) => { + let endpoint = Endpoint::try_from(server)?; + tracing::trace!(?endpoint, "Deleting endpoint"); + config.clusters.modify(|clusters| { + for locality in clusters.default_cluster_mut().localities.iter_mut() { + locality.remove(&endpoint); + } + }); + } + }; + + config.apply_metrics(); + yield Ok(()); + } + } +} diff --git a/src/config/watch/agones/crd.rs b/src/config/providers/k8s/agones.rs similarity index 100% rename from src/config/watch/agones/crd.rs rename to src/config/providers/k8s/agones.rs diff --git a/src/config/watch/agones.rs b/src/config/watch/agones.rs index 4a1259d2f9..cf2e0dbfa9 100644 --- a/src/config/watch/agones.rs +++ b/src/config/watch/agones.rs @@ -14,18 +14,10 @@ * limitations under the License. */ -pub mod crd; - use futures::TryStreamExt; -use k8s_openapi::api::core::v1::ConfigMap; -use kube::runtime::watcher::Event; use std::sync::Arc; -use crate::{ - endpoint::{Endpoint, Locality, LocalityEndpoints}, - Config, -}; -use crd::GameServer; +use crate::{endpoint::Locality, Config}; pub async fn watch( gameservers_namespace: impl AsRef, @@ -38,145 +30,26 @@ pub async fn watch( kube::Client::try_default(), ) .await??; - let config_namespace = config_namespace.as_ref(); - let gameservers_namespace = gameservers_namespace.as_ref(); - let configmap: kube::Api = kube::Api::namespaced(client.clone(), config_namespace); - let gameservers: kube::Api = kube::Api::namespaced(client, gameservers_namespace); - - let gs_writer = kube::runtime::reflector::store::Writer::::default(); - let config_writer = kube::runtime::reflector::store::Writer::::default(); - let configmap_stream = kube::runtime::watcher( - configmap, - kube::api::ListParams::default().labels("quilkin.dev/configmap=true"), + let configmap_reflector = crate::config::providers::k8s::update_filters_from_configmap( + client.clone(), + config_namespace, + config.clone(), + ); + let gameserver_reflector = crate::config::providers::k8s::update_endpoints_from_gameservers( + client, + gameservers_namespace, + config.clone(), + locality, ); - let gameserver_stream = kube::runtime::watcher(gameservers, kube::api::ListParams::default()); - let configmap_reflector = kube::runtime::reflector(config_writer, configmap_stream); - let gameserver_reflector = kube::runtime::reflector(gs_writer, gameserver_stream); - let this = Watcher { config }; - - let this = this.clone(); tokio::pin!(configmap_reflector); tokio::pin!(gameserver_reflector); loop { - let new_event: Option, Event>> = tokio::select! { - event = configmap_reflector.try_next() => event?.map(either::Left), - event = gameserver_reflector.try_next() => event?.map(either::Right), - }; - - match new_event { - Some(either::Left(configmap)) => { - this.handle_configmap_event(configmap).await?; - } - Some(either::Right(gameserver)) => { - this.handle_gameserver_event(gameserver, &locality).await?; - } - None => break Err(eyre::eyre!("Kubernetes stream unexpectedly ended")), - } - } -} - -#[derive(Clone)] -pub struct Watcher { - config: Arc, -} - -impl Watcher { - async fn handle_configmap_event(&self, event: Event) -> Result<(), tonic::Status> { - tracing::trace!("new configmap event"); - - let configmap = match event { - Event::Applied(configmap) => configmap, - Event::Restarted(configmaps) => match configmaps.get(0) { - Some(configmap) => configmap.clone(), - None => return Ok(()), - }, - Event::Deleted(_) => { - self.config.filters.remove(); - return Ok(()); - } + let Some(_) = tokio::select! { + result = configmap_reflector.try_next() => result?, + result = gameserver_reflector.try_next() => result?, + } else { + break Ok(()); }; - - self.update_configmap(configmap) - } - - fn update_configmap(&self, configmap: ConfigMap) -> Result<(), tonic::Status> { - let config = configmap - .data - .ok_or_else(|| tonic::Status::internal("No configmap data present"))?; - let config = config - .get("quilkin.yaml") - .ok_or_else(|| tonic::Status::internal("No quilkin.yaml present in configmap."))?; - - let data: serde_json::Map = - serde_yaml::from_str(config).map_err(|err| tonic::Status::internal(err.to_string()))?; - - if let Some(filters) = data - .get("filters") - .cloned() - .map(serde_json::from_value) - .transpose() - .map_err(|error| tonic::Status::internal(error.to_string()))? - { - self.config.filters.store(Arc::new(filters)); - } - - Ok(()) - } - - async fn handle_gameserver_event( - &self, - event: Event, - locality: &Option, - ) -> Result<(), tonic::Status> { - match event { - Event::Applied(server) => { - if !server.is_allocated() { - return Ok(()); - } - - let endpoint = Endpoint::try_from(server)?; - tracing::trace!(endpoint=%serde_json::to_value(&endpoint).unwrap(), "Adding endpoint"); - match locality { - Some(locality) => self - .config - .clusters - .value() - .default_cluster_mut() - .insert((endpoint, locality.clone())), - None => self - .config - .clusters - .value() - .default_cluster_mut() - .insert(endpoint), - }; - tracing::trace!(clusters=%serde_json::to_value(&self.config.clusters).unwrap(), "current clusters"); - } - - Event::Restarted(servers) => { - let servers: Vec<_> = servers - .into_iter() - .filter(GameServer::is_allocated) - .map(Endpoint::try_from) - .collect::>()?; - let endpoints = LocalityEndpoints::from((servers, locality.clone())); - tracing::trace!(?endpoints, "Restarting with endpoints"); - self.config.clusters.value().insert_default(endpoints); - } - - Event::Deleted(server) => { - let endpoint = Endpoint::try_from(server)?; - tracing::trace!(?endpoint, "Deleting endpoint"); - self.config.clusters.modify(|clusters| { - for locality in clusters.default_cluster_mut().localities.iter_mut() { - locality.remove(&endpoint); - } - }); - } - }; - - self.config.apply_metrics(); - Ok(()) } }