Skip to content

Commit

Permalink
Update to shotover 0.6
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Dec 9, 2024
1 parent aeb84f2 commit 95df6b9
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 67 deletions.
78 changes: 49 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
resolver = "2"
members = [
"shotover-bin",
"redis-get-rewrite",
"valkey-get-rewrite",
"kafka-fetch-rewrite",
]

Expand All @@ -16,7 +16,7 @@ lto = "fat"
codegen-units = 1

[workspace.dependencies]
shotover = { version = "0.5.0" }
shotover = { version = "0.6.0" }
anyhow = "1.0.42"
serde = { version = "1.0.111", features = ["derive"] }
tracing = "0.1.15"
Expand Down
2 changes: 1 addition & 1 deletion kafka-fetch-rewrite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ async-trait.workspace = true
tracing.workspace = true
typetag.workspace = true
bytes = "1.4.0"
kafka-protocol = "0.13.0"
kafka-protocol = "0.14.0"
19 changes: 11 additions & 8 deletions kafka-fetch-rewrite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ impl Transform for KafkaFetchRewrite {
for response in &mut fetch.responses {
for partition in &mut response.partitions {
if let Some(records_bytes) = &mut partition.records {
if let Ok(mut records) = RecordBatchDecoder::decode(
&mut records_bytes.clone(),
None::<fn(&mut bytes::Bytes, Compression) -> Result<Bytes>>,
) {
if let Ok(mut records) =
RecordBatchDecoder::decode::<
_,
fn(&mut bytes::Bytes, Compression) -> Result<Bytes>,
>(&mut records_bytes.clone())
{
for record in &mut records {
if record.value.is_some() {
record.value =
Expand All @@ -95,16 +97,17 @@ impl Transform for KafkaFetchRewrite {
}

let mut new_bytes = BytesMut::new();
RecordBatchEncoder::encode(
RecordBatchEncoder::encode::<
_,
_,
fn(&mut BytesMut, &mut BytesMut, Compression) -> Result<()>,
>(
&mut new_bytes,
records.iter(),
&RecordEncodeOptions {
version: 0, // TODO: get this from somewhere
compression: Compression::None,
},
None::<
fn(&mut BytesMut, &mut BytesMut, Compression) -> Result<()>,
>,
)?;
*records_bytes = new_bytes.freeze();
}
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ To get started just run `git clone https://github.com/shotover/shotover-custom-t

Use an example transform that matches the protocol you are working with as a base. e.g.

* `redis-get-rewrite` - for redis
* `valkey-get-rewrite` - for valkey
* `kafka-fetch-rewrite` - for kafka

Run `cargo build --release` in the root of the repo to get a shotover binary containing your custom transform at `target/release/shotover-bin`.
Expand Down
2 changes: 1 addition & 1 deletion shotover-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0"

[dependencies]
shotover.workspace = true
redis-get-rewrite = { path = "../redis-get-rewrite" }
valkey-get-rewrite = { path = "../valkey-get-rewrite" }
kafka-fetch-rewrite = { path = "../kafka-fetch-rewrite" }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion shotover-bin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
shotover::import_transform!(redis_get_rewrite::RedisGetRewriteConfig);
shotover::import_transform!(valkey_get_rewrite::ValkeyGetRewriteConfig);
shotover::import_transform!(kafka_fetch_rewrite::KafkaFetchRewriteConfig);

fn main() {
Expand Down
10 changes: 5 additions & 5 deletions shotover-bin/tests/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ pub async fn assert_bytes(cmd: &mut Cmd, connection: &mut MultiplexedConnection,
assert_eq!(cmd.query_async(connection).await, Ok(value.to_vec()));
}

pub async fn redis_connection(port: u16) -> redis::aio::MultiplexedConnection {
pub async fn valkey_connection(port: u16) -> redis::aio::MultiplexedConnection {
let client = redis::Client::open(format!("redis://127.0.0.1:{port}")).unwrap();
client.get_multiplexed_tokio_connection().await.unwrap()
}

#[tokio::test(flavor = "multi_thread")]
async fn test_redis_get_rewrite() {
async fn test_valkey_get_rewrite() {
// Setup shotover and the redis server it connects to
let _compose = docker_compose("redis-get-rewrite-config/docker-compose.yaml");
let shotover = shotover("redis-get-rewrite-config/topology.yaml").await;
let mut connection = redis_connection(6379).await;
let _compose = docker_compose("valkey-get-rewrite-config/docker-compose.yaml");
let shotover = shotover("valkey-get-rewrite-config/topology.yaml").await;
let mut connection = valkey_connection(6379).await;

// Verify functionality of transform
assert_ok(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
---
sources:
- Redis:
- Valkey:
name: "redis"
listen_addr: "127.0.0.1:6379"
chain:
- RedisGetRewrite:
- ValkeyGetRewrite:
result: "Rewritten value"
- RedisSinkSingle:
- ValkeySinkSingle:
remote_address: "127.0.0.1:1111"
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "redis-get-rewrite"
name = "valkey-get-rewrite"
version = "0.0.1"
edition = "2021"
license = "Apache-2.0"
Expand Down
Loading

0 comments on commit 95df6b9

Please sign in to comment.