XDS Filter impl
iffyio committed Dec 7, 2020
1 parent 6c8ea4e commit e32fb6d
Showing 13 changed files with 712 additions and 69 deletions.
7 changes: 7 additions & 0 deletions build.rs
@@ -18,11 +18,17 @@
// we need for XDS GRPC communication.
fn main() -> Result<(), Box<dyn std::error::Error>> {
let proto_files = vec![
"proto/data-plane-api/envoy/config/accesslog/v3/accesslog.proto",
"proto/data-plane-api/envoy/config/cluster/v3/cluster.proto",
"proto/data-plane-api/envoy/config/listener/v3/listener.proto",
"proto/data-plane-api/envoy/config/route/v3/route.proto",
"proto/data-plane-api/envoy/service/cluster/v3/cds.proto",
"proto/data-plane-api/envoy/service/discovery/v3/ads.proto",
"proto/data-plane-api/envoy/service/discovery/v3/discovery.proto",
"proto/data-plane-api/envoy/type/metadata/v3/metadata.proto",
"proto/data-plane-api/envoy/type/tracing/v3/custom_tag.proto",
"proto/udpa/udpa/core/v1/resource_name.proto",
"proto/quilkin/extensions/filters/debug/v1alpha1/debug.proto",
]
.iter()
.map(|name| std::env::current_dir().unwrap().join(name))
@@ -33,6 +39,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"proto/udpa",
"proto/googleapis",
"proto/protoc-gen-validate",
"proto/quilkin",
]
.iter()
.map(|i| std::env::current_dir().unwrap().join(i))
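For context on what this build.rs change wires up: the new proto paths and the `proto/quilkin` include directory feed the project's protobuf/gRPC code generation. The sketch below is only a minimal, hypothetical reconstruction of such a build script; assuming `tonic_build` as the generator and using a subset of the listed protos — neither the generator choice nor the builder options are taken from this commit.

```rust
// Minimal sketch of a build script compiling the listed protos.
// Assumption: code generation is done with `tonic_build`; the real build.rs
// may use a different generator or options.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let proto_files = vec![
        "proto/data-plane-api/envoy/service/discovery/v3/discovery.proto",
        "proto/data-plane-api/envoy/service/discovery/v3/ads.proto",
        "proto/quilkin/extensions/filters/debug/v1alpha1/debug.proto",
    ]
    .iter()
    .map(|name| std::env::current_dir().unwrap().join(name))
    .collect::<Vec<_>>();

    // Every root listed here can be referenced by `import` statements in the
    // protos, which is why the new `proto/quilkin` directory has to be added.
    let include_dirs = vec![
        "proto/data-plane-api",
        "proto/udpa",
        "proto/googleapis",
        "proto/protoc-gen-validate",
        "proto/quilkin",
    ]
    .iter()
    .map(|dir| std::env::current_dir().unwrap().join(dir))
    .collect::<Vec<_>>();

    // Generates the message types and gRPC client stubs into OUT_DIR.
    tonic_build::configure()
        .build_server(false)
        .compile(&proto_files, &include_dirs)?;

    Ok(())
}
```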
12 changes: 12 additions & 0 deletions proto/quilkin/extensions/filters/debug/v1alpha1/debug.proto
@@ -0,0 +1,12 @@
syntax = "proto3";

package quilkin.extensions.filters.debug.v1alpha1;

import "google/protobuf/wrappers.proto";

option go_package = "example";

message Debug {
google.protobuf.StringValue id = 1;
}
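
The `Debug` filter config wraps its `id` in `google.protobuf.StringValue` so that an unset id can be told apart from an empty string. As a rough illustration (an assumption about the generated output, not code from this commit), prost would turn this message into something like the struct below, with the wrapper type mapped to `Option<String>`:

```rust
// Hypothetical shape of the prost-generated type for debug.proto.
// The module path would follow the proto package:
// quilkin::extensions::filters::debug::v1alpha1
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Debug {
    /// `google.protobuf.StringValue` maps to `Option<String>`, so an absent
    /// id is representable without a sentinel value.
    #[prost(message, optional, tag = "1")]
    pub id: Option<String>,
}
```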

104 changes: 92 additions & 12 deletions src/cluster/cluster_manager.rs
@@ -21,22 +21,24 @@
// and we will need to acquire a read lock with every packet that is processed
// to be able to capture the current endpoint state and pass it to Filters.
use parking_lot::RwLock;
use slog::{debug, o, warn, Logger};
use slog::{debug, info, o, warn, Logger};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::{fmt, sync::Arc};
use tokio::sync::{mpsc, oneshot, watch};

use crate::config::{EmptyListError, EndPoint, Endpoints, ManagementServer, UpstreamEndpoints};
use crate::extensions::filter_manager::ListenerManagerArgs;
use crate::extensions::FilterRegistry;
use crate::xds::ads_client::{AdsClient, ClusterUpdate, ExecutionResult};

/// The maximum size of the queue that provides updates from the XDS layer to the [`ClusterManager`].
const CLUSTER_UPDATE_QUEUE_SIZE: usize = 1000;

pub type SharedClusterManager = Arc<RwLock<ClusterManager>>;
pub(crate) type SharedClusterManager = Arc<RwLock<ClusterManager>>;

/// ClusterManager knows about all clusters and endpoints.
pub struct ClusterManager {
pub(crate) struct ClusterManager {
endpoints: Option<Endpoints>,
}

@@ -90,25 +92,33 @@ impl ClusterManager {
base_logger: Logger,
management_servers: Vec<ManagementServer>,
xds_node_id: String,
listener_manager_args: ListenerManagerArgs,
mut shutdown_rx: watch::Receiver<()>,
) -> Result<(SharedClusterManager, oneshot::Receiver<ExecutionResult>), InitializeError> {
let log = base_logger.new(o!("source" => "cluster::ClusterManager"));
let (cluster_updates_tx, mut cluster_updates_rx) =
mpsc::channel::<ClusterUpdate>(CLUSTER_UPDATE_QUEUE_SIZE);
let (execution_result_tx, execution_result_rx) = oneshot::channel::<ExecutionResult>();
let (execution_result_tx, mut execution_result_rx) = oneshot::channel::<ExecutionResult>();
Self::spawn_ads_client(
log.clone(),
xds_node_id,
management_servers,
cluster_updates_tx,
listener_manager_args,
execution_result_tx,
shutdown_rx.clone(),
);

// Initial cluster warming - wait to receive the first set of clusters
// from the server before we start receiving any traffic.
let cluster_update =
Self::receive_initial_cluster_update(&mut cluster_updates_rx, &mut shutdown_rx).await?;
info!(log, "Waiting to receive initial cluster updates...");
let (cluster_update, execution_result_rx) = Self::receive_initial_cluster_update(
&mut cluster_updates_rx,
execution_result_rx,
&mut shutdown_rx,
)
.await?;
info!(log, "Received initial cluster updates.");

let cluster_manager = Arc::new(RwLock::new(Self::new(Self::create_endpoints_from_update(
cluster_update,
@@ -161,6 +171,7 @@ impl ClusterManager {
node_id: String,
management_servers: Vec<ManagementServer>,
cluster_updates_tx: mpsc::Sender<ClusterUpdate>,
listener_manager_args: ListenerManagerArgs,
execution_result_tx: oneshot::Sender<ExecutionResult>,
shutdown_rx: watch::Receiver<()>,
) {
@@ -171,6 +182,7 @@ impl ClusterManager {
node_id,
management_servers,
cluster_updates_tx,
listener_manager_args,
shutdown_rx,
)
.await;
@@ -182,24 +194,34 @@ impl ClusterManager {
}

// Waits until it receives a cluster update from the given channel.
// It also takes the execution result receiver: if the client exits prematurely
// while we're waiting for an update, we return its execution error instead.
async fn receive_initial_cluster_update(
cluster_updates_rx: &mut mpsc::Receiver<ClusterUpdate>,
mut execution_result_rx: oneshot::Receiver<ExecutionResult>,
shutdown_rx: &mut watch::Receiver<()>,
) -> Result<ClusterUpdate, InitializeError> {
) -> Result<(ClusterUpdate, oneshot::Receiver<ExecutionResult>), InitializeError> {
tokio::select! {
update = cluster_updates_rx.recv() => {
match update {
Some(update) => {
Ok(update)
Ok((update, execution_result_rx))
}
None => {
// Sender has dropped - so we can't initialize properly.
Err(InitializeError::Message("failed to receive initial cluster - sender dropped the channel".into()))
// Sender has dropped (the client exited prematurely) - so we can't
// initialize properly.
// Check the client's execution result if exiting was due to some root cause
// error and return that error if so. Otherwise return a generic error.
if let Ok(Err(execution_error)) = execution_result_rx.try_recv() {
Err(InitializeError::Message(format!("failed to receive initial cluster update: {:?}", execution_error)))
} else {
Err(InitializeError::Message("failed to receive initial cluster update: sender dropped the channel".into()))
}
}
}
}
_ = shutdown_rx.recv() => {
Err(InitializeError::Message("failed to receive initial cluster - received shutdown signal".into()))
Err(InitializeError::Message("failed to receive initial cluster update: received shutdown signal".into()))
},
}
}
@@ -223,7 +245,7 @@ impl ClusterManager {
cluster_manager.write().update(update);
}
None => {
debug!(log, "Exiting cluster update receive loop because the sender dropped the channel.");
warn!(log, "Exiting cluster update receive loop because the sender dropped the channel.");
return;
}
}
@@ -237,3 +259,61 @@ impl ClusterManager {
});
}
}

#[cfg(test)]
mod tests {
use super::ClusterManager;
use crate::config::ManagementServer;
use crate::extensions::default_registry;
use crate::extensions::filter_manager::ListenerManagerArgs;
use crate::proxy::logger;
use crate::xds::ads_client::AdsClient;
use tokio::sync::watch;

#[tokio::test]
#[ignore]
async fn run() {
let (_shutdown_tx, mut shutdown_rx) = watch::channel::<()>(());
shutdown_rx.recv().await; // Consume the initial watch value so the recv() at the end of the test waits for a real shutdown signal.

use std::sync::Arc;
use tokio::sync::mpsc;

let (filter_chain_updates_tx, _) = mpsc::channel(10);
let listener_manager_args = ListenerManagerArgs::new(
Arc::new(default_registry(&logger())),
filter_chain_updates_tx,
);
let (cm, execution_result_rx) = ClusterManager::from_xds(
logger(),
vec![ManagementServer {
address: "http://localhost:18000".into(),
}],
"test-id".into(),
listener_manager_args,
shutdown_rx.clone(),
)
.await
.unwrap();

tokio::spawn(async move {
let res = execution_result_rx.await;
println!("XDS client terminated with: {:?}", res);
});
tokio::spawn(async move {
let mut prev = None;
loop {
tokio::time::delay_for(std::time::Duration::from_millis(1000)).await;
let curr = cm
.read()
.get_all_endpoints()
.map(|v| v.iter().map(|v| v.address).collect::<Vec<_>>());
if curr != prev {
println!("received {:?}", curr)
}
prev = curr;
}
});
let _ = shutdown_rx.recv().await;
}
}
144 changes: 144 additions & 0 deletions src/extensions/filter_manager.rs
@@ -0,0 +1,144 @@
/*
* Copyright 2020 Google LLC All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// TODO: Allow unused variables since this module is WIP.
#![allow(unused)]

use crate::extensions::{FilterChain, FilterRegistry};

use std::sync::Arc;

use parking_lot::RwLock;
use slog::{debug, o, warn, Logger};
use tokio::sync::mpsc;
use tokio::sync::watch;

/// The maximum size of the queue that provides filter chain updates from the XDS layer to the [`FilterManager`].
const FILTER_CHAIN_UPDATE_QUEUE_SIZE: usize = 1000;

pub type SharedFilterManager = Arc<RwLock<FilterManager>>;

/// FilterManager creates and updates the filter chain.
pub struct FilterManager {
/// A lookup table for filter factories.
filter_registry: FilterRegistry,
/// The current filter chain.
filter_chain: Option<FilterChain>,
log: Logger,
}

/// ListenerManagerArgs contains the arguments passed when invoking the LDS resource manager.
pub(crate) struct ListenerManagerArgs {
pub filter_registry: Arc<FilterRegistry>,
pub filter_chain_updates_tx: mpsc::Sender<FilterChain>,
}

impl ListenerManagerArgs {
pub fn new(
filter_registry: Arc<FilterRegistry>,
filter_chain_updates_tx: mpsc::Sender<FilterChain>,
) -> ListenerManagerArgs {
ListenerManagerArgs {
filter_registry,
filter_chain_updates_tx,
}
}
}

impl FilterManager {
fn update(&mut self, filter_chain: Option<FilterChain>) {
self.filter_chain = filter_chain;
}

/// Returns the current filter chain.
pub fn get_filter_chain(&self) -> &Option<FilterChain> {
&self.filter_chain
}

/// Returns a new instance backed only by the provided filter chain.
pub fn fixed(
base_logger: Logger,
filter_registry: FilterRegistry,
filter_chain: FilterChain,
) -> SharedFilterManager {
Arc::new(RwLock::new(FilterManager {
filter_registry,
filter_chain: Some(filter_chain),
log: Self::create_logger(base_logger),
}))
}

/// Returns a new instance backed by a stream of filter chain updates.
/// Updates from the provided stream will be reflected in the current filter chain.
pub async fn dynamic(
base_logger: Logger,
filter_registry: FilterRegistry,
filter_chain_updates_rx: mpsc::Receiver<Option<FilterChain>>,
shutdown_rx: watch::Receiver<()>,
) -> SharedFilterManager {
let log = Self::create_logger(base_logger);
let filter_manager = Arc::new(RwLock::new(FilterManager {
filter_registry,
filter_chain: None,
log: log.clone(),
}));

Self::spawn_updater(
log,
filter_manager.clone(),
filter_chain_updates_rx,
shutdown_rx,
);

filter_manager
}

/// Spawns a task in the background that listens for filter chain updates and
/// updates the filter manager's current filter chain in turn.
fn spawn_updater(
log: Logger,
filter_manager: SharedFilterManager,
mut filter_chain_updates_rx: mpsc::Receiver<Option<FilterChain>>,
mut shutdown_rx: watch::Receiver<()>,
) {
tokio::spawn(async move {
loop {
tokio::select! {
update = filter_chain_updates_rx.recv() => {
match update {
Some(filter_chain) => {
debug!(log, "Received a filter chain update.");
filter_manager.write().update(filter_chain);
}
None => {
warn!(log, "Exiting filter chain update receive loop because the sender dropped the channel.");
return;
}
}
}
_ = shutdown_rx.recv() => {
debug!(log, "Exiting filter chain update receive loop because a shutdown signal was received.");
return;
},
}
}
});
}

fn create_logger(base_logger: Logger) -> Logger {
base_logger.new(o!("source" => "FilterManager"))
}
}
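
To make the intended flow of the new `FilterManager` concrete, here is a rough usage sketch, written as if it lived in this module so the existing imports apply. The wiring shown (who owns the update sender, how the packet path consults the chain) is an assumption for illustration, not code from this commit: the LDS listener manager would push new chains onto the channel, while the packet path takes a short read lock to see the current chain.

```rust
// Illustrative only: wiring a dynamic FilterManager to an update channel.
async fn example(base_logger: Logger, filter_registry: FilterRegistry) -> SharedFilterManager {
    let (_shutdown_tx, shutdown_rx) = watch::channel(());
    // In the real server this sender would be handed to the LDS resource manager,
    // which pushes `Some(FilterChain)` on updates (or `None` to clear the chain).
    let (_filter_chain_updates_tx, filter_chain_updates_rx) =
        mpsc::channel(FILTER_CHAIN_UPDATE_QUEUE_SIZE);

    let filter_manager = FilterManager::dynamic(
        base_logger,
        filter_registry,
        filter_chain_updates_rx,
        shutdown_rx,
    )
    .await;

    // Packet path: take a read lock and use the current chain if one is set.
    if let Some(_chain) = filter_manager.read().get_filter_chain() {
        // ... run the packet through the chain ...
    }

    filter_manager
}
```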
(Diff for the remaining changed files not shown.)
