diff --git a/docs/extensions/filters/append_token_router.md b/docs/extensions/filters/append_token_router.md new file mode 100644 index 0000000000..5a608e3f3b --- /dev/null +++ b/docs/extensions/filters/append_token_router.md @@ -0,0 +1,78 @@ +# Append Token Router + +Append Token Router is a simple Client/Server filter pair to provide routing from a Game Client to a Game Server +through a given set of Proxies via a fixed length `connection_id` value that is sent along as part of the UDP packet. + +To implement this, the Client proxy appends the `client.connection_id` data value to the end of each packet that is sent +from the Game Client to a Server Proxy. + +On the Server proxy side, on receiving the packet, strips the packet off the fixed length `connection_id` off the end +of the packet. Then that `connection_id` value is compared to the `server.endpoints.connection_ids`. Any values that +is matches, the UDP packet is sent on to the that Endpoint's destination. + +#### Filter name +```text +quilkin.extensions.filters.append_token_router.v1alpha1.AppendTokenRouter +``` + +### Configuration Examples: Client + +```rust +# let yaml = " +local: + port: 7000 +filters: + - name: quilkin.extensions.filters.append_token_router.v1alpha1.AppendTokenRouter +client: + addresses: + - 127.0.0.1:7001 + connection_id: MXg3aWp5Ng== # (string value: nkuy70x) +# "; + +# let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); +# assert_eq!(config.validate().unwrap(), ()); +# assert_eq!(config.filters.len(), 1); +# // TODO: make it possible to easily validate filter's config from here. + ``` + +### Configuration Examples: Server + +```rust +# let yaml = " +local: + port: 7001 +filters: + - name: quilkin.extensions.filters.append_token_router.v1alpha1.AppendTokenRouter + config: + connection_id_bytes: 7 +server: + endpoints: # array of potential endpoints to send on traffic to + - name: Game Server No. 1 + address: 127.0.0.1:26000 + connection_ids: + - MXg3aWp5Ng== # the connection byte array to route to, encoded as base64 (string value: 1x7ijy6) + - OGdqM3YyaQ== # (string value: 8gj3v2i) + - name: Game Server No. 2 + address: 127.0.0.1:26001 + connection_ids: + - bmt1eTcweA== # (string value: nkuy70x) +# "; +# let config = quilkin::config::Config::from_reader(yaml.as_bytes()).unwrap(); +# assert_eq!(config.validate().unwrap(), ()); +# assert_eq!(config.filters.len(), 1); +# // TODO: make it possible to easily validate filter's config from here. +``` + +### Configuration Options: Server + +```yaml +properties: + connection_id_bytes: + type: integer + description: | + The number of bytes the `connection_id` takes up at the end of the packets. +``` + +### Metrics + +Implemented soon! \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index c337c90dc0..07c9abd517 100644 --- a/src/config.rs +++ b/src/config.rs @@ -87,6 +87,18 @@ impl From<&str> for ConnectionId { } } +impl From> for ConnectionId { + fn from(contents: Vec) -> Self { + ConnectionId(contents) + } +} + +impl AsRef> for ConnectionId { + fn as_ref(&self) -> &Vec { + &self.0 + } +} + /// ConnectionConfig is the configuration for either a Client or Server proxy #[derive(Debug, Deserialize, Serialize)] pub enum ConnectionConfig { diff --git a/src/extensions/filter_registry.rs b/src/extensions/filter_registry.rs index 45fb4a0ced..063a4725c1 100644 --- a/src/extensions/filter_registry.rs +++ b/src/extensions/filter_registry.rs @@ -22,6 +22,7 @@ use prometheus::{Error as MetricsError, Registry}; use serde::export::Formatter; use crate::config::{ConnectionConfig, EndPoint}; +use std::ops::Deref; /// Filter is a trait for routing and manipulating packets. pub trait Filter: Send + Sync { @@ -50,12 +51,36 @@ pub trait Filter: Send + Sync { ) -> Option>; } +impl Filter for Box { + fn on_downstream_receive( + &self, + endpoints: &[EndPoint], + from: SocketAddr, + contents: Vec, + ) -> Option<(Vec, Vec)> { + self.deref() + .on_downstream_receive(endpoints, from, contents) + } + + fn on_upstream_receive( + &self, + endpoint: &EndPoint, + from: SocketAddr, + to: SocketAddr, + contents: Vec, + ) -> Option> { + self.deref() + .on_upstream_receive(endpoint, from, to, contents) + } +} + #[derive(Debug, PartialEq)] /// Error is an error when attempting to create a Filter from_config() from a FilterFactory pub enum Error { NotFound(String), FieldInvalid { field: String, reason: String }, DeserializeFailed(String), + FieldNotFound(String), InitializeMetricsFailed(String), } @@ -67,6 +92,9 @@ impl fmt::Display for Error { write!(f, "field {} is invalid: {}", field, reason) } Error::DeserializeFailed(reason) => write!(f, "Deserialization failed: {}", reason), + Error::FieldNotFound(field) => { + write!(f, "field {} is required, but wasn't found", field) + } Error::InitializeMetricsFailed(reason) => { write!(f, "failed to initialize metrics: {}", reason) } diff --git a/src/extensions/filters/append_token_router.rs b/src/extensions/filters/append_token_router.rs new file mode 100644 index 0000000000..d49d09f37c --- /dev/null +++ b/src/extensions/filters/append_token_router.rs @@ -0,0 +1,387 @@ +/* + * Copyright 2020 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::net::SocketAddr; + +use slog::{debug, o, Logger}; + +use crate::config::{ConnectionConfig, ConnectionId, EndPoint}; +use crate::extensions::filter_registry::Error::{FieldInvalid, FieldNotFound}; +use crate::extensions::{CreateFilterArgs, Error, Filter, FilterFactory}; + +pub struct AppendTokenRouterFactory { + log: Logger, +} + +impl AppendTokenRouterFactory { + pub fn new(base: &Logger) -> Self { + AppendTokenRouterFactory { log: base.clone() } + } +} + +impl FilterFactory for AppendTokenRouterFactory { + fn name(&self) -> String { + return String::from( + "quilkin.extensions.filters.append_token_router.v1alpha1.AppendTokenRouter", + ); + } + + fn create_filter(&self, args: CreateFilterArgs) -> Result, Error> { + let filter: Box = match args.connection { + ConnectionConfig::Client { connection_id, .. } => { + Box::new(Client::new(&self.log, connection_id.clone())) + } + ConnectionConfig::Server { .. } => { + let result = args + .config + .ok_or(FieldNotFound("config".into()))? + .get("connection_id_bytes") + .ok_or(FieldNotFound("config.connection_id_bytes".into()))?; + let cil = result.as_u64().ok_or(FieldInvalid { + field: "config.connection_id_bytes".into(), + reason: "should be an unsigned integer".into(), + })?; + + Box::new(Server::new(&self.log, cil as usize)) + } + }; + Ok(Box::new(AppendTokenRouter::new(filter))) + } +} + +/// +/// Append Token Router is a Client/Server filter pair that appends the Client +/// client.connection_id to each packet, and then when received on the Server +/// side, it is stripped off the packet, and compared to endpoints and on match +/// the packet is sent on to that endpoint. +/// +pub struct AppendTokenRouter { + filter: Box, +} + +impl AppendTokenRouter { + pub fn new(filter: Box) -> Self { + AppendTokenRouter { + // either the Server or Client filter + filter, + } + } +} + +impl Filter for AppendTokenRouter { + fn on_downstream_receive( + &self, + endpoints: &[EndPoint], + from: SocketAddr, + contents: Vec, + ) -> Option<(Vec, Vec)> { + self.filter.on_downstream_receive(endpoints, from, contents) + } + + fn on_upstream_receive( + &self, + endpoint: &EndPoint, + from: SocketAddr, + to: SocketAddr, + contents: Vec, + ) -> Option> { + self.filter + .on_upstream_receive(endpoint, from, to, contents) + } +} + +struct Client { + log: Logger, + connection_id: ConnectionId, +} + +impl Client { + pub fn new(base: &Logger, connection_id: ConnectionId) -> Self { + Client { + log: base.new(o!("source" => "extensions::AppendTokenRouter::Client")), + connection_id, + } + } +} + +impl Filter for Client { + fn on_downstream_receive( + &self, + endpoints: &[EndPoint], + _: SocketAddr, + contents: Vec, + ) -> Option<(Vec, Vec)> { + let mut contents = contents; + let mut token = self.connection_id.as_ref().clone(); + contents.append(&mut token); + + debug!(self.log, "on_downstream_receive"; "contents" => String::from_utf8(contents.clone()).unwrap_or(format!("{:?}", contents))); + Some((endpoints.to_vec(), contents)) + } + + fn on_upstream_receive( + &self, + _: &EndPoint, + _: SocketAddr, + _: SocketAddr, + contents: Vec, + ) -> Option> { + Some(contents) + } +} + +struct Server { + log: Logger, + // number of bytes the connection id is + connection_id_length: usize, +} + +impl Server { + pub fn new(base: &Logger, connection_id_length: usize) -> Self { + Server { + log: base.new(o!("source" => "extensions::AppendTokenRouter::Server")), + connection_id_length, + } + } +} + +impl Filter for Server { + fn on_downstream_receive( + &self, + endpoints: &[EndPoint], + _: SocketAddr, + contents: Vec, + ) -> Option<(Vec, Vec)> { + let mut contents = contents; + // splits the connection_id off the content and returns the value + let connection_id = + ConnectionId::from(contents.split_off(contents.len() - self.connection_id_length)); + + let result: Vec = endpoints + .iter() + .filter(|endpoint| { + endpoint + .connection_ids + .iter() + .any(|id| *id == connection_id) + }) + .map(|e| e.clone()) + .collect(); + + debug!(self.log, "on_downstream_receive"; + "endpoints" => result.clone().into_iter().map(|e| e.name.clone()).collect::>().as_slice().join(", "), + "contents" => String::from_utf8(contents.clone()).unwrap_or(format!("{:?}", contents)), + "connection_id" => String::from_utf8(connection_id.as_ref().clone()).unwrap_or(format!("{:?}", connection_id))); + + Some((result, contents.to_vec())) + } + + fn on_upstream_receive( + &self, + _: &EndPoint, + _: SocketAddr, + _: SocketAddr, + contents: Vec, + ) -> Option> { + Some(contents) + } +} + +#[cfg(test)] +mod tests { + use serde_yaml::{Mapping, Number, Value}; + + use crate::test_utils::{assert_filter_on_upstream_receive_no_change, logger}; + + use super::*; + + #[test] + fn factory_server() { + let log = logger(); + let factory = AppendTokenRouterFactory::new(&log); + let connection = ConnectionConfig::Server { endpoints: vec![] }; + let yaml = "connection_id_bytes: 3"; + let value: Value = serde_yaml::from_str(yaml).unwrap(); + let config = Some(&value); + + let filter = factory + .create_filter(CreateFilterArgs::new(&connection, config)) + .unwrap(); + + assert_filter_on_upstream_receive_no_change(&filter); + assert_server_on_downstream_receive(&filter); + } + + #[test] + fn factory_client() { + let log = logger(); + let factory = AppendTokenRouterFactory::new(&log); + let connection = ConnectionConfig::Client { + addresses: vec![], + connection_id: "abc".into(), + lb_policy: None, + }; + + let filter = factory + .create_filter(CreateFilterArgs::new(&connection, None)) + .unwrap(); + assert_filter_on_upstream_receive_no_change(&filter); + assert_client_on_downstream_receive(&filter); + } + + #[test] + fn client_on_downstream_receive() { + let log = logger(); + let connection_id: ConnectionId = "abc".into(); + let client = Client::new(&log, connection_id); + assert_client_on_downstream_receive(&client); + } + + #[test] + fn client_on_upstream_receive() { + let log = logger(); + let connection_id: ConnectionId = "abc".into(); + let client = Client::new(&log, connection_id); + assert_filter_on_upstream_receive_no_change(&client); + } + + #[test] + fn server_on_downstream_receive() { + let log = logger(); + let server = Server::new(&log, 3); + assert_server_on_downstream_receive(&server) + } + + #[test] + fn server_on_upstream_receive() { + let log = logger(); + let server = Server::new(&log, 3); + assert_filter_on_upstream_receive_no_change(&server); + } + + #[test] + fn create_from_config_server_empty_config() { + let log = logger(); + let map = Mapping::new(); + let connection = ConnectionConfig::Server { endpoints: vec![] }; + let factory = AppendTokenRouterFactory::new(&log); + + match factory.create_filter(CreateFilterArgs::new( + &connection, + Some(&Value::Mapping(map)), + )) { + Ok(_) => assert!(false, "should fail validation"), + Err(err) => assert_eq!(FieldNotFound("config.connection_id_bytes".into()), err), + }; + } + + #[test] + fn create_from_config_valid_connection_id() { + let log = logger(); + let mut map = Mapping::new(); + let connection = ConnectionConfig::Server { endpoints: vec![] }; + let factory = AppendTokenRouterFactory::new(&log); + map.insert("connection_id_bytes".into(), Value::Number(Number::from(7))); + + assert!( + factory + .create_filter(CreateFilterArgs::new( + &connection, + Some(&Value::Mapping(map)) + )) + .is_ok(), + "should be a valid config" + ); + } + + #[test] + fn create_from_config_invalid_connection_id() { + let log = logger(); + let mut map = Mapping::new(); + let connection = ConnectionConfig::Server { endpoints: vec![] }; + let factory = AppendTokenRouterFactory::new(&log); + map.insert("connection_id_bytes".into(), Value::String("stuff".into())); + + match factory.create_filter(CreateFilterArgs::new( + &connection, + Some(&Value::Mapping(map)), + )) { + Ok(_) => assert!(false, "should fail validation"), + Err(err) => assert_eq!( + FieldInvalid { + field: "config.connection_id_bytes".into(), + reason: "should be an unsigned integer".into(), + }, + err + ), + }; + } + + /// assert that on_downstream_receive does the right thing + /// for a server configuration. + /// Assumes that the connection token is "abc" + fn assert_client_on_downstream_receive(filter: &F) + where + F: Filter, + { + let contents = "hello".to_string().into_bytes(); + let endpoints = vec![EndPoint { + name: "e1".to_string(), + address: "127.0.0.1:81".parse().unwrap(), + connection_ids: vec![], + }]; + + match filter.on_downstream_receive( + endpoints.as_slice(), + "127.0.0.1:80".parse().unwrap(), + contents, + ) { + None => assert!(false, "should get a result"), + Some((result_endpoints, result_content)) => { + assert_eq!(endpoints, result_endpoints); + assert_eq!("helloabc".to_string().into_bytes(), result_content); + } + } + } + + /// assert that on_downstream_receive does the right thing + /// for a server configuration. + /// Assumes that the connection string length is 3 + fn assert_server_on_downstream_receive(filter: &F) + where + F: Filter, + { + let e1 = EndPoint::new( + "e1".into(), + "127.0.0.1:80".parse().unwrap(), + vec!["abc".into()], + ); + let e2 = EndPoint::new("e2".into(), "127.0.0.1:90".parse().unwrap(), vec![]); + + match filter.on_downstream_receive( + vec![e1.clone(), e2].as_slice(), + "127.0.0.1:70".parse().unwrap(), + "helloabc".as_bytes().to_vec(), + ) { + None => assert!(false, "should be a result"), + Some((endpoints, content)) => { + assert_eq!(1, endpoints.len()); + assert_eq!(endpoints[0], e1); + assert_eq!("hello".as_bytes().to_vec(), content); + } + } + } +} diff --git a/src/extensions/filters/mod.rs b/src/extensions/filters/mod.rs index a5efbea4ee..779cfc0e6b 100644 --- a/src/extensions/filters/mod.rs +++ b/src/extensions/filters/mod.rs @@ -14,8 +14,10 @@ * limitations under the License. */ +pub use append_token_router::AppendTokenRouterFactory; pub use debug::DebugFilterFactory; pub use local_rate_limit::RateLimitFilterFactory; +mod append_token_router; mod debug; mod local_rate_limit; diff --git a/src/extensions/mod.rs b/src/extensions/mod.rs index 0d0b5b0a7b..e01366c5fb 100644 --- a/src/extensions/mod.rs +++ b/src/extensions/mod.rs @@ -30,5 +30,6 @@ pub fn default_registry(base: &Logger) -> FilterRegistry { let mut fr = FilterRegistry::default(); fr.insert(filters::DebugFilterFactory::new(base)); fr.insert(filters::RateLimitFilterFactory::default()); + fr.insert(filters::AppendTokenRouterFactory::new(base)); fr } diff --git a/src/lib.rs b/src/lib.rs index a4e818abf3..07b3b19acb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,4 +37,5 @@ pub mod external_doc_tests { #![doc(include = "../docs/extensions/filters/filters.md")] #![doc(include = "../docs/extensions/filters/local_rate_limit.md")] #![doc(include = "../docs/extensions/filters/debug.md")] + #![doc(include = "../docs/extensions/filters/append_token_router.md")] }