Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tee switch main chain #1361

Merged
merged 4 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ typetag = "0.2.5"
aws-throwaway = "0.3.0"
tokio-bin-process = "0.4.0"
ordered-float = { version = "4.0.0", features = ["serde"] }
hyper = { version = "0.14.14", features = ["server"] }
10 changes: 10 additions & 0 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,12 @@ This is mainly used in conjunction with the `TuneableConsistencyScatter` transfo
This transform sends messages to both the defined sub chain and the remaining down-chain transforms.
The response from the down-chain transform is returned back up-chain but various behaviours can be defined by the `behaviour` field to handle the case when the responses from the sub chain and down-chain do not match.

Tee also exposes an optional HTTP API to switch which chain to use as the "result source", that is the chain to return responses from.

`GET` `/transform/tee/result-source` will return `regular-chain` or `tee-chain` indicating which chain is being used for the result source.

`PUT` `/transform/tee/result-source` with the body content as either `regular-chain` or `tee-chain` to set the result source.

```yaml
- Tee:
# Ignore responses returned by the sub chain
Expand All @@ -528,6 +534,10 @@ The response from the down-chain transform is returned back up-chain but various
# filter: Read
# - NullSink

# The port that the HTTP API will listen on.
# When this field is not provided the HTTP API will not be run.
# http_api_port: 1234
#
# Timeout for sending to the sub chain in microseconds
timeout_micros: 1000
# The number of message batches that the tee can hold onto in its buffer of messages to send.
Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ serde_json = "1.0.103"
time = { version = "0.3.25" }
inferno = { version = "0.11.15", default-features = false, features = ["multithreaded", "nameattr"] }
shell-quote = "0.3.0"
hyper.workspace = true
conorbros marked this conversation as resolved.
Show resolved Hide resolved

[features]
# Include WIP alpha transforms in the public API
Expand Down
46 changes: 46 additions & 0 deletions shotover-proxy/tests/test-configs/tee/switch_chain.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
sources:
- Redis:
name: "redis-1"
listen_addr: "127.0.0.1:6371"
connection_limit:
chain:
- Tee:
behavior: Ignore
buffer_size: 10000
switch_port: 1231
chain:
- DebugReturner:
Redis: "b"
- DebugReturner:
Redis: "a"
- Redis:
name: "redis-3"
listen_addr: "127.0.0.1:6372"
connection_limit:
chain:
- Tee:
behavior:
SubchainOnMismatch:
- NullSink
buffer_size: 10000
switch_port: 1232
chain:
- DebugReturner:
Redis: "b"
- DebugReturner:
Redis: "a"
- Redis:
name: "redis-3"
listen_addr: "127.0.0.1:6373"
connection_limit:
chain:
- Tee:
behavior: LogWarningOnMismatch
buffer_size: 10000
switch_port: 1233
chain:
- DebugReturner:
Redis: "b"
- DebugReturner:
Redis: "a"
97 changes: 97 additions & 0 deletions shotover-proxy/tests/transforms/tee.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::shotover_process;
use hyper::{body, Body, Client, Method, Request, Response};
use test_helpers::connection::redis_connection;
use test_helpers::docker_compose::docker_compose;
use test_helpers::shotover_process::{EventMatcher, Level};
Expand Down Expand Up @@ -193,3 +194,99 @@ async fn test_subchain_with_mismatch() {
assert_eq!("myvalue", result);
shotover.shutdown_and_then_consume_events(&[]).await;
}

async fn read_response_body(res: Response<Body>) -> Result<String, hyper::Error> {
let bytes = body::to_bytes(res.into_body()).await?;
Ok(String::from_utf8(bytes.to_vec()).expect("response was not valid utf-8"))
}

async fn hyper_request(uri: String, method: Method, body: Body) -> Response<Body> {
let client = Client::new();

let req = Request::builder()
.method(method)
.uri(uri)
.body(body)
.expect("request builder");

client.request(req).await.unwrap()
}

#[tokio::test(flavor = "multi_thread")]
async fn test_switch_main_chain() {
let shotover = shotover_process("tests/test-configs/tee/switch_chain.yaml")
.start()
.await;

for i in 1..=3 {
let redis_port = 6370 + i;
let switch_port = 1230 + i;

let mut connection = redis_connection::new_async("127.0.0.1", redis_port).await;

let result = redis::cmd("SET")
.arg("key")
.arg("myvalue")
.query_async::<_, String>(&mut connection)
.await
.unwrap();

assert_eq!("a", result);

let _ = hyper_request(
format!(
"http://localhost:{}/transform/tee/result-source",
switch_port
),
Method::PUT,
Body::from("tee-chain"),
)
.await;

let res = hyper_request(
format!(
"http://localhost:{}/transform/tee/result-source",
switch_port
),
Method::GET,
Body::empty(),
)
.await;
let body = read_response_body(res).await.unwrap();
assert_eq!("tee-chain", body);

let result = redis::cmd("SET")
.arg("key")
.arg("myvalue")
.query_async::<_, String>(&mut connection)
.await
.unwrap();

assert_eq!("b", result);

let _ = hyper_request(
format!(
"http://localhost:{}/transform/tee/result-source",
switch_port
),
Method::PUT,
Body::from("regular-chain"),
)
.await;

let result = redis::cmd("SET")
.arg("key")
.arg("myvalue")
.query_async::<_, String>(&mut connection)
.await
.unwrap();

assert_eq!("a", result);
}

shotover
.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Warn)
.with_count(tokio_bin_process::event_matcher::Count::Times(3))])
.await;
}
2 changes: 1 addition & 1 deletion shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ metrics-exporter-prometheus = "0.12.0"
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
hyper = { version = "0.14.14", features = ["server"] }
hyper.workspace = true
halfbrown = "0.2.1"

# Transform dependencies
Expand Down
Loading
Loading