Skip to content

Commit

Permalink
Integration tests for Agones Relay and Agent
Browse files Browse the repository at this point in the history
This implements the integration tests for `quilkin relay` and `quilkin
agent` for Agones integration.

This includes refactoring of the integration test libraries to
consolidate and improve aspects of orchestrating and running
proxies in a Kubernetes cluster for integration tests.

Also includes a bug fix on readiness for Agent that will eventually be
replaced by googleforgames#802, but allows for tests to pass.

Closes googleforgames#806
  • Loading branch information
markmandel committed Oct 9, 2023
1 parent 9ba3fe5 commit 244a99f
Show file tree
Hide file tree
Showing 6 changed files with 837 additions and 453 deletions.
3 changes: 2 additions & 1 deletion agones/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ readme = "README.md"

[dependencies]
base64.workspace = true
futures.workspace = true
k8s-openapi.workspace = true
kube = { workspace = true, features = ["openssl-tls", "client", "derive", "runtime"] }
quilkin = { path = "../" }
serial_test = "2.0.0"
tokio.workspace = true
futures.workspace = true
tracing.workspace = true
266 changes: 263 additions & 3 deletions agones/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,41 @@
* limitations under the License.
*/

use std::net::SocketAddr;
use std::time::Duration;
use std::{
collections::BTreeMap,
env,
time::{SystemTime, UNIX_EPOCH},
};

use futures::{AsyncBufReadExt, TryStreamExt};
use k8s_openapi::api::apps::v1::DeploymentSpec;
use k8s_openapi::api::core::v1::{ContainerPort, Node};
use k8s_openapi::api::rbac::v1::PolicyRule;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use k8s_openapi::{
api::{
apps::v1::Deployment,
core::v1::{
ConfigMap, Container, EnvVar, Event, HTTPGetAction, Namespace, Pod, PodSpec,
PodTemplateSpec, Probe, ResourceRequirements, ServiceAccount, VolumeMount,
},
rbac::v1::{RoleBinding, RoleRef, Subject},
rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject},
},
apimachinery::pkg::{
api::resource::Quantity, apis::meta::v1::ObjectMeta, util::intstr::IntOrString,
},
chrono,
};
use kube::runtime::wait::await_condition;
use kube::{
api::{DeleteParams, ListParams, LogParams, PostParams},
runtime::wait::Condition,
Api, Resource, ResourceExt,
};
use tokio::sync::OnceCell;
use tokio::time::timeout;
use tracing::debug;

use quilkin::config::providers::k8s::agones::{
Expand All @@ -49,8 +57,9 @@ use quilkin::config::providers::k8s::agones::{
};

mod pod;
mod provider;
mod relay;
mod sidecar;
mod xds;

#[allow(dead_code)]
static CLIENT: OnceCell<Client> = OnceCell::const_new();
Expand All @@ -63,6 +72,9 @@ const DELETE_DELAY_SECONDS: &str = "DELETE_DELAY_SECONDS";
pub const GAMESERVER_IMAGE: &str =
"us-docker.pkg.dev/agones-images/examples/simple-game-server:0.16";

/// The dynamic metadata key for routing tokens
pub const TOKEN_KEY: &str = "quilkin.dev/tokens";

#[derive(Clone)]
pub struct Client {
/// The Kubernetes client
Expand Down Expand Up @@ -215,6 +227,230 @@ async fn add_agones_service_account(client: kube::Client, namespace: String) {
let _ = role_bindings.create(&pp, &role_binding).await.unwrap();
}

/// Creates a Service account and related RBAC objects to enable a process to query Agones
/// and ConfigMap resources within a cluster
pub async fn create_agones_rbac_read_account(
client: &Client,
service_accounts: Api<ServiceAccount>,
cluster_roles: Api<ClusterRole>,
role_bindings: Api<RoleBinding>,
) -> String {
let pp = PostParams::default();
let rbac_name = "quilkin-agones";

// check if sevice account already exists, otherwise create it.
if service_accounts.get(rbac_name).await.is_ok() {
return rbac_name.into();
}

// create all the rbac rules

let rbac_meta = ObjectMeta {
name: Some(rbac_name.into()),
..Default::default()
};
let service_account = ServiceAccount {
metadata: rbac_meta.clone(),
..Default::default()
};
service_accounts
.create(&pp, &service_account)
.await
.unwrap();

// Delete the cluster role if it already exists, since it's cluster wide.
match cluster_roles
.delete(rbac_name, &DeleteParams::default())
.await
{
Ok(_) => {}
Err(err) => println!("Cluster role not found: {err}"),
};
let cluster_role = ClusterRole {
metadata: rbac_meta.clone(),
rules: Some(vec![
PolicyRule {
api_groups: Some(vec!["agones.dev".into()]),
resources: Some(vec!["gameservers".into()]),
verbs: ["get", "list", "watch"].map(String::from).to_vec(),
..Default::default()
},
PolicyRule {
api_groups: Some(vec!["".into()]),
resources: Some(vec!["configmaps".into()]),
verbs: ["get", "list", "watch"].map(String::from).to_vec(),
..Default::default()
},
]),
..Default::default()
};
cluster_roles.create(&pp, &cluster_role).await.unwrap();

let binding = RoleBinding {
metadata: rbac_meta,
subjects: Some(vec![Subject {
kind: "User".into(),
name: format!("system:serviceaccount:{}:{rbac_name}", client.namespace),
api_group: Some("rbac.authorization.k8s.io".into()),
..Default::default()
}]),
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".into(),
kind: "ClusterRole".into(),
name: rbac_name.into(),
},
};
role_bindings.create(&pp, &binding).await.unwrap();
rbac_name.into()
}

/// Create a Deployment with a singular Quilkin proxy, and return it's address.
/// The `name` variable is used as role={name} for label lookup.
pub async fn quilkin_proxy_deployment(
client: &Client,
deployments: Api<Deployment>,
name: String,
host_port: u16,
management_server: String,
) -> SocketAddr {
let pp = PostParams::default();
let mut container = quilkin_container(
client,
Some(vec![
"proxy".into(),
format!("--management-server={management_server}"),
]),
None,
);

// we'll use a host port, since spinning up a load balancer takes a long time.
// we know that port 7777 is open because this is an Agones cluster and it has associated
// firewall rules , and even if we conflict with a GameServer
// the k8s scheduler will move us to another node.
container.ports = Some(vec![ContainerPort {
container_port: 7777,
host_port: Some(host_port as i32),
protocol: Some("UDP".into()),
..Default::default()
}]);

let labels = BTreeMap::from([("role".to_string(), name.clone())]);
let deployment = Deployment {
metadata: ObjectMeta {
name: Some(name),
labels: Some(labels.clone()),
..Default::default()
},
spec: Some(DeploymentSpec {
replicas: Some(1),
selector: LabelSelector {
match_expressions: None,
match_labels: Some(labels.clone()),
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(labels.clone()),
..Default::default()
}),
spec: Some(PodSpec {
containers: vec![container],
..Default::default()
}),
},
..Default::default()
}),
..Default::default()
};

let deployment = deployments.create(&pp, &deployment).await.unwrap();
let name = deployment.name_unchecked();
// should not be ready, since there are no endpoints, but let's wait 3 seconds, make sure it doesn't do something we don't expect
let result = timeout(
Duration::from_secs(3),
await_condition(deployments.clone(), name.as_str(), is_deployment_ready()),
)
.await;
assert!(result.is_err());

// get the address to send data to
let pods = client.namespaced_api::<Pod>();
let list = pods
.list(&ListParams {
label_selector: Some(format!("role={name}")),
..Default::default()
})
.await
.unwrap();
assert_eq!(1, list.items.len());

let nodes: Api<Node> = Api::all(client.kubernetes.clone());
let name = list.items[0]
.spec
.as_ref()
.unwrap()
.node_name
.as_ref()
.unwrap();
let node = nodes.get(name.as_str()).await.unwrap();
let external_ip = node
.status
.unwrap()
.addresses
.unwrap()
.iter()
.find(|addr| addr.type_ == "ExternalIP")
.unwrap()
.address
.clone();

SocketAddr::new(external_ip.parse().unwrap(), host_port)
}

/// Create a Fleet, and pick on it's GameServers and add the token to it.
/// Returns the details of the GameServer that has been selected.
pub async fn create_tokenised_gameserver(
fleets: Api<Fleet>,
gameservers: Api<GameServer>,
token: &str,
) -> GameServer {
let pp = PostParams::default();

// create a fleet so we can ensure that a packet is going to the GameServer we expect, and not
// any other.
let fleet = fleet();
let fleet = fleets.create(&pp, &fleet).await.unwrap();
let name = fleet.name_unchecked();
timeout(
Duration::from_secs(30),
await_condition(fleets.clone(), name.as_str(), is_fleet_ready()),
)
.await
.expect("Fleet should be ready")
.unwrap();

let lp = ListParams {
label_selector: Some(format!("agones.dev/fleet={}", fleet.name_unchecked())),
..Default::default()
};
let list = gameservers.list(&lp).await.unwrap();

let mut gs = list.items[0].clone();
// add routing label to the GameServer
assert_eq!(3, token.as_bytes().len());
gs.metadata
.annotations
.get_or_insert(Default::default())
.insert(
TOKEN_KEY.into(),
base64::Engine::encode(&base64::engine::general_purpose::STANDARD, token),
);
gameservers
.replace(gs.name_unchecked().as_str(), &pp, &gs)
.await
.unwrap();
gs
}

/// Returns a test GameServer with the UDP test binary that is used for
/// Agones e2e tests.
pub fn game_server() -> GameServer {
Expand Down Expand Up @@ -371,7 +607,7 @@ pub fn quilkin_container(
..Default::default()
}),
initial_delay_seconds: Some(3),
period_seconds: Some(2),
period_seconds: Some(1),
..Default::default()
}),
..Default::default()
Expand Down Expand Up @@ -404,6 +640,30 @@ pub fn quilkin_config_map(config: &str) -> ConfigMap {
}
}

/// Return a ConfigMap that has a standard testing Token Router configuration
pub async fn create_token_router_config(config_maps: &Api<ConfigMap>) -> ConfigMap {
let pp = PostParams::default();

let config = r#"
version: v1alpha1
filters:
- name: quilkin.filters.capture.v1alpha1.Capture # Capture and remove the authentication token
config:
suffix:
size: 3
remove: true
- name: quilkin.filters.token_router.v1alpha1.TokenRouter
"#;
let mut config_map = quilkin_config_map(config);
config_map
.metadata
.labels
.get_or_insert(Default::default())
.insert("quilkin.dev/configmap".into(), "true".into());

config_maps.create(&pp, &config_map).await.unwrap()
}

/// Convenience function to return the address with the first port of GameServer
pub fn gameserver_address(gs: &GameServer) -> String {
let status = gs.status.as_ref().unwrap();
Expand Down
Loading

0 comments on commit 244a99f

Please sign in to comment.