diff --git a/Cargo.lock b/Cargo.lock index b282fd9c4..edc516c3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "aead" version = "0.5.2" @@ -1222,6 +1228,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.9" @@ -1540,6 +1555,12 @@ dependencies = [ "syn 2.0.32", ] +[[package]] +name = "dary_heap" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" + [[package]] name = "dashmap" version = "5.5.3" @@ -2552,6 +2573,30 @@ version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +[[package]] +name = "libflate" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7d5654ae1795afc7ff76f4365c2c8791b0feb18e8996a96adad8ffd7c3b2bf" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be5f52fb8c451576ec6b79d3f4deb327398bc05bbdbd99021a6e77a4c855d524" +dependencies = [ + "core2", + "hashbrown 0.13.2", + "rle-decode-fast", +] + [[package]] name = "libm" version = "0.2.7" @@ -3797,6 +3842,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rsa" version = "0.9.2" @@ -4366,6 +4417,7 @@ dependencies = [ "hyper", "itertools 0.10.5", "kafka-protocol", + "libflate", "lz4_flex 0.11.1", "metrics", "metrics-exporter-prometheus", diff --git a/shotover-proxy/config/config.yaml b/shotover-proxy/config/config.yaml index 42ea740ca..79879aee2 100644 --- a/shotover-proxy/config/config.yaml +++ b/shotover-proxy/config/config.yaml @@ -1,3 +1,3 @@ --- -main_log_level: "info,shotover_proxy=info" +main_log_level: "debug,shotover_proxy=debug" observability_interface: "0.0.0.0:9001" diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index be39351e8..f7a74bcc5 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -2,17 +2,24 @@ use crate::shotover_process; use opensearch::{ auth::Credentials, cert::CertificateValidation, - http::response::Response, + cluster::ClusterHealthParts, http::{ + response::Response, transport::{SingleNodeConnectionPool, TransportBuilder}, Method, StatusCode, Url, }, - indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts}, + indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts, IndicesGetParts}, params::Refresh, + params::WaitForStatus, BulkOperation, BulkParts, DeleteParts, Error, IndexParts, OpenSearch, SearchParts, }; use serde_json::{json, Value}; use test_helpers::docker_compose::docker_compose; +// use tokio::{ +// sync::oneshot, +// task::JoinHandle, +// time::{interval, Duration}, +// }; async fn assert_ok_and_get_json(response: Result) -> Value { let response = response.unwrap(); @@ -226,6 +233,24 @@ async fn opensearch_test_suite(client: &OpenSearch) { test_delete_index(client).await; } +fn create_client(addr: &str) -> OpenSearch { + let url = Url::parse(addr).unwrap(); + let credentials = Credentials::Basic("admin".into(), "admin".into()); + let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) + .cert_validation(CertificateValidation::None) + .auth(credentials) + .build() + .unwrap(); + let client = OpenSearch::new(transport); + + client + .cluster() + .health(ClusterHealthParts::None) + .wait_for_status(WaitForStatus::Green); + + client +} + #[tokio::test(flavor = "multi_thread")] async fn passthrough_standard() { let _compose = docker_compose("tests/test-configs/opensearch-passthrough/docker-compose.yaml"); @@ -235,17 +260,150 @@ async fn passthrough_standard() { .await; let addr = "http://localhost:9201"; + let client = create_client(addr); - let url = Url::parse(addr).unwrap(); - let credentials = Credentials::Basic("admin".into(), "admin".into()); - let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) - .cert_validation(CertificateValidation::None) - .auth(credentials) - .build() + opensearch_test_suite(&client).await; + + shotover.shutdown_and_then_consume_events(&[]).await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn dual_write_basic() { + let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml"); + + let addr1 = "http://localhost:9201"; + let client1 = create_client(addr1); + let addr2 = "http://localhost:9202"; + let client2 = create_client(addr2); + + let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml") + .start() + .await; + + let shotover_client = create_client("http://localhost:9200"); + + shotover_client + .indices() + .create(IndicesCreateParts::Index("test-index")) + .send() + .await .unwrap(); - let client = OpenSearch::new(transport); - opensearch_test_suite(&client).await; + let exists_response = shotover_client + .indices() + .exists(IndicesExistsParts::Index(&["test-index"])) + .send() + .await + .unwrap(); + + assert_eq!(exists_response.status_code(), StatusCode::OK); + + shotover_client + .index(IndexParts::Index("test-index")) + .body(json!({ + "name": "John", + "age": 30 + })) + .refresh(Refresh::WaitFor) + .send() + .await + .unwrap(); + + for client in &[shotover_client, client1, client2] { + let response = client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(10) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .send() + .await + .unwrap(); + + let results = response.json::().await.unwrap(); + assert!(results["took"].as_i64().is_some()); + assert_eq!( + results["hits"].as_object().unwrap()["hits"] + .as_array() + .unwrap() + .len(), + 1 + ); + } + + shotover.shutdown_and_then_consume_events(&[]).await; +} + +// async fn start_writer_thread( +// client: OpenSearch, +// mut shutdown_notification_rx: oneshot::Receiver<()>, +// ) -> tokio::task::JoinHandle<()> { +// tokio::spawn(async move { +// let mut i = 0; +// let mut interval = interval(Duration::from_millis(100)); +// +// loop { +// tokio::select! { +// _ = interval.tick() => { +// // TODO send the message to opensearch +// }, +// _ = &mut shutdown_notification_rx => { +// println!("shutting down writer thread"); +// break; +// } +// } +// } +// }) +// } + +#[tokio::test(flavor = "multi_thread")] +async fn dual_write() { + // let shotover_addr = "http://localhost:9200"; + let source_addr = "http://localhost:9201"; + // let target_addr = "http://localhost:9202"; + + let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml"); + + let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml") + .start() + .await; + + // let shotover_client = create_client(shotover_addr); + let source_client = create_client(source_addr); + // let target_client = create_client(target_addr); + + // Create indexes in source cluster + assert_ok_and_get_json( + source_client + .indices() + .create(IndicesCreateParts::Index("test-index")) + .send() + .await, + ) + .await; + + // Get index info from source cluster and create in target cluster + + let index_info = assert_ok_and_get_json( + source_client + .indices() + .get(IndicesGetParts::Index(&["test-index"])) + .send() + .await, + ) + .await; + + println!("{:?}", index_info); + + // Begin dual writes and verify data ends up in both clusters + // Begin reindex operations + // Continue dual writing until reindex operation complete + // verify both clusters end up in the same state shotover.shutdown_and_then_consume_events(&[]).await; } diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml new file mode 100644 index 000000000..766327442 --- /dev/null +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml @@ -0,0 +1,50 @@ +version: '3' +services: + opensearch-node1: + image: opensearchproject/opensearch:2.9.0 + container_name: opensearch-node1 + environment: + - cluster.name=opensearch-cluster-1 + - node.name=opensearch-node1 + - bootstrap.memory_lock=true + - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" + - discovery.type=single-node + - plugins.security.disabled=true + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - type: volume + target: /usr/share/opensearch/data + ports: + - 9201:9200 + - 9601:9600 + + opensearch-node2: + image: opensearchproject/opensearch:2.9.0 + container_name: opensearch-node2 + environment: + - cluster.name=opensearch-cluster-2 + - node.name=opensearch-node2 + - bootstrap.memory_lock=true + - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" + - discovery.type=single-node + - plugins.security.disabled=true + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - type: volume + target: /usr/share/opensearch/data + ports: + - 9202:9200 + - 9602:9600 + diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml new file mode 100644 index 000000000..382a66e8d --- /dev/null +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml @@ -0,0 +1,19 @@ +--- +sources: + opensearch_prod: + OpenSearch: + listen_addr: "127.0.0.1:9200" +chain_config: + main_chain: + - Tee: + behavior: FailOnMismatch + buffer_size: 10000 + chain: + - OpenSearchSinkSingle: + remote_address: "127.0.0.1:9202" + connect_timeout_ms: 3000 + - OpenSearchSinkSingle: + remote_address: "127.0.0.1:9201" + connect_timeout_ms: 3000 +source_to_chain_mapping: + opensearch_prod: main_chain diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 5525e9f4e..6716d2371 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -93,6 +93,7 @@ string = "0.3.0" xxhash-rust = { version = "0.8.6", features = ["xxh3"] } dashmap = "5.4.0" atoi = "2.0.0" +libflate = "2.0.0" [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } diff --git a/shotover/src/codec/opensearch.rs b/shotover/src/codec/opensearch.rs index 57db1debb..b84f9c299 100644 --- a/shotover/src/codec/opensearch.rs +++ b/shotover/src/codec/opensearch.rs @@ -257,7 +257,7 @@ impl Encoder for OpenSearchEncoder { Encodable::Frame(frame) => { let opensearch_frame = frame.into_opensearch().unwrap(); - match opensearch_frame.headers { + match &opensearch_frame.headers { HttpHead::Request(request_parts) => { self.last_outgoing_method .lock() @@ -312,46 +312,126 @@ impl Encoder for OpenSearchEncoder { } } -#[cfg(test)] -mod opensearch_tests { - use crate::codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}; - use bytes::BytesMut; - use tokio_util::codec::{Decoder, Encoder}; - - fn test_frame(raw_frame: &[u8], direction: Direction) { - let (mut decoder, mut encoder) = OpenSearchCodecBuilder::new(direction).build(); - let message = decoder - .decode(&mut BytesMut::from(raw_frame)) - .unwrap() - .unwrap(); - - let mut dest = BytesMut::new(); - encoder.encode(message, &mut dest).unwrap(); - assert_eq!(raw_frame, &dest); - } - - const RESPONSE: [u8; 186] = *b"\ - HTTP/1.1 200 OK\r\n\ - date: Mon, 27 Jul 2009 12:28:53 GMT\r\n\ - server: Apache/2.2.14 (Win32)\r\n\ - last-modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n\ - content-length: 9\r\n\ - content-type: text/html\r\n\r\n\ - something"; - - const REQUEST: [u8; 90] = *b"\ - POST /cgi-bin/process.cgi HTTP/1.1\r\n\ - connection: Keep-Alive\r\n\ - content-length: 9\r\n\r\n\ - something"; - - #[test] - fn test_request() { - test_frame(&REQUEST, Direction::Source); - } - - #[test] - fn test_response() { - test_frame(&RESPONSE, Direction::Sink); - } -} +// #[cfg(test)] +// mod opensearch_tests { +// use crate::codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}; +// use bytes::BytesMut; +// use hex_literal::hex; +// use serde_json::Value; +// use tokio_util::codec::{Decoder, Encoder}; +// +// fn test_frame(raw_frame: &[u8], direction: Direction) { +// let (mut decoder, mut encoder) = OpenSearchCodecBuilder::new(direction).build(); +// let message = decoder +// .decode(&mut BytesMut::from(raw_frame)) +// .unwrap() +// .unwrap(); +// +// println!("message: {:?}", message); +// +// let mut dest = BytesMut::new(); +// encoder.encode(message, &mut dest).unwrap(); +// +// println!("dest: {:x?}", dest); +// println!("raw_frame: {:?}", raw_frame); +// +// assert_eq!(raw_frame, &dest); +// } +// +// const RESPONSE: [u8; 186] = *b"\ +// HTTP/1.1 200 OK\r\n\ +// date: Mon, 27 Jul 2009 12:28:53 GMT\r\n\ +// server: Apache/2.2.14 (Win32)\r\n\ +// last-modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n\ +// content-length: 9\r\n\ +// content-type: text/html\r\n\r\n\ +// something"; +// +// const REQUEST: [u8; 90] = *b"\ +// POST /cgi-bin/process.cgi HTTP/1.1\r\n\ +// connection: Keep-Alive\r\n\ +// content-length: 9\r\n\r\n\ +// something"; +// +// const GZIP_RESPONSE: [u8; 594] = hex!("48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d 0a 63 6f 6e 74 65 6e 74 2d 74 79 70 65 3a 20 61 70 70 6c 69 63 61 74 69 6f 6e 2f 6a 73 6f 6e 3b 20 63 68 61 72 73 65 74 3d 55 54 46 2d 38 0d 0a 63 6f 6e 74 65 6e 74 2d 65 6e 63 6f 64 69 6e 67 3a 20 67 7a 69 70 0d 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a 20 34 38 33 0d 0a 0d 0a 1f 8b 08 00 00 00 00 00 00 00 cc 94 c9 6e db 30 10 86 5f 45 98 b3 52 78 5f 74 33 d2 16 c8 d6 c0 4e da 06 0d 0c 82 16 a9 98 30 25 2a 5c 0c 0b 82 df 3d 23 1a 41 63 a0 25 7b ec 45 20 31 ff 7c 9c e1 fc 54 0b 56 a9 1d 64 a3 71 0a 56 94 9c 11 e5 2c 64 05 95 86 a7 40 cc 96 6a 66 20 6b 51 66 a9 84 6c 98 82 71 79 ce 8d 29 1c 6e fb b8 dd 89 ba e6 0c b2 5e 0a 05 15 b2 5b 0e 4e 4b a7 39 e6 3e b7 e0 31 5e 21 2a c6 0f 90 c1 27 55 f3 ca 70 aa f3 ed 85 da 18 ae f7 74 23 a4 b0 0d a4 50 29 c6 51 b2 b9 5d 15 8b d5 ed f4 a1 bc bc d9 7d bd 53 e6 d1 bc 5c 2d 31 ae 39 35 aa f2 45 35 75 a7 7c 75 5c 37 a7 5a 09 3f e4 bc b6 02 e3 bf 85 f0 4d 25 25 ad 6b 51 bd 24 85 72 15 c3 af 4e 9e c5 3a 11 55 a2 34 e3 3a b1 2a 31 4a db c4 e7 fd 4b 91 5e 43 9c 13 d8 17 b0 5f 0b db 3c 95 bd c7 dc fd dc 7c a7 07 42 d8 e7 e5 02 8e c7 f4 4f bd 5b 6e ec c5 e9 8c ff a8 d9 b3 aa ce ba a3 d7 fb d5 6a 71 65 96 4d 79 7d f9 63 76 77 73 ff 85 b0 25 76 b7 3e a6 b0 15 f6 a3 3f 5a d8 53 e9 70 28 7d 1c b6 e6 92 fa 51 64 c0 5f 01 c5 25 5e 8d c9 95 c6 78 e5 a4 7c cf 46 87 90 f7 3b af 95 41 20 5a cf 5f 6c af 5b 9d 65 10 a3 9c ce 11 d0 82 f0 8e b2 c2 ca ce 04 f7 68 a8 07 6f a8 ee a4 6e 98 68 bd 1e 96 f8 57 78 3f 02 47 73 07 e1 fd 10 7c 10 81 e3 13 09 c2 07 21 f8 30 02 c7 57 1a 84 0f 43 f0 51 04 3e 8a c0 47 21 f8 38 02 f7 bf a1 c0 40 c7 21 f8 24 02 9f 44 e0 93 10 7c 1a 81 4f 23 f0 69 08 3e 8b c0 67 11 f8 2c 04 9f 47 e0 f3 08 7c be c6 97 7e 7c 03 00 00 ff ff 03 00 c2 4e 64 8e 2a 06 00 00"); +// +// // #[test] +// // fn test_request() { +// // test_frame(&REQUEST, Direction::Source); +// // } +// // +// // #[test] +// // fn test_response() { +// // test_frame(&RESPONSE, Direction::Sink); +// // } +// +// #[test] +// fn test_encoding() { +// test_frame(&GZIP_RESPONSE, Direction::Sink); +// } +// +// fn decode_gzip(bytes: Vec) -> Value { +// use libflate::gzip::Decoder; +// use libflate::gzip::Encoder; +// use std::io::{Read, Write}; +// +// let mut decoder = Decoder::new(&bytes[..]).unwrap(); +// +// let mut decoded_data = Vec::new(); +// decoder.read_to_end(&mut decoded_data).unwrap(); +// +// let json = serde_json::from_slice::(&decoded_data).unwrap(); +// +// println!("decoded json: {:?}", json); +// +// json +// } +// +// fn encode_gzip(value: Value) -> Vec { +// use libflate::gzip::Decoder; +// use libflate::gzip::Encoder; +// use std::io::{Read, Write}; +// +// let json_bytes = serde_json::to_vec(&value).unwrap(); +// +// let mut encoder = Encoder::new(Vec::new()).unwrap(); +// encoder.write_all(&json_bytes).unwrap(); +// let encoded_data = encoder.finish().into_result().unwrap(); +// +// encoded_data +// } +// +// #[test] +// fn test_gzip_codec() { +// use libflate::gzip::Decoder; +// use libflate::gzip::Encoder; +// use std::io::{Read, Write}; +// +// /* let bytes = hex!("1f 8b 08 00 00 00 00 00 00 00 cc 94 c9 6e db 30 10 86 5f 45 98 b3 52 78 5f 74 33 d2 16 c8 d6 c0 4e da 06 0d 0c 82 16 a9 98 30 25 2a 5c 0c 0b 82 df 3d 23 1a 41 63 a0 25 7b ec 45 20 31 ff 7c 9c e1 fc 54 0b 56 a9 1d 64 a3 71 0a 56 94 9c 11 e5 2c 64 05 95 86 a7 40 cc 96 6a 66 20 6b 51 66 a9 84 6c 98 82 71 79 ce 8d 29 1c 6e fb b8 dd 89 ba e6 0c b2 5e 0a 05 15 b2 5b 0e 4e 4b a7 39 e6 3e b7 e0 31 5e 21 2a c6 0f 90 c1 27 55 f3 ca 70 aa f3 ed 85 da 18 ae f7 74 23 a4 b0 0d a4 50 29 c6 51 b2 b9 5d 15 8b d5 ed f4 a1 bc bc d9 7d bd 53 e6 d1 bc 5c 2d 31 ae 39 35 aa f2 45 35 75 a7 7c 75 5c 37 a7 5a 09 3f e4 bc b6 02 e3 bf 85 f0 4d 25 25 ad 6b 51 bd 24 85 72 15 c3 af 4e 9e c5 3a 11 55 a2 34 e3 3a b1 2a 31 4a db c4 e7 fd 4b 91 5e 43 9c 13 d8 17 b0 5f 0b db 3c 95 bd c7 dc fd dc 7c a7 07 42 d8 e7 e5 02 8e c7 f4 4f bd 5b 6e ec c5 e9 8c ff a8 d9 b3 aa ce ba a3 d7 fb d5 6a 71 65 96 4d 79 7d f9 63 76 77 73 ff 85 b0 25 76 b7 3e a6 b0 15 f6 a3 3f 5a d8 53 e9 70 28 7d 1c b6 e6 92 fa 51 64 c0 5f 01 c5 25 5e 8d c9 95 c6 78 e5 a4 7c cf 46 87 90 f7 3b af 95 41 20 5a cf 5f 6c af 5b 9d 65 10 a3 9c ce 11 d0 82 f0 8e b2 c2 ca ce 04 f7 68 a8 07 6f a8 ee a4 6e 98 68 bd 1e 96 f8 57 78 3f 02 47 73 07 e1 fd 10 7c 10 81 e3 13 09 c2 07 21 f8 30 02 c7 57 1a 84 0f 43 f0 51 04 3e 8a c0 47 21 f8 38 02 f7 bf a1 c0 40 c7 21 f8 24 02 9f 44 e0 93 10 7c 1a 81 4f 23 f0 69 08 3e 8b c0 67 11 f8 2c 04 9f 47 e0 f3 08 7c be c6 97 7e 7c 03 00 00 ff ff 03 00 c2 4e 64 8e 2a 06 00 00"); */ +// +// let bytes = +// hex!("1f8b0800000000000003ab56ca48cdc9c957b252502acf2fca4951aa050022aea38612000000"); +// +// let json = decode_gzip(bytes.to_vec()); +// let encoded_data = encode_gzip(json.clone()); +// +// // assert_eq!(bytes.to_vec(), encoded_data); +// // +// let json = decode_gzip(encoded_data.clone()); +// let encoded_data2 = encode_gzip(json.clone()); +// assert_eq!(encoded_data, encoded_data2); +// +// // +// // let json_bytes = serde_json::to_vec(&json).unwrap(); +// // let new_json = serde_json::from_slice::(&json_bytes).unwrap(); +// // +// // assert_eq!(json,new_json); +// // assert_eq!(decoded_data, json_bytes); +// // +// /* let mut encoder = Encoder::new(Vec::new()).unwrap(); +// encoder.write_all(&decoded_data).unwrap(); +// let encoded_data = encoder.finish().into_result().unwrap(); +// +// assert_eq!(encoded_data, bytes); */ +// } +// } diff --git a/shotover/src/frame/opensearch.rs b/shotover/src/frame/opensearch.rs index fe76f16c5..11f5e2733 100644 --- a/shotover/src/frame/opensearch.rs +++ b/shotover/src/frame/opensearch.rs @@ -1,19 +1,26 @@ use anyhow::Result; use bytes::Bytes; +use derivative::Derivative; use http::{HeaderMap, Method, StatusCode, Uri, Version}; +use serde_json::Value; +use std::fmt; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Derivative)] +#[derivative(PartialEq)] pub struct ResponseParts { pub status: StatusCode, pub version: Version, + #[derivative(PartialEq = "ignore")] pub headers: HeaderMap, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Derivative)] +#[derivative(PartialEq)] pub struct RequestParts { pub method: Method, pub uri: Uri, pub version: Version, + #[derivative(PartialEq = "ignore")] pub headers: HeaderMap, } @@ -23,12 +30,33 @@ pub enum HttpHead { Request(RequestParts), } -#[derive(Debug, Clone, PartialEq)] +impl HttpHead { + pub fn headers(&self) -> &HeaderMap { + match &self { + HttpHead::Response(response) => &response.headers, + HttpHead::Request(request) => &request.headers, + } + } +} + +#[derive(Clone, Derivative)] +#[derivative(PartialEq)] pub struct OpenSearchFrame { pub headers: HttpHead, + + #[derivative(PartialEq = "ignore")] pub body: Bytes, } +impl fmt::Debug for OpenSearchFrame { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OpenSearchFrame") + .field("headers", &self.headers) + .field("body", &self.json_str()) + .finish() + } +} + impl OpenSearchFrame { pub fn new(headers: HttpHead, body: Bytes) -> Self { Self { headers, body } @@ -37,4 +65,46 @@ impl OpenSearchFrame { pub fn from_bytes(_bytes: &Bytes) -> Result { todo!(); } + + pub fn new_server_error_response() -> Self { + let headers = HttpHead::Response(ResponseParts { + status: StatusCode::INTERNAL_SERVER_ERROR, + version: Version::HTTP_11, + headers: HeaderMap::new(), + }); + let body = Bytes::new(); + Self::new(headers, body) + } + + pub fn json_str(&self) -> String { + use http::header; + + if self.body.is_empty() { + return "".to_string(); + }; + + let body = if self + .headers + .headers() + .get(header::CONTENT_ENCODING) + .is_some() + { + use libflate::gzip::Decoder; + use std::io::Read; + + let mut decoder = Decoder::new(&self.body[..]).unwrap(); + + let mut decoded_data = Vec::new(); + decoder.read_to_end(&mut decoded_data).unwrap(); + + decoded_data + } else { + self.body.to_vec() + }; + + match serde_json::from_slice::(&body) { + Ok(json) => serde_json::to_string_pretty(&json).unwrap(), + Err(_) => format!("failed to parse json: {:?}", pretty_hex::pretty_hex(&body)), + } + } } diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 7bc37240f..98496fb16 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -4,6 +4,7 @@ use crate::codec::kafka::RequestHeader; use crate::codec::CodecState; use crate::frame::cassandra::Tracing; use crate::frame::redis::redis_query_type; +use crate::frame::OpenSearchFrame; use crate::frame::{ cassandra, cassandra::{CassandraMetadata, CassandraOperation}, @@ -278,7 +279,7 @@ impl Message { Metadata::Kafka => return Err(anyhow!(error).context( "A generic error cannot be formed because the kafka protocol does not support it", )), - Metadata::OpenSearch => unimplemented!() + Metadata::OpenSearch => Frame::OpenSearch(OpenSearchFrame::new_server_error_response()), })) } @@ -295,14 +296,14 @@ impl Message { MessageType::Redis => Ok(Metadata::Redis), MessageType::Kafka => Ok(Metadata::Kafka), MessageType::Dummy => Err(anyhow!("Dummy has no metadata")), - MessageType::OpenSearch => Err(anyhow!("OpenSearch has no metadata")), + MessageType::OpenSearch => Ok(Metadata::OpenSearch), }, MessageInner::Parsed { frame, .. } | MessageInner::Modified { frame } => match frame { Frame::Cassandra(frame) => Ok(Metadata::Cassandra(frame.metadata())), Frame::Kafka(_) => Ok(Metadata::Kafka), Frame::Redis(_) => Ok(Metadata::Redis), Frame::Dummy => Err(anyhow!("dummy has no metadata")), - Frame::OpenSearch(_) => Err(anyhow!("OpenSearch has no metadata")), + Frame::OpenSearch(_) => Ok(Metadata::OpenSearch), }, } }