Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Oct 24, 2023
1 parent ca6b950 commit c9bc194
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 77 deletions.
10 changes: 6 additions & 4 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,9 @@ The response from the down-chain transform is returned back up-chain but various

Tee also exposes an optional HTTP API to switch which chain to use as the "result source", that is the chain to return responses from.

`GET` `/result-source` will return `"regular-chain"` or `"tee-chain"` indicating what chain is being used for the result source.
`GET` `/transform/tee/result-source` will return `regular-chain` or `tee-chain` indicating which chain is being used for the result source.

`PUT` `/result-source` with the body content as either `regular-chain` or `tee-chain` to set the result source.
`PUT` `/transform/tee/result-source` with the body content as either `regular-chain` or `tee-chain` to set the result source.

```yaml
- Tee:
Expand All @@ -534,8 +534,10 @@ Tee also exposes an optional HTTP API to switch which chain to use as the "resul
# filter: Read
# - NullSink
# Optional port to for the HTTP chain switcher API to listen on
# switch_port: 1234
# The port that the HTTP API will listen on.
# When this field is not provided the HTTP API will not be run.
# http_api_port: 1234
#
# Timeout for sending to the sub chain in microseconds
timeout_micros: 1000
# The number of message batches that the tee can hold onto in its buffer of messages to send.
Expand Down
15 changes: 12 additions & 3 deletions shotover-proxy/tests/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,20 @@ async fn test_switch_main_chain() {
assert_eq!("a", result);

let _ = hyper_request(
format!("http://localhost:{}/result-source", switch_port),
format!(
"http://localhost:{}/transform/tee/result-source",
switch_port
),
Method::PUT,
Body::from("tee-chain"),
)
.await;

let res = hyper_request(
format!("http://localhost:{}/result-source", switch_port),
format!(
"http://localhost:{}/transform/tee/result-source",
switch_port
),
Method::GET,
Body::empty(),
)
Expand All @@ -259,7 +265,10 @@ async fn test_switch_main_chain() {
assert_eq!("b", result);

let _ = hyper_request(
format!("http://localhost:{}/result-source", switch_port),
format!(
"http://localhost:{}/transform/tee/result-source",
switch_port
),
Method::PUT,
Body::from("regular-chain"),
)
Expand Down
133 changes: 63 additions & 70 deletions shotover/src/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct TeeBuilder {
pub behavior: ConsistencyBehaviorBuilder,
pub timeout_micros: Option<u64>,
dropped_messages: Counter,
switch_port: Option<u16>,
result_source: Arc<AtomicResultSource>,
}

pub enum ConsistencyBehaviorBuilder {
Expand All @@ -41,6 +41,14 @@ impl TeeBuilder {
timeout_micros: Option<u64>,
switch_port: Option<u16>,
) -> Self {
let result_source = Arc::new(AtomicResultSource::new(ResultSource::RegularChain));

if let Some(switch_port) = switch_port {
let chain_switch_listener =
ChainSwitchListener::new(SocketAddr::from(([127, 0, 0, 1], switch_port)));
tokio::spawn(chain_switch_listener.async_run(result_source.clone()));
}

let dropped_messages = register_counter!("tee_dropped_messages", "chain" => "Tee");

TeeBuilder {
Expand All @@ -49,21 +57,13 @@ impl TeeBuilder {
behavior,
timeout_micros,
dropped_messages,
switch_port,
result_source,
}
}
}

impl TransformBuilder for TeeBuilder {
fn build(&self) -> Transforms {
let result_source = Arc::new(AtomicResultSource::new(ResultSource::RegularChain));

if let Some(switch_port) = self.switch_port {
let chain_switch_listener =
ChainSwitchListener::new(SocketAddr::from(([127, 0, 0, 1], switch_port)));
tokio::spawn(chain_switch_listener.async_run(result_source.clone()));
}

Transforms::Tee(Tee {
tx: self.tx.build_buffered(self.buffer_size),
behavior: match &self.behavior {
Expand All @@ -79,7 +79,7 @@ impl TransformBuilder for TeeBuilder {
buffer_size: self.buffer_size,
timeout_micros: self.timeout_micros,
dropped_messages: self.dropped_messages.clone(),
result_source: result_source.clone(),
result_source: self.result_source.clone(),
})
}

Expand Down Expand Up @@ -199,59 +199,8 @@ impl TransformConfig for TeeConfig {
#[async_trait]
impl Transform for Tee {
async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
self.transform_inner(requests_wrapper).await
}
}

impl Tee {
async fn return_response(
&mut self,
tee_result: Messages,
chain_result: Messages,
) -> Result<Messages> {
let result_source: ResultSource = self.result_source.load(Ordering::Relaxed);
match result_source {
ResultSource::RegularChain => Ok(chain_result),
ResultSource::TeeChain => Ok(tee_result),
}
}

async fn ignore_behaviour_inner<'a>(
&'a mut self,
requests_wrapper: Wrapper<'a>,
) -> Result<Messages> {
let result_source: ResultSource = self.result_source.load(Ordering::Relaxed);
match result_source {
ResultSource::RegularChain => {
let (tee_result, chain_result) = tokio::join!(
self.tx
.process_request_no_return(requests_wrapper.clone(), self.timeout_micros),
requests_wrapper.call_next_transform()
);
if let Err(e) = tee_result {
self.dropped_messages.increment(1);
trace!("Tee Ignored error {e}");
}
chain_result
}
ResultSource::TeeChain => {
let (tee_result, chain_result) = tokio::join!(
self.tx
.process_request(requests_wrapper.clone(), self.timeout_micros),
requests_wrapper.call_next_transform()
);
if let Err(e) = chain_result {
self.dropped_messages.increment(1);
trace!("Tee Ignored error {e}");
}
tee_result
}
}
}

async fn transform_inner<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
match &mut self.behavior {
ConsistencyBehavior::Ignore => self.ignore_behaviour_inner(requests_wrapper).await,
ConsistencyBehavior::Ignore => self.ignore_behaviour(requests_wrapper).await,
ConsistencyBehavior::FailOnMismatch => {
let (tee_result, chain_result) = tokio::join!(
self.tx
Expand Down Expand Up @@ -280,7 +229,7 @@ impl Tee {
}
}

self.return_response(tee_response, chain_response).await
Ok(self.return_response(tee_response, chain_response).await)
}
ConsistencyBehavior::SubchainOnMismatch(mismatch_chain) => {
let failed_message = requests_wrapper.clone();
Expand All @@ -297,7 +246,7 @@ impl Tee {
mismatch_chain.process_request(failed_message, None).await?;
}

self.return_response(tee_response, chain_response).await
Ok(self.return_response(tee_response, chain_response).await)
}
ConsistencyBehavior::LogWarningOnMismatch => {
let (tee_result, chain_result) = tokio::join!(
Expand All @@ -322,7 +271,47 @@ impl Tee {
.collect::<Vec<_>>()
);
}
self.return_response(tee_response, chain_response).await
Ok(self.return_response(tee_response, chain_response).await)
}
}
}
}

impl Tee {
async fn return_response(&mut self, tee_result: Messages, chain_result: Messages) -> Messages {
let result_source: ResultSource = self.result_source.load(Ordering::Relaxed);
match result_source {
ResultSource::RegularChain => chain_result,
ResultSource::TeeChain => tee_result,
}
}

async fn ignore_behaviour<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
let result_source: ResultSource = self.result_source.load(Ordering::Relaxed);
match result_source {
ResultSource::RegularChain => {
let (tee_result, chain_result) = tokio::join!(
self.tx
.process_request_no_return(requests_wrapper.clone(), self.timeout_micros),
requests_wrapper.call_next_transform()
);
if let Err(e) = tee_result {
self.dropped_messages.increment(1);
trace!("Tee Ignored error {e}");
}
chain_result
}
ResultSource::TeeChain => {
let (tee_result, chain_result) = tokio::join!(
self.tx
.process_request(requests_wrapper.clone(), self.timeout_micros),
requests_wrapper.call_next_transform()
);
if let Err(e) = chain_result {
self.dropped_messages.increment(1);
trace!("Tee Ignored error {e}");
}
tee_result
}
}
}
Expand Down Expand Up @@ -381,12 +370,12 @@ impl ChainSwitchListener {
let result_source = result_source.clone();
async move {
let response = match (req.method(), req.uri().path()) {
(&Method::GET, "/result-source") => {
(&Method::GET, "/transform/tee/result-source") => {
let result_source: ResultSource =
result_source.load(Ordering::Relaxed);
Self::rsp(StatusCode::OK, result_source.to_string())
}
(&Method::PUT, "/result-source") => {
(&Method::PUT, "/transform/tee/result-source") => {
match hyper::body::to_bytes(req.into_body()).await {
Ok(body) => {
match Self::set_result_source_chain(
Expand All @@ -399,7 +388,9 @@ impl ChainSwitchListener {
error!(?error, "setting result source failed");
Self::rsp(
StatusCode::BAD_REQUEST,
"setting result source failed",
format!(
"setting result source failed: {error}"
),
)
}
Ok(()) => Self::rsp(StatusCode::OK, Body::empty()),
Expand All @@ -414,7 +405,9 @@ impl ChainSwitchListener {
}
}
}
_ => Self::rsp(StatusCode::NOT_FOUND, "try /result-source"),
_ => {
Self::rsp(StatusCode::NOT_FOUND, "try /tranform/tee/result-source")
}
};
Ok::<_, Infallible>(response)
}
Expand Down

0 comments on commit c9bc194

Please sign in to comment.