Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to shotover 0.6 #11

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
370 changes: 196 additions & 174 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
resolver = "2"
members = [
"shotover-bin",
"redis-get-rewrite",
"valkey-get-rewrite",
"kafka-fetch-rewrite",
]

Expand All @@ -16,7 +16,7 @@ lto = "fat"
codegen-units = 1

[workspace.dependencies]
shotover = { version = "0.5.0" }
shotover = { version = "0.6.0" }
anyhow = "1.0.42"
serde = { version = "1.0.111", features = ["derive"] }
tracing = "0.1.15"
Expand Down
2 changes: 1 addition & 1 deletion kafka-fetch-rewrite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ async-trait.workspace = true
tracing.workspace = true
typetag.workspace = true
bytes = "1.4.0"
kafka-protocol = "0.13.0"
kafka-protocol = "0.14.0"
19 changes: 11 additions & 8 deletions kafka-fetch-rewrite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ impl Transform for KafkaFetchRewrite {
for response in &mut fetch.responses {
for partition in &mut response.partitions {
if let Some(records_bytes) = &mut partition.records {
if let Ok(mut records) = RecordBatchDecoder::decode(
&mut records_bytes.clone(),
None::<fn(&mut bytes::Bytes, Compression) -> Result<Bytes>>,
) {
if let Ok(mut records) =
RecordBatchDecoder::decode::<
_,
fn(&mut bytes::Bytes, Compression) -> Result<Bytes>,
>(&mut records_bytes.clone())
{
for record in &mut records {
if record.value.is_some() {
record.value =
Expand All @@ -95,16 +97,17 @@ impl Transform for KafkaFetchRewrite {
}

let mut new_bytes = BytesMut::new();
RecordBatchEncoder::encode(
RecordBatchEncoder::encode::<
_,
_,
fn(&mut BytesMut, &mut BytesMut, Compression) -> Result<()>,
>(
&mut new_bytes,
records.iter(),
&RecordEncodeOptions {
version: 0, // TODO: get this from somewhere
compression: Compression::None,
},
None::<
fn(&mut BytesMut, &mut BytesMut, Compression) -> Result<()>,
>,
)?;
*records_bytes = new_bytes.freeze();
}
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ To get started just run `git clone https://github.com/shotover/shotover-custom-t

Use an example transform that matches the protocol you are working with as a base. e.g.

* `redis-get-rewrite` - for redis
* `valkey-get-rewrite` - for valkey
* `kafka-fetch-rewrite` - for kafka

Run `cargo build --release` in the root of the repo to get a shotover binary containing your custom transform at `target/release/shotover-bin`.
Expand Down
4 changes: 2 additions & 2 deletions shotover-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ license = "Apache-2.0"

[dependencies]
shotover.workspace = true
redis-get-rewrite = { path = "../redis-get-rewrite" }
valkey-get-rewrite = { path = "../valkey-get-rewrite" }
kafka-fetch-rewrite = { path = "../kafka-fetch-rewrite" }

[dev-dependencies]
tokio-bin-process = "0.6.0"
docker-compose-runner = "0.3.0"
tokio = { version = "1.28.0", features = ["full", "macros"] }
redis = { version = "0.27.0", features = ["tokio-comp", "cluster"] }
rdkafka = "0.36.0"
rdkafka = "0.37.0"
2 changes: 1 addition & 1 deletion shotover-bin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
shotover::import_transform!(redis_get_rewrite::RedisGetRewriteConfig);
shotover::import_transform!(valkey_get_rewrite::ValkeyGetRewriteConfig);
shotover::import_transform!(kafka_fetch_rewrite::KafkaFetchRewriteConfig);

fn main() {
Expand Down
2 changes: 1 addition & 1 deletion shotover-bin/tests/test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod kafka;
mod redis;
mod valkey;

use docker_compose_runner::{DockerCompose, Image};
use std::time::Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ pub async fn assert_bytes(cmd: &mut Cmd, connection: &mut MultiplexedConnection,
assert_eq!(cmd.query_async(connection).await, Ok(value.to_vec()));
}

pub async fn redis_connection(port: u16) -> redis::aio::MultiplexedConnection {
pub async fn valkey_connection(port: u16) -> redis::aio::MultiplexedConnection {
let client = redis::Client::open(format!("redis://127.0.0.1:{port}")).unwrap();
client.get_multiplexed_tokio_connection().await.unwrap()
}

#[tokio::test(flavor = "multi_thread")]
async fn test_redis_get_rewrite() {
async fn test_valkey_get_rewrite() {
// Setup shotover and the redis server it connects to
let _compose = docker_compose("redis-get-rewrite-config/docker-compose.yaml");
let shotover = shotover("redis-get-rewrite-config/topology.yaml").await;
let mut connection = redis_connection(6379).await;
let _compose = docker_compose("valkey-get-rewrite-config/docker-compose.yaml");
let shotover = shotover("valkey-get-rewrite-config/topology.yaml").await;
let mut connection = valkey_connection(6379).await;

// Verify functionality of transform
assert_ok(
Expand Down
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
File renamed without changes.
ronycsdu marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
---
sources:
- Redis:
name: "redis"
- Valkey:
name: "valkey"
listen_addr: "127.0.0.1:6379"
chain:
- RedisGetRewrite:
- ValkeyGetRewrite:
result: "Rewritten value"
- RedisSinkSingle:
- ValkeySinkSingle:
remote_address: "127.0.0.1:1111"
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "redis-get-rewrite"
name = "valkey-get-rewrite"
version = "0.0.1"
edition = "2021"
license = "Apache-2.0"
Expand Down
30 changes: 15 additions & 15 deletions redis-get-rewrite/src/lib.rs → valkey-get-rewrite/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use shotover::frame::{Frame, MessageType, RedisFrame};
use shotover::frame::{Frame, MessageType, ValkeyFrame};
use shotover::message::Messages;
use shotover::transforms::{
ChainState, DownChainProtocol, Transform, TransformBuilder, TransformConfig,
Expand All @@ -10,25 +10,25 @@ use shotover::transforms::{

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct RedisGetRewriteConfig {
pub struct ValkeyGetRewriteConfig {
pub result: String,
}

const NAME: &str = "RedisGetRewrite";
#[typetag::serde(name = "RedisGetRewrite")]
const NAME: &str = "ValkeyGetRewrite";
#[typetag::serde(name = "ValkeyGetRewrite")]
#[async_trait(?Send)]
impl TransformConfig for RedisGetRewriteConfig {
impl TransformConfig for ValkeyGetRewriteConfig {
async fn get_builder(
&self,
_transform_context: TransformContextConfig,
) -> Result<Box<dyn TransformBuilder>> {
Ok(Box::new(RedisGetRewriteBuilder {
Ok(Box::new(ValkeyGetRewriteBuilder {
result: self.result.clone(),
}))
}

fn up_chain_protocol(&self) -> UpChainProtocol {
UpChainProtocol::MustBeOneOf(vec![MessageType::Redis])
UpChainProtocol::MustBeOneOf(vec![MessageType::Valkey])
}

fn down_chain_protocol(&self) -> DownChainProtocol {
Expand All @@ -37,13 +37,13 @@ impl TransformConfig for RedisGetRewriteConfig {
}

#[derive(Clone)]
pub struct RedisGetRewriteBuilder {
pub struct ValkeyGetRewriteBuilder {
result: String,
}

impl TransformBuilder for RedisGetRewriteBuilder {
impl TransformBuilder for ValkeyGetRewriteBuilder {
fn build(&self, _transform_context: TransformContextBuilder) -> Box<dyn Transform> {
Box::new(RedisGetRewrite {
Box::new(ValkeyGetRewrite {
result: self.result.clone(),
})
}
Expand All @@ -53,12 +53,12 @@ impl TransformBuilder for RedisGetRewriteBuilder {
}
}

pub struct RedisGetRewrite {
pub struct ValkeyGetRewrite {
result: String,
}

#[async_trait]
impl Transform for RedisGetRewrite {
impl Transform for ValkeyGetRewrite {
fn get_name(&self) -> &'static str {
NAME
}
Expand Down Expand Up @@ -89,8 +89,8 @@ impl Transform for RedisGetRewrite {
}

fn is_get(frame: &Frame) -> bool {
if let Frame::Redis(RedisFrame::Array(array)) = frame {
if let Some(RedisFrame::BulkString(first)) = array.first() {
if let Frame::Valkey(ValkeyFrame::Array(array)) = frame {
if let Some(ValkeyFrame::BulkString(first)) = array.first() {
first.eq_ignore_ascii_case(b"GET")
} else {
false
Expand All @@ -102,5 +102,5 @@ fn is_get(frame: &Frame) -> bool {

fn rewrite_get(frame: &mut Frame, result: &str) {
tracing::info!("Replaced {frame:?} with BulkString(\"{result}\")");
*frame = Frame::Redis(RedisFrame::BulkString(result.to_owned().into()));
*frame = Frame::Valkey(ValkeyFrame::BulkString(result.to_owned().into()));
}