Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to shotover 0.4 #9

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,548 changes: 882 additions & 666 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ lto = "fat"
codegen-units = 1

[workspace.dependencies]
shotover = { version = "0.2.0", features = ["alpha-transforms"] }
shotover = { version = "0.4.0" }
anyhow = "1.0.42"
serde = { version = "1.0.111", features = ["derive"] }
tracing = "0.1.15"
Expand Down
2 changes: 1 addition & 1 deletion kafka-fetch-rewrite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ async-trait.workspace = true
tracing.workspace = true
typetag.workspace = true
bytes = "1.4.0"
kafka-protocol = "0.7.0"
kafka-protocol = "0.10.0"
33 changes: 26 additions & 7 deletions kafka-fetch-rewrite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,39 @@ use kafka_protocol::records::{
};
use serde::{Deserialize, Serialize};
use shotover::frame::kafka::{KafkaFrame, ResponseBody};
use shotover::frame::Frame;
use shotover::frame::{Frame, MessageType};
use shotover::message::Messages;
use shotover::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use shotover::transforms::{
DownChainProtocol, Transform, TransformBuilder, TransformConfig, TransformContextBuilder,
TransformContextConfig, UpChainProtocol, Wrapper,
};

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct KafkaFetchRewriteConfig {
pub result: String,
}

const NAME: &str = "KafkaFetchRewrite";
#[typetag::serde(name = "KafkaFetchRewrite")]
#[async_trait(?Send)]
impl TransformConfig for KafkaFetchRewriteConfig {
async fn get_builder(&self, _chain_name: String) -> Result<Box<dyn TransformBuilder>> {
async fn get_builder(
&self,
_transform_context: TransformContextConfig,
) -> Result<Box<dyn TransformBuilder>> {
Ok(Box::new(KafkaFetchRewriteBuilder {
result: self.result.clone(),
}))
}

fn up_chain_protocol(&self) -> UpChainProtocol {
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
UpChainProtocol::MustBeOneOf(vec![MessageType::Kafka])
}

fn down_chain_protocol(&self) -> DownChainProtocol {
DownChainProtocol::SameAsUpChain
}
}

#[derive(Clone)]
Expand All @@ -32,14 +47,14 @@ pub struct KafkaFetchRewriteBuilder {
}

impl TransformBuilder for KafkaFetchRewriteBuilder {
fn build(&self) -> Transforms {
Transforms::Custom(Box::new(KafkaFetchRewrite {
fn build(&self, _transform_context: TransformContextBuilder) -> Box<dyn Transform> {
Box::new(KafkaFetchRewrite {
result: self.result.clone(),
}))
})
}

fn get_name(&self) -> &'static str {
"KafkaFetchRewrite"
NAME
}
}

Expand All @@ -49,6 +64,10 @@ pub struct KafkaFetchRewrite {

#[async_trait]
impl Transform for KafkaFetchRewrite {
fn get_name(&self) -> &'static str {
NAME
}

async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> Result<Messages> {
let mut responses = message_wrapper.call_next_transform().await?;

Expand Down
33 changes: 26 additions & 7 deletions redis-get-rewrite/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use shotover::frame::{Frame, RedisFrame};
use shotover::frame::{Frame, MessageType, RedisFrame};
use shotover::message::Messages;
use shotover::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper};
use shotover::transforms::{
DownChainProtocol, Transform, TransformBuilder, TransformConfig, TransformContextBuilder,
TransformContextConfig, UpChainProtocol, Wrapper,
};

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct RedisGetRewriteConfig {
pub result: String,
}

const NAME: &str = "RedisGetRewrite";
#[typetag::serde(name = "RedisGetRewrite")]
#[async_trait(?Send)]
impl TransformConfig for RedisGetRewriteConfig {
async fn get_builder(&self, _chain_name: String) -> Result<Box<dyn TransformBuilder>> {
async fn get_builder(
&self,
_transform_context: TransformContextConfig,
) -> Result<Box<dyn TransformBuilder>> {
Ok(Box::new(RedisGetRewriteBuilder {
result: self.result.clone(),
}))
}

fn up_chain_protocol(&self) -> UpChainProtocol {
UpChainProtocol::MustBeOneOf(vec![MessageType::Redis])
}

fn down_chain_protocol(&self) -> DownChainProtocol {
DownChainProtocol::SameAsUpChain
}
}

#[derive(Clone)]
Expand All @@ -27,14 +42,14 @@ pub struct RedisGetRewriteBuilder {
}

impl TransformBuilder for RedisGetRewriteBuilder {
fn build(&self) -> Transforms {
Transforms::Custom(Box::new(RedisGetRewrite {
fn build(&self, _transform_context: TransformContextBuilder) -> Box<dyn Transform> {
Box::new(RedisGetRewrite {
result: self.result.clone(),
}))
})
}

fn get_name(&self) -> &'static str {
"RedisGetRewrite"
NAME
}
}

Expand All @@ -44,6 +59,10 @@ pub struct RedisGetRewrite {

#[async_trait]
impl Transform for RedisGetRewrite {
fn get_name(&self) -> &'static str {
NAME
}

async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> Result<Messages> {
let mut get_indices = vec![];
for (i, message) in message_wrapper.requests.iter_mut().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.74"
channel = "1.79"
components = [ "rustfmt", "clippy" ]
8 changes: 4 additions & 4 deletions shotover-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ redis-get-rewrite = { path = "../redis-get-rewrite" }
kafka-fetch-rewrite = { path = "../kafka-fetch-rewrite" }

[dev-dependencies]
tokio-bin-process = "0.4.0"
docker-compose-runner = "0.2.0"
tokio-bin-process = "0.5.0"
docker-compose-runner = "0.3.0"
tokio = { version = "1.28.0", features = ["full", "macros"] }
redis = { version = "0.23.0", features = ["tokio-comp", "cluster"] }
rdkafka = "0.34.0"
redis = { version = "0.24.0", features = ["tokio-comp", "cluster"] }
rdkafka = "0.36.0"
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3"
services:
kafka:
image: 'bitnami/kafka:3.4.0-debian-11-r22'
Expand Down
5 changes: 2 additions & 3 deletions shotover-bin/redis-get-rewrite-config/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
version: "3.3"
services:
redis-one:
image: library/redis:5.0.9
ports:
- "1111:6379"
volumes:
- ./redis.conf:/usr/local/etc/redis/redis.conf
command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
- ./redis.conf:/usr/local/etc/redis/redis.conf
command: [ "redis-server", "/usr/local/etc/redis/redis.conf" ]
30 changes: 14 additions & 16 deletions shotover-bin/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,22 @@ use tokio_bin_process::event_matcher::EventMatcher;
use tokio_bin_process::{bin_path, BinProcess};

fn docker_compose(yaml_path: &str) -> DockerCompose {
DockerCompose::new(
&[
Image {
name: "library/redis:5.0.9",
log_regex_to_wait_for: r"Ready to accept connections",
timeout: 120,
},
Image {
name: "bitnami/kafka:3.4.0-debian-11-r22",
log_regex_to_wait_for: r"Kafka Server started",
timeout: 120,
},
],
|_| {},
yaml_path,
)
DockerCompose::new(&IMAGE_WAITERS, |_| {}, yaml_path)
}

pub static IMAGE_WAITERS: [Image; 2] = [
Image {
name: "library/redis:5.0.9",
log_regex_to_wait_for: r"Ready to accept connections",
timeout: Duration::from_secs(120),
},
Image {
name: "bitnami/kafka:3.4.0-debian-11-r22",
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
];

async fn shotover(topology_path: &str) -> BinProcess {
let mut shotover = BinProcess::start_binary(
bin_path!("shotover-bin"),
Expand Down
Loading