Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filter chain discovery to the relay #703

Merged
merged 2 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agones/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use kube::{
};
use tokio::sync::OnceCell;

use quilkin::config::watch::agones::crd::{
use quilkin::config::providers::k8s::agones::{
Fleet, FleetSpec, GameServer, GameServerPort, GameServerSpec, GameServerState,
GameServerTemplateSpec,
};
Expand Down
2 changes: 1 addition & 1 deletion agones/src/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod tests {
use crate::{game_server, is_gameserver_ready, quilkin_config_map, quilkin_container, Client};
use k8s_openapi::api::core::v1::{ConfigMap, ConfigMapVolumeSource, Volume};
use kube::{api::PostParams, runtime::wait::await_condition, Api, ResourceExt};
use quilkin::{config::watch::agones::crd::GameServer, test_utils::TestHelper};
use quilkin::{config::providers::k8s::agones::GameServer, test_utils::TestHelper};
use std::time::Duration;
use tokio::time::timeout;

Expand Down
2 changes: 1 addition & 1 deletion agones/src/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mod tests {
use tokio::time::timeout;

use quilkin::{
config::watch::agones::crd::{Fleet, GameServer},
config::providers::k8s::agones::{Fleet, GameServer},
test_utils::TestHelper,
};

Expand Down
49 changes: 47 additions & 2 deletions src/cli/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

use std::sync::Arc;

use crate::config::Config;
use futures::StreamExt;

use crate::config::{Config, Providers};

pub const PORT: u16 = 7900;

Expand All @@ -32,16 +34,59 @@ pub struct Relay {
/// Port for xDS management_server service
#[clap(short, long, env = super::PORT_ENV_VAR, default_value_t = super::manage::PORT)]
xds_port: u16,
#[clap(subcommand)]
providers: Option<Providers>,
}

impl Relay {
pub async fn relay(&self, config: Arc<Config>) -> crate::Result<()> {
let xds_server = crate::xds::server::spawn(self.xds_port, config.clone());
let mds_server = tokio::spawn(crate::xds::server::control_plane_discovery_server(
self.mds_port,
config,
config.clone(),
));

let _provider_task = if let Some(Providers::Agones {
config_namespace, ..
}) = &self.providers
{
let config = config.clone();
let config_namespace = config_namespace.clone();
Some(tokio::spawn(Providers::task(move || {
let config = config.clone();
let config_namespace = config_namespace.clone();
async move {
let client = tokio::time::timeout(
std::time::Duration::from_secs(5),
kube::Client::try_default(),
)
.await??;

let configmap_reflector =
crate::config::providers::k8s::update_filters_from_configmap(
client.clone(),
config_namespace,
config.clone(),
);

tokio::pin!(configmap_reflector);

loop {
match configmap_reflector.next().await {
Some(Ok(_)) => (),
Some(Err(error)) => return Err(error),
None => break,
}
}

tracing::info!("configmap stream ending");
Ok(())
}
})))
} else {
None
};

tokio::select! {
result = xds_server => {
result
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use uuid::Uuid;

mod config_type;
mod error;
pub mod provider;
pub mod providers;
mod slot;
pub mod watch;

Expand All @@ -40,7 +40,7 @@ use crate::{
};

pub use self::{
config_type::ConfigType, error::ValidationError, provider::Providers, slot::Slot, watch::Watch,
config_type::ConfigType, error::ValidationError, providers::Providers, slot::Slot, watch::Watch,
};

base64_serde_type!(pub Base64Standard, base64::STANDARD);
Expand Down
4 changes: 3 additions & 1 deletion src/config/provider.rs → src/config/providers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod k8s;

const RETRIES: u32 = 25;
const BACKOFF_STEP: std::time::Duration = std::time::Duration::from_millis(250);

Expand Down Expand Up @@ -53,7 +55,7 @@ impl Providers {
}

#[tracing::instrument(level = "trace", skip_all)]
async fn task<F>(task: impl FnMut() -> F) -> crate::Result<()>
pub async fn task<F>(task: impl FnMut() -> F) -> crate::Result<()>
where
F: std::future::Future<Output = crate::Result<()>>,
{
Expand Down
147 changes: 147 additions & 0 deletions src/config/providers/k8s.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
pub mod agones;

use std::sync::Arc;

use futures::Stream;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::runtime::watcher::Event;

use agones::GameServer;

use crate::endpoint::{Endpoint, Locality, LocalityEndpoints};

pub fn update_filters_from_configmap(
client: kube::Client,
namespace: impl AsRef<str>,
config: Arc<crate::Config>,
) -> impl Stream<Item = crate::Result<(), eyre::Error>> {
async_stream::stream! {
for await event in configmap_events(client, namespace) {
tracing::trace!("new configmap event");

let event = match event {
Ok(event) => event,
Err(error) => {
yield Err(error.into());
continue;
}
};

let configmap = match event {
Event::Applied(configmap) => configmap,
Event::Restarted(configmaps) => match configmaps.get(0) {
Some(configmap) => configmap.clone(),
None => {
yield Ok(());
continue;
},
},
Event::Deleted(_) => {
config.filters.remove();
yield Ok(());
continue;
}
};

let data = configmap.data.ok_or_else(|| eyre::eyre!("configmap data missing"))?;
let data = data.get("quilkin.yaml").ok_or_else(|| eyre::eyre!("quilkin.yaml property not found"))?;
let data: serde_json::Map<String, serde_json::Value> = serde_yaml::from_str(data)?;

if let Some(filters) = data
.get("filters")
.cloned()
.map(serde_json::from_value)
.transpose()?
{
config.filters.store(Arc::new(filters));
}

yield Ok(());
}
}
}

fn configmap_events(
client: kube::Client,
namespace: impl AsRef<str>,
) -> impl Stream<Item = Result<Event<ConfigMap>, kube::runtime::watcher::Error>> {
let config_namespace = namespace.as_ref();
let configmap: kube::Api<ConfigMap> = kube::Api::namespaced(client, config_namespace);
let config_writer = kube::runtime::reflector::store::Writer::<ConfigMap>::default();
let configmap_stream = kube::runtime::watcher(
configmap,
kube::api::ListParams::default().labels("quilkin.dev/configmap=true"),
);
kube::runtime::reflector(config_writer, configmap_stream)
}

fn gameserver_events(
client: kube::Client,
namespace: impl AsRef<str>,
) -> impl Stream<Item = Result<Event<GameServer>, kube::runtime::watcher::Error>> {
let gameservers_namespace = namespace.as_ref();
let gameservers: kube::Api<GameServer> = kube::Api::namespaced(client, gameservers_namespace);
let gs_writer = kube::runtime::reflector::store::Writer::<GameServer>::default();
let gameserver_stream = kube::runtime::watcher(gameservers, kube::api::ListParams::default());
kube::runtime::reflector(gs_writer, gameserver_stream)
}

pub fn update_endpoints_from_gameservers(
client: kube::Client,
namespace: impl AsRef<str>,
config: Arc<crate::Config>,
locality: Option<Locality>,
) -> impl Stream<Item = crate::Result<(), eyre::Error>> {
async_stream::stream! {
for await event in gameserver_events(client, namespace) {
match event? {
Event::Applied(server) => {
if !server.is_allocated() {
yield Ok(());
continue;
}

let endpoint = Endpoint::try_from(server)?;
tracing::trace!(endpoint=%serde_json::to_value(&endpoint).unwrap(), "Adding endpoint");
match &locality {
Some(locality) => config
.clusters
.value()
.default_cluster_mut()
.insert((endpoint, locality.clone())),
None => config
.clusters
.value()
.default_cluster_mut()
.insert(endpoint),
};
tracing::trace!(clusters=%serde_json::to_value(&config.clusters).unwrap(), "current clusters");
}

Event::Restarted(servers) => {
let servers: Vec<_> = servers
.into_iter()
.filter(GameServer::is_allocated)
.map(Endpoint::try_from)
.collect::<Result<_, _>>()?;
let endpoints = LocalityEndpoints::from((servers, locality.clone()));
tracing::trace!(?endpoints, "Restarting with endpoints");
config.clusters.value().insert_default(endpoints);
}

Event::Deleted(server) => {
let endpoint = Endpoint::try_from(server)?;
tracing::trace!(?endpoint, "Deleting endpoint");
config.clusters.modify(|clusters| {
for locality in clusters.default_cluster_mut().localities.iter_mut() {
locality.remove(&endpoint);
}
});
}
};

config.apply_metrics();
yield Ok(());
}
}
}
File renamed without changes.
Loading