diff --git a/Cargo.lock b/Cargo.lock index a04354f45..b9c9b9c8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "aead" version = "0.5.2" @@ -54,14 +60,15 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d" dependencies = [ "cfg-if", "getrandom 0.2.10", "once_cell", "version_check", + "zerocopy", ] [[package]] @@ -638,7 +645,7 @@ dependencies = [ "aws-config", "aws-sdk-ec2", "aws-sdk-iam", - "base64 0.21.4", + "base64 0.21.5", "russh", "russh-keys", "ssh-key", @@ -701,9 +708,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "base64-simd" @@ -871,7 +878,7 @@ dependencies = [ "cached_proc_macro", "cached_proc_macro_types", "futures", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "instant", "once_cell", "thiserror", @@ -1241,11 +1248,20 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" +checksum = "3fbc60abd742b35f2492f808e1abbb83d45f72db402e14c55057edc9c7b1e9e4" dependencies = [ "libc", ] @@ -1568,6 +1584,12 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "dary_heap" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" + [[package]] name = "dashmap" version = "5.5.3" @@ -1575,7 +1597,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "lock_api", "once_cell", "parking_lot_core", @@ -1987,7 +2009,7 @@ dependencies = [ "rustls-native-certs", "rustls-webpki", "semver", - "socket2 0.5.4", + "socket2 0.5.5", "tokio", "tokio-rustls", "tokio-stream", @@ -2214,7 +2236,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5681137554ddff44396e5f149892c769d45301dd9aa19c51602a89ee214cb0ec" dependencies = [ - "hashbrown 0.13.1", + "hashbrown 0.13.2", ] [[package]] @@ -2225,18 +2247,18 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" 
[[package]] name = "hashbrown" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ "ahash", ] [[package]] name = "hashbrown" -version = "0.14.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" dependencies = [ "ahash", "allocator-api2", @@ -2341,7 +2363,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.9", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -2434,7 +2456,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.1", + "hashbrown 0.14.2", ] [[package]] @@ -2491,9 +2513,9 @@ checksum = "e1be380c410bf0595e94992a648ea89db4dd3f3354ba54af206fd2a68cf5ac8e" [[package]] name = "ipnet" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "is-terminal" @@ -2573,6 +2595,30 @@ version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +[[package]] +name = "libflate" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7d5654ae1795afc7ff76f4365c2c8791b0feb18e8996a96adad8ffd7c3b2bf" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be5f52fb8c451576ec6b79d3f4deb327398bc05bbdbd99021a6e77a4c855d524" +dependencies = [ + "core2", + "hashbrown 0.13.2", + "rle-decode-fast", +] + [[package]] name = "libm" version = "0.2.8" @@ -2678,7 +2724,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "hyper", "indexmap 1.9.3", "ipnet", @@ -2703,13 +2749,13 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.15.1" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" +checksum = "111cb375987443c3de8d503580b536f77dc8416d32db62d9456db5d93bd7ac47" dependencies = [ "crossbeam-epoch", "crossbeam-utils", - "hashbrown 0.13.1", + "hashbrown 0.13.2", "metrics", "num_cpus", "quanta", @@ -2739,9 +2785,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi 0.11.0+wasi-snapshot-preview1", @@ -3042,7 +3088,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"f7899e6ad63d5c1dc6394785d625cadf560aaa5e606d6b709fd5cc6bbf1727f1" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "bytes", "dyn-clone", "lazy_static", @@ -3230,7 +3276,7 @@ version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3163d2912b7c3b52d651a055f2c7eec9ba5cd22d26ef75b8dd3a59980b185923" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "serde", ] @@ -3367,9 +3413,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31114a898e107c51bb1609ffaf55a0e011cf6a4d7f1170d0015a165082c0338b" +checksum = "b559898e0b4931ed2d3b959ab0c2da4d99cc644c4b0b1a35b4d344027f474023" [[package]] name = "powerfmt" @@ -3642,7 +3688,7 @@ dependencies = [ "rand 0.8.5", "ryu", "sha1_smol", - "socket2 0.4.9", + "socket2 0.4.10", "tokio", "tokio-util", "url", @@ -3757,7 +3803,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "async-compression", - "base64 0.21.4", + "base64 0.21.5", "bytes", "encoding_rs", "futures-core", @@ -3824,6 +3870,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "rsa" version = "0.9.2" @@ -3986,9 +4038,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.19" +version = "0.38.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" +checksum = "67ce50cb2e16c2903e30d1cbccfd8387a74b9d4c938b6a4c5ec6cc7556f7a8a0" dependencies = [ "bitflags 2.4.1", "errno", @@ -4027,7 +4079,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", ] [[package]] @@ -4135,7 +4187,7 @@ dependencies = [ "scylla-macros", "smallvec", "snap", - "socket2 0.5.4", + "socket2 0.5.5", "strum 0.23.0", "strum_macros 0.23.1", "thiserror", @@ -4377,7 +4429,7 @@ dependencies = [ "aws-sdk-kms", "backtrace", "backtrace-ext", - "base64 0.21.4", + "base64 0.21.5", "bigdecimal 0.4.2", "bincode", "bytes", @@ -4403,6 +4455,7 @@ dependencies = [ "hyper", "itertools 0.11.0", "kafka-protocol", + "libflate", "lz4_flex", "metrics", "metrics-exporter-prometheus", @@ -4534,9 +4587,9 @@ checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" [[package]] name = "socket2" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" dependencies = [ "libc", "winapi", @@ -4544,9 +4597,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys 0.48.0", @@ -4786,18 +4839,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.49" +version = "1.0.50" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" +checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" +checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", @@ -4882,7 +4935,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", ] @@ -5014,9 +5067,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" [[package]] name = "toml_edit" @@ -5059,9 +5112,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.39" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", @@ -5103,12 +5156,12 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" dependencies = [ - "lazy_static", "log", + "once_cell", "tracing-core", ] @@ -5732,6 +5785,26 @@ dependencies = [ "time", ] +[[package]] +name = "zerocopy" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c19fae0c8a9efc6a8281f2e623db8af1db9e57852e04cde3e754dd2dc29340f" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc56589e9ddd1f1c28d4b4b5c773ce232910a6bb67a70133d61c9e347585efe9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "zeroize" version = "1.6.0" diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index 2fcd3814f..adb5074c9 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -1,7 +1,9 @@ use crate::shotover_process; +use hyper::{Body as HyperBody, Client, Method as HyperMethod, Request, Response as HyperResponse}; use opensearch::{ auth::Credentials, cert::CertificateValidation, + cluster::ClusterHealthParts, http::{ headers::{HeaderName, HeaderValue}, response::Response, @@ -10,11 +12,29 @@ use opensearch::{ }, indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts}, nodes::NodesInfoParts, - params::Refresh, + params::{Refresh, WaitForStatus}, BulkOperation, BulkParts, DeleteParts, Error, IndexParts, OpenSearch, SearchParts, }; use serde_json::{json, Value}; use test_helpers::docker_compose::docker_compose; +use 
test_helpers::shotover_process::{EventMatcher, Level}; +use tokio::time::Duration; + +async fn hyper_request( + uri: String, + method: HyperMethod, + body: HyperBody, +) -> HyperResponse<HyperBody> { + let client = Client::new(); + + let req = Request::builder() + .method(method) + .uri(uri) + .body(body) + .expect("request builder"); + + client.request(req).await.unwrap() +} async fn assert_ok_and_get_json(response: Result<Response, Error>) -> Value { let response = response.unwrap(); @@ -34,6 +54,60 @@ async fn assert_ok_and_get_json(response: Result<Response, Error>) -> Value { } } +async fn get_cluster_name(client: &OpenSearch) -> String { + let response = assert_ok_and_get_json( + client + .cluster() + .stats(opensearch::cluster::ClusterStatsParts::None) + .send() + .await, + ) + .await; + + response["cluster_name"].as_str().unwrap().to_owned() +} + +async fn assert_ok_and_same_data( + response_a: Result<Response, Error>, + response_b: Result<Response, Error>, + warn: bool, +) { + let mut response_a = assert_ok_and_get_json(response_a).await["hits"]["hits"] + .as_array() + .unwrap() + .clone(); + let mut response_b = assert_ok_and_get_json(response_b).await["hits"]["hits"] + .as_array() + .unwrap() + .clone(); + + if !warn { + assert_eq!(response_a.len(), response_b.len()); + } else if response_a.len() != response_b.len() { + println!("Response A len: {:#?}", response_a.len()); + println!("Response B len: {:#?}", response_b.len()); + } + + response_a.sort_by(|a, b| { + let a_age = a["_source"]["age"].as_i64().unwrap(); + let b_age = b["_source"]["age"].as_i64().unwrap(); + a_age.cmp(&b_age) + }); + + response_b.sort_by(|a, b| { + let a_age = a["_source"]["age"].as_i64().unwrap(); + let b_age = b["_source"]["age"].as_i64().unwrap(); + a_age.cmp(&b_age) + }); + + if !warn { + assert_eq!(response_a, response_b); + } else if response_a != response_b { + println!("Response A: {:#?}", response_a); + println!("Response B: {:#?}", response_b); + } +} + pub async fn test_bulk(client: &OpenSearch) { assert_ok_and_get_json( client @@ -242,6 +316,24 @@ async fn opensearch_test_suite(client: &OpenSearch) { .await; } +fn create_client(addr: &str) -> OpenSearch { + let url = Url::parse(addr).unwrap(); + let credentials = Credentials::Basic("admin".into(), "admin".into()); + let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) + .cert_validation(CertificateValidation::None) + .auth(credentials) + .build() + .unwrap(); + let client = OpenSearch::new(transport); + + client + .cluster() + .health(ClusterHealthParts::None) + .wait_for_status(WaitForStatus::Green); + + client +} + #[tokio::test(flavor = "multi_thread")] async fn passthrough_standard() { let _compose = docker_compose("tests/test-configs/opensearch-passthrough/docker-compose.yaml"); @@ -251,17 +343,311 @@ async fn passthrough_standard() { .await; let addr = "http://localhost:9201"; + let client = create_client(addr); - let url = Url::parse(addr).unwrap(); - let credentials = Credentials::Basic("admin".into(), "admin".into()); - let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) - .cert_validation(CertificateValidation::None) - .auth(credentials) - .build() + let res = client + .nodes() + .info(NodesInfoParts::None) + .header( + HeaderName::from_lowercase(b"accept-encoding").unwrap(), + HeaderValue::from_str("").unwrap(), + ) + .send() + .await .unwrap(); - let client = OpenSearch::new(transport); + + println!("{:#?}", res); opensearch_test_suite(&client).await; shotover.shutdown_and_then_consume_events(&[]).await; } + +#[tokio::test(flavor = "multi_thread")] +async fn 
dual_write_basic() { + let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml"); + + let addr1 = "http://172.16.1.2:9200"; + let client1 = create_client(addr1); + let addr2 = "http://172.16.1.3:9200"; + let client2 = create_client(addr2); + + let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml") + .start() + .await; + + let shotover_client = create_client("http://localhost:9200"); + + shotover_client + .indices() + .create(IndicesCreateParts::Index("test-index")) + .send() + .await + .unwrap(); + + let exists_response = shotover_client + .indices() + .exists(IndicesExistsParts::Index(&["test-index"])) + .send() + .await + .unwrap(); + + assert_eq!(exists_response.status_code(), StatusCode::OK); + + shotover_client + .index(IndexParts::Index("test-index")) + .body(json!({ + "name": "John", + "age": 30 + })) + .refresh(Refresh::WaitFor) + .send() + .await + .unwrap(); + + for client in &[shotover_client, client1, client2] { + let response = client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(10) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .send() + .await + .unwrap(); + + let results = response.json::<Value>().await.unwrap(); + assert!(results["took"].as_i64().is_some()); + assert_eq!( + results["hits"].as_object().unwrap()["hits"] + .as_array() + .unwrap() + .len(), + 1 + ); + } + + shotover.shutdown_and_then_consume_events(&[]).await; +} + +async fn index_100_documents(client: &OpenSearch) { + let mut body: Vec<BulkOperation<Value>> = vec![]; + for i in 0..100 { + let op = BulkOperation::index(json!({ + "name": "John", + "age": i + })) + .id(i.to_string()) + .into(); + body.push(op); + } + + assert_ok_and_get_json( + client + .bulk(BulkParts::Index("test-index")) + .body(body) + .refresh(Refresh::WaitFor) + .send() + .await, + ) + .await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn dual_write_reindex() { + let shotover_addr = "http://localhost:9200"; + let source_addr = "http://172.16.1.2:9200"; + let target_addr = "http://172.16.1.3:9200"; + + let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml"); + + let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml") + .start() + .await; + + let source_client = create_client(source_addr); + let target_client = create_client(target_addr); + let shotover_client = create_client(shotover_addr); + + // verify that shotover is returning responses from the source cluster + let cluster_name = get_cluster_name(&shotover_client).await; + println!("Shotover is returning responses from {:?}", cluster_name); + assert_eq!(cluster_name, "source-cluster"); + + // Create indexes in source cluster + assert_ok_and_get_json( + source_client + .indices() + .create(IndicesCreateParts::Index("test-index")) + .send() + .await, + ) + .await; + + // Create in target cluster + assert_ok_and_get_json( + target_client + .indices() + .create(IndicesCreateParts::Index("test-index")) + .send() + .await, + ) + .await; + + index_100_documents(&source_client).await; + + let shotover_client_c = shotover_client.clone(); + let dual_write_jh = tokio::spawn(async move { + for _ in 0..20 { + // get a random number between 0 and 99 + let i = rand::random::<u32>() % 100; + + let response = assert_ok_and_get_json( + shotover_client_c + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "age": i, + } + } + })) + .send() + .await, + ) + .await; + 
assert_eq!(response["hits"]["hits"].as_array().unwrap().len(), 1); + + let document = &response["hits"]["hits"][0]; + + assert_ok_and_get_json( + shotover_client_c + .update(opensearch::UpdateParts::IndexId( + "test-index", + document["_id"].as_str().unwrap(), + )) + .body(json!({ + "doc": { "name" : Value::String("Smith".into())} + })) + .refresh(Refresh::WaitFor) + .send() + .await, + ) + .await; + + tokio::time::sleep(Duration::from_millis(500)).await; + } + }); + + let target_client_c = target_client.clone(); + let reindex_jh = tokio::spawn(async move { + target_client_c + .reindex() + .body(json!( + { + "source":{ + "remote":{ + "host": source_addr, + "username":"admin", + "password":"admin" + }, + "index": "test-index" + }, + "dest":{ + "index": "test-index", + } + } + )) + .requests_per_second(1) + .send() + .await + .unwrap(); + }); + + // Begin dual writes + // Begin reindex operations + let _ = tokio::join!(reindex_jh, dual_write_jh); + + // verify both clusters end up in the same state + let target = target_client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "name": "Smith", + } + } + })) + .send(); + + let source = source_client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "name": "Smith", + } + } + })) + .send(); + + assert_ok_and_same_data(target.await, source.await, true).await; + + // verify both clusters end up in the same state + let target = target_client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .send(); + + let source = source_client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(200) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .send(); + + assert_ok_and_same_data(target.await, source.await, true).await; + + // switch shotover to the target cluster + let _ = hyper_request( + format!("http://localhost:{}/transform/tee/result-source", 1234), + HyperMethod::PUT, + HyperBody::from("tee-chain"), + ) + .await; + let cluster_name = get_cluster_name(&shotover_client).await; + println!("Shotover is returning responses from {:?}", cluster_name); + assert_eq!(cluster_name, "target-cluster"); + + shotover + .shutdown_and_then_consume_events(&[EventMatcher::new() + .with_level(Level::Warn) + .with_target("shotover::transforms::tee")]) + .await; +} diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml new file mode 100644 index 000000000..07f1f0bde --- /dev/null +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/docker-compose.yaml @@ -0,0 +1,88 @@ +version: '3' +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 + +services: + opensearch-node1: + image: opensearchproject/opensearch:2.9.0 + container_name: opensearch-node1 + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 + environment: + &environment + cluster.name: source-cluster + node.name: opensearch-node1 + bootstrap.memory_lock: "true" + OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m" + discovery.type: single-node + plugins.security.disabled: "true" + reindex.remote.allowlist: 172.16.1.2:9200 + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - type: volume + target: 
/usr/share/opensearch/data + # ports: + # - 9201:9200 + # - 9601:9600 + + opensearch-node2: + image: opensearchproject/opensearch:2.9.0 + container_name: opensearch-node2 + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 + environment: + <<: *environment + cluster.name: target-cluster + node.name: opensearch-node2 + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - type: volume + target: /usr/share/opensearch/data + # ports: + # - 9202:9200 + # - 9602:9600 + # + # + + opensearch-dashboards: + image: opensearchproject/opensearch-dashboards:2.9.0 + container_name: opensearch-dashboards + environment: + OPENSEARCH_HOSTS: '["http://172.16.1.2:9200"]' + DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true" + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + + opensearch-dashboard-2: + image: opensearchproject/opensearch-dashboards:2.9.0 + container_name: opensearch-dashboard-2 + environment: + OPENSEARCH_HOSTS: '["http://172.16.1.3:9200"]' + DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true" + networks: + cluster_subnet: + ipv4_address: 172.16.1.5 + + diff --git a/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml new file mode 100644 index 000000000..2cc41a6ca --- /dev/null +++ b/shotover-proxy/tests/test-configs/opensearch-dual-write/topology.yaml @@ -0,0 +1,17 @@ +--- +sources: + - OpenSearch: + name: "OpenSearch" + listen_addr: "127.0.0.1:9200" + chain: + - Tee: + switch_port: 1234 + behavior: LogWarningOnMismatch + buffer_size: 10000 + chain: + - OpenSearchSinkSingle: + remote_address: "172.16.1.3:9200" + connect_timeout_ms: 3000 + - OpenSearchSinkSingle: + remote_address: "172.16.1.2:9200" + connect_timeout_ms: 3000 diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index d86e66396..2232faf0e 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -93,6 +93,7 @@ string = "0.3.0" xxhash-rust = { version = "0.8.6", features = ["xxh3"] } dashmap = "5.4.0" atoi = "2.0.0" +libflate = "2.0.0" [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } diff --git a/shotover/src/codec/opensearch.rs b/shotover/src/codec/opensearch.rs index 6a45beb98..3e9399954 100644 --- a/shotover/src/codec/opensearch.rs +++ b/shotover/src/codec/opensearch.rs @@ -117,9 +117,16 @@ impl OpenSearchDecoder { let mut headers = [httparse::EMPTY_HEADER; 16]; let mut response = httparse::Response::new(&mut headers); - let body_start = match response.parse(src).unwrap() { - httparse::Status::Complete(body_start) => body_start, - httparse::Status::Partial => return Ok(None), + let body_start = match response.parse(src) { + Ok(httparse::Status::Complete(body_start)) => body_start, + Ok(httparse::Status::Partial) => return Ok(None), + Err(err) => { + return Err(anyhow!( + "error: {} parsing response: {}", + err, + pretty_hex::pretty_hex(&src) + )) + } }; match response.version.unwrap() { 1 => (), @@ -196,6 +203,12 @@ impl Decoder for OpenSearchDecoder { content_length, }) = decode_result { + tracing::debug!( + "{}: incoming OpenSearch message:\n{}", + self.direction, + pretty_hex::pretty_hex(&src) + ); + self.state = State::ReadingBody(http_headers, content_length); src.advance(body_start); } else { @@ -258,7 +271,7 @@ impl Encoder for OpenSearchEncoder { Encodable::Frame(frame) => { let opensearch_frame = frame.into_opensearch().unwrap(); - match opensearch_frame.headers { + match &opensearch_frame.headers { HttpHead::Request(request_parts) => { 
self.last_outgoing_method .lock() @@ -313,46 +326,126 @@ impl Encoder for OpenSearchEncoder { } } -#[cfg(test)] -mod opensearch_tests { - use crate::codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}; - use bytes::BytesMut; - use tokio_util::codec::{Decoder, Encoder}; - - fn test_frame(raw_frame: &[u8], direction: Direction) { - let (mut decoder, mut encoder) = OpenSearchCodecBuilder::new(direction).build(); - let message = decoder - .decode(&mut BytesMut::from(raw_frame)) - .unwrap() - .unwrap(); - - let mut dest = BytesMut::new(); - encoder.encode(message, &mut dest).unwrap(); - assert_eq!(raw_frame, &dest); - } - - const RESPONSE: [u8; 186] = *b"\ - HTTP/1.1 200 OK\r\n\ - date: Mon, 27 Jul 2009 12:28:53 GMT\r\n\ - server: Apache/2.2.14 (Win32)\r\n\ - last-modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n\ - content-length: 9\r\n\ - content-type: text/html\r\n\r\n\ - something"; - - const REQUEST: [u8; 90] = *b"\ - POST /cgi-bin/process.cgi HTTP/1.1\r\n\ - connection: Keep-Alive\r\n\ - content-length: 9\r\n\r\n\ - something"; - - #[test] - fn test_request() { - test_frame(&REQUEST, Direction::Source); - } - - #[test] - fn test_response() { - test_frame(&RESPONSE, Direction::Sink); - } -} +// #[cfg(test)] +// mod opensearch_tests { +// use crate::codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}; +// use bytes::BytesMut; +// use hex_literal::hex; +// use serde_json::Value; +// use tokio_util::codec::{Decoder, Encoder}; +// +// fn test_frame(raw_frame: &[u8], direction: Direction) { +// let (mut decoder, mut encoder) = OpenSearchCodecBuilder::new(direction).build(); +// let message = decoder +// .decode(&mut BytesMut::from(raw_frame)) +// .unwrap() +// .unwrap(); +// +// println!("message: {:?}", message); +// +// let mut dest = BytesMut::new(); +// encoder.encode(message, &mut dest).unwrap(); +// +// println!("dest: {:x?}", dest); +// println!("raw_frame: {:?}", raw_frame); +// +// assert_eq!(raw_frame, &dest); +// } +// +// const RESPONSE: [u8; 186] = *b"\ +// HTTP/1.1 200 OK\r\n\ +// date: Mon, 27 Jul 2009 12:28:53 GMT\r\n\ +// server: Apache/2.2.14 (Win32)\r\n\ +// last-modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n\ +// content-length: 9\r\n\ +// content-type: text/html\r\n\r\n\ +// something"; +// +// const REQUEST: [u8; 90] = *b"\ +// POST /cgi-bin/process.cgi HTTP/1.1\r\n\ +// connection: Keep-Alive\r\n\ +// content-length: 9\r\n\r\n\ +// something"; +// +// const GZIP_RESPONSE: [u8; 594] = hex!("48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d 0a 63 6f 6e 74 65 6e 74 2d 74 79 70 65 3a 20 61 70 70 6c 69 63 61 74 69 6f 6e 2f 6a 73 6f 6e 3b 20 63 68 61 72 73 65 74 3d 55 54 46 2d 38 0d 0a 63 6f 6e 74 65 6e 74 2d 65 6e 63 6f 64 69 6e 67 3a 20 67 7a 69 70 0d 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a 20 34 38 33 0d 0a 0d 0a 1f 8b 08 00 00 00 00 00 00 00 cc 94 c9 6e db 30 10 86 5f 45 98 b3 52 78 5f 74 33 d2 16 c8 d6 c0 4e da 06 0d 0c 82 16 a9 98 30 25 2a 5c 0c 0b 82 df 3d 23 1a 41 63 a0 25 7b ec 45 20 31 ff 7c 9c e1 fc 54 0b 56 a9 1d 64 a3 71 0a 56 94 9c 11 e5 2c 64 05 95 86 a7 40 cc 96 6a 66 20 6b 51 66 a9 84 6c 98 82 71 79 ce 8d 29 1c 6e fb b8 dd 89 ba e6 0c b2 5e 0a 05 15 b2 5b 0e 4e 4b a7 39 e6 3e b7 e0 31 5e 21 2a c6 0f 90 c1 27 55 f3 ca 70 aa f3 ed 85 da 18 ae f7 74 23 a4 b0 0d a4 50 29 c6 51 b2 b9 5d 15 8b d5 ed f4 a1 bc bc d9 7d bd 53 e6 d1 bc 5c 2d 31 ae 39 35 aa f2 45 35 75 a7 7c 75 5c 37 a7 5a 09 3f e4 bc b6 02 e3 bf 85 f0 4d 25 25 ad 6b 51 bd 24 85 72 15 c3 af 4e 9e c5 3a 11 55 a2 34 e3 3a b1 2a 31 4a db c4 e7 fd 4b 91 5e 43 9c 13 d8 17 b0 5f 
0b db 3c 95 bd c7 dc fd dc 7c a7 07 42 d8 e7 e5 02 8e c7 f4 4f bd 5b 6e ec c5 e9 8c ff a8 d9 b3 aa ce ba a3 d7 fb d5 6a 71 65 96 4d 79 7d f9 63 76 77 73 ff 85 b0 25 76 b7 3e a6 b0 15 f6 a3 3f 5a d8 53 e9 70 28 7d 1c b6 e6 92 fa 51 64 c0 5f 01 c5 25 5e 8d c9 95 c6 78 e5 a4 7c cf 46 87 90 f7 3b af 95 41 20 5a cf 5f 6c af 5b 9d 65 10 a3 9c ce 11 d0 82 f0 8e b2 c2 ca ce 04 f7 68 a8 07 6f a8 ee a4 6e 98 68 bd 1e 96 f8 57 78 3f 02 47 73 07 e1 fd 10 7c 10 81 e3 13 09 c2 07 21 f8 30 02 c7 57 1a 84 0f 43 f0 51 04 3e 8a c0 47 21 f8 38 02 f7 bf a1 c0 40 c7 21 f8 24 02 9f 44 e0 93 10 7c 1a 81 4f 23 f0 69 08 3e 8b c0 67 11 f8 2c 04 9f 47 e0 f3 08 7c be c6 97 7e 7c 03 00 00 ff ff 03 00 c2 4e 64 8e 2a 06 00 00"); +// +// // #[test] +// // fn test_request() { +// // test_frame(&REQUEST, Direction::Source); +// // } +// // +// // #[test] +// // fn test_response() { +// // test_frame(&RESPONSE, Direction::Sink); +// // } +// +// #[test] +// fn test_encoding() { +// test_frame(&GZIP_RESPONSE, Direction::Sink); +// } +// +// fn decode_gzip(bytes: Vec) -> Value { +// use libflate::gzip::Decoder; +// use libflate::gzip::Encoder; +// use std::io::{Read, Write}; +// +// let mut decoder = Decoder::new(&bytes[..]).unwrap(); +// +// let mut decoded_data = Vec::new(); +// decoder.read_to_end(&mut decoded_data).unwrap(); +// +// let json = serde_json::from_slice::(&decoded_data).unwrap(); +// +// println!("decoded json: {:?}", json); +// +// json +// } +// +// fn encode_gzip(value: Value) -> Vec { +// use libflate::gzip::Decoder; +// use libflate::gzip::Encoder; +// use std::io::{Read, Write}; +// +// let json_bytes = serde_json::to_vec(&value).unwrap(); +// +// let mut encoder = Encoder::new(Vec::new()).unwrap(); +// encoder.write_all(&json_bytes).unwrap(); +// let encoded_data = encoder.finish().into_result().unwrap(); +// +// encoded_data +// } +// +// #[test] +// fn test_gzip_codec() { +// use libflate::gzip::Decoder; +// use libflate::gzip::Encoder; +// use std::io::{Read, Write}; +// +// /* let bytes = hex!("1f 8b 08 00 00 00 00 00 00 00 cc 94 c9 6e db 30 10 86 5f 45 98 b3 52 78 5f 74 33 d2 16 c8 d6 c0 4e da 06 0d 0c 82 16 a9 98 30 25 2a 5c 0c 0b 82 df 3d 23 1a 41 63 a0 25 7b ec 45 20 31 ff 7c 9c e1 fc 54 0b 56 a9 1d 64 a3 71 0a 56 94 9c 11 e5 2c 64 05 95 86 a7 40 cc 96 6a 66 20 6b 51 66 a9 84 6c 98 82 71 79 ce 8d 29 1c 6e fb b8 dd 89 ba e6 0c b2 5e 0a 05 15 b2 5b 0e 4e 4b a7 39 e6 3e b7 e0 31 5e 21 2a c6 0f 90 c1 27 55 f3 ca 70 aa f3 ed 85 da 18 ae f7 74 23 a4 b0 0d a4 50 29 c6 51 b2 b9 5d 15 8b d5 ed f4 a1 bc bc d9 7d bd 53 e6 d1 bc 5c 2d 31 ae 39 35 aa f2 45 35 75 a7 7c 75 5c 37 a7 5a 09 3f e4 bc b6 02 e3 bf 85 f0 4d 25 25 ad 6b 51 bd 24 85 72 15 c3 af 4e 9e c5 3a 11 55 a2 34 e3 3a b1 2a 31 4a db c4 e7 fd 4b 91 5e 43 9c 13 d8 17 b0 5f 0b db 3c 95 bd c7 dc fd dc 7c a7 07 42 d8 e7 e5 02 8e c7 f4 4f bd 5b 6e ec c5 e9 8c ff a8 d9 b3 aa ce ba a3 d7 fb d5 6a 71 65 96 4d 79 7d f9 63 76 77 73 ff 85 b0 25 76 b7 3e a6 b0 15 f6 a3 3f 5a d8 53 e9 70 28 7d 1c b6 e6 92 fa 51 64 c0 5f 01 c5 25 5e 8d c9 95 c6 78 e5 a4 7c cf 46 87 90 f7 3b af 95 41 20 5a cf 5f 6c af 5b 9d 65 10 a3 9c ce 11 d0 82 f0 8e b2 c2 ca ce 04 f7 68 a8 07 6f a8 ee a4 6e 98 68 bd 1e 96 f8 57 78 3f 02 47 73 07 e1 fd 10 7c 10 81 e3 13 09 c2 07 21 f8 30 02 c7 57 1a 84 0f 43 f0 51 04 3e 8a c0 47 21 f8 38 02 f7 bf a1 c0 40 c7 21 f8 24 02 9f 44 e0 93 10 7c 1a 81 4f 23 f0 69 08 3e 8b c0 67 11 f8 2c 04 9f 47 e0 f3 08 7c be c6 97 7e 7c 03 00 00 ff ff 03 00 c2 4e 64 8e 2a 06 00 00"); */ +// +// let bytes = +// 
hex!("1f8b0800000000000003ab56ca48cdc9c957b252502acf2fca4951aa050022aea38612000000"); +// +// let json = decode_gzip(bytes.to_vec()); +// let encoded_data = encode_gzip(json.clone()); +// +// // assert_eq!(bytes.to_vec(), encoded_data); +// // +// let json = decode_gzip(encoded_data.clone()); +// let encoded_data2 = encode_gzip(json.clone()); +// assert_eq!(encoded_data, encoded_data2); +// +// // +// // let json_bytes = serde_json::to_vec(&json).unwrap(); +// // let new_json = serde_json::from_slice::(&json_bytes).unwrap(); +// // +// // assert_eq!(json,new_json); +// // assert_eq!(decoded_data, json_bytes); +// // +// /* let mut encoder = Encoder::new(Vec::new()).unwrap(); +// encoder.write_all(&decoded_data).unwrap(); +// let encoded_data = encoder.finish().into_result().unwrap(); +// +// assert_eq!(encoded_data, bytes); */ +// } +// } diff --git a/shotover/src/frame/opensearch.rs b/shotover/src/frame/opensearch.rs index fe76f16c5..2f3c280e0 100644 --- a/shotover/src/frame/opensearch.rs +++ b/shotover/src/frame/opensearch.rs @@ -1,19 +1,28 @@ use anyhow::Result; use bytes::Bytes; +use derivative::Derivative; use http::{HeaderMap, Method, StatusCode, Uri, Version}; +use serde_json::Value; +use std::fmt; -#[derive(Debug, Clone, PartialEq)] +use crate::message::QueryType; + +#[derive(Debug, Clone, Derivative)] +#[derivative(PartialEq)] pub struct ResponseParts { pub status: StatusCode, pub version: Version, + #[derivative(PartialEq = "ignore")] pub headers: HeaderMap, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Derivative)] +#[derivative(PartialEq)] pub struct RequestParts { pub method: Method, pub uri: Uri, pub version: Version, + #[derivative(PartialEq = "ignore")] pub headers: HeaderMap, } @@ -23,12 +32,33 @@ pub enum HttpHead { Request(RequestParts), } -#[derive(Debug, Clone, PartialEq)] +impl HttpHead { + pub fn headers(&self) -> &HeaderMap { + match &self { + HttpHead::Response(response) => &response.headers, + HttpHead::Request(request) => &request.headers, + } + } +} + +#[derive(Clone, Derivative)] +#[derivative(PartialEq)] pub struct OpenSearchFrame { pub headers: HttpHead, + + #[derivative(PartialEq = "ignore")] pub body: Bytes, } +impl fmt::Debug for OpenSearchFrame { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OpenSearchFrame") + .field("headers", &self.headers) + .field("body", &self.json_str()) + .finish() + } +} + impl OpenSearchFrame { pub fn new(headers: HttpHead, body: Bytes) -> Self { Self { headers, body } @@ -37,4 +67,61 @@ impl OpenSearchFrame { pub fn from_bytes(_bytes: &Bytes) -> Result { todo!(); } + + pub fn new_server_error_response() -> Self { + let headers = HttpHead::Response(ResponseParts { + status: StatusCode::INTERNAL_SERVER_ERROR, + version: Version::HTTP_11, + headers: HeaderMap::new(), + }); + let body = Bytes::new(); + Self::new(headers, body) + } + + pub fn get_query_type(&self) -> QueryType { + if let HttpHead::Request(request) = &self.headers { + match &request.method { + &Method::GET | &Method::HEAD => QueryType::Read, + &Method::POST | &Method::PUT | &Method::DELETE | &Method::PATCH => QueryType::Write, + m => { + tracing::warn!("handled method: {:?}", m); + QueryType::Read + } + } + } else { + QueryType::Read + } + } + + pub fn json_str(&self) -> String { + use http::header; + + if self.body.is_empty() { + return "".to_string(); + }; + + let body = if self + .headers + .headers() + .get(header::CONTENT_ENCODING) + .is_some() + { + use libflate::gzip::Decoder; + use std::io::Read; + + let mut 
decoder = Decoder::new(&self.body[..]).unwrap(); + + let mut decoded_data = Vec::new(); + decoder.read_to_end(&mut decoded_data).unwrap(); + + decoded_data + } else { + self.body.to_vec() + }; + + match serde_json::from_slice::<Value>(&body) { + Ok(json) => serde_json::to_string_pretty(&json).unwrap(), + Err(_) => format!("failed to parse json: {:?}", pretty_hex::pretty_hex(&body)), + } + } } diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 31d1d3363..aea46fc4b 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -4,6 +4,7 @@ use crate::codec::kafka::RequestHeader; use crate::codec::CodecState; use crate::frame::cassandra::Tracing; use crate::frame::redis::redis_query_type; +use crate::frame::OpenSearchFrame; use crate::frame::{ cassandra, cassandra::{CassandraMetadata, CassandraOperation}, @@ -244,7 +245,7 @@ impl Message { Some(Frame::Redis(redis)) => redis_query_type(redis), // free-standing function as we can't define methods on RedisFrame Some(Frame::Kafka(_)) => todo!(), Some(Frame::Dummy) => todo!(), - Some(Frame::OpenSearch(_)) => todo!(), + Some(Frame::OpenSearch(os)) => os.get_query_type(), None => QueryType::ReadWrite, } } @@ -278,7 +279,7 @@ impl Message { Metadata::Kafka => return Err(anyhow!(error).context( "A generic error cannot be formed because the kafka protocol does not support it", )), - Metadata::OpenSearch => unimplemented!() + Metadata::OpenSearch => Frame::OpenSearch(OpenSearchFrame::new_server_error_response()), })) } @@ -295,14 +296,14 @@ impl Message { MessageType::Redis => Ok(Metadata::Redis), MessageType::Kafka => Ok(Metadata::Kafka), MessageType::Dummy => Err(anyhow!("Dummy has no metadata")), - MessageType::OpenSearch => Err(anyhow!("OpenSearch has no metadata")), + MessageType::OpenSearch => Ok(Metadata::OpenSearch), }, MessageInner::Parsed { frame, .. } | MessageInner::Modified { frame } => match frame { Frame::Cassandra(frame) => Ok(Metadata::Cassandra(frame.metadata())), Frame::Kafka(_) => Ok(Metadata::Kafka), Frame::Redis(_) => Ok(Metadata::Redis), Frame::Dummy => Err(anyhow!("dummy has no metadata")), - Frame::OpenSearch(_) => Err(anyhow!("OpenSearch has no metadata")), + Frame::OpenSearch(_) => Ok(Metadata::OpenSearch), }, } } diff --git a/shotover/src/server.rs b/shotover/src/server.rs index 8ba76af67..04718b53e 100644 --- a/shotover/src/server.rs +++ b/shotover/src/server.rs @@ -175,6 +175,7 @@ impl TcpCodecListener { "connection", id = self.connection_count, source = self.source_name.as_str(), + chain = self.chain_builder.name.as_str(), ); let transport = self.transport; async { diff --git a/shotover/src/transforms/chain.rs b/shotover/src/transforms/chain.rs index 66bb95028..fd5c98fc7 100644 --- a/shotover/src/transforms/chain.rs +++ b/shotover/src/transforms/chain.rs @@ -278,6 +278,8 @@ impl TransformChainBuilder { #[cfg(test)] let count_clone = count.clone(); + let span = tracing::error_span!("subchain"); + // Even though we don't keep the join handle, this thread will wrap up once all corresponding senders have been dropped. 
let mut chain = self.build(); @@ -329,7 +331,7 @@ impl TransformChainBuilder { ), } } - .in_current_span(), + .instrument(span), ); BufferedChain { diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index a6f99877a..b613ef28b 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -18,7 +18,7 @@ use crate::transforms::load_balance::ConnectionBalanceAndPool; use crate::transforms::loopback::Loopback; use crate::transforms::null::NullSink; #[cfg(feature = "alpha-transforms")] -use crate::transforms::opensearch::OpenSearchSinkSingle; +use crate::transforms::opensearch::sink_single::OpenSearchSinkSingle; use crate::transforms::parallel_map::ParallelMap; use crate::transforms::protect::Protect; use crate::transforms::query_counter::QueryCounter; diff --git a/shotover/src/transforms/opensearch/filter.rs b/shotover/src/transforms/opensearch/filter.rs new file mode 100644 index 000000000..e69de29bb diff --git a/shotover/src/transforms/opensearch/mod.rs b/shotover/src/transforms/opensearch/mod.rs index 1a84d10c2..c6e4fd3f5 100644 --- a/shotover/src/transforms/opensearch/mod.rs +++ b/shotover/src/transforms/opensearch/mod.rs @@ -1,119 +1 @@ -use crate::tcp; -use crate::transforms::{ - Messages, Transform, TransformBuilder, TransformConfig, Transforms, Wrapper, -}; -use crate::{ - codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}, - transforms::util::{ - cluster_connection_pool::{spawn_read_write_tasks, Connection}, - Request, - }, -}; -use anyhow::{anyhow, Result}; -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use std::time::Duration; -use tokio::sync::oneshot; -use tracing::trace; - -#[derive(Serialize, Deserialize, Debug)] -pub struct OpenSearchSinkSingleConfig { - #[serde(rename = "remote_address")] - address: String, - connect_timeout_ms: u64, -} - -#[typetag::serde(name = "OpenSearchSinkSingle")] -#[async_trait(?Send)] -impl TransformConfig for OpenSearchSinkSingleConfig { - async fn get_builder(&self, chain_name: String) -> Result> { - Ok(Box::new(OpenSearchSinkSingleBuilder::new( - self.address.clone(), - chain_name, - self.connect_timeout_ms, - ))) - } -} - -#[derive(Clone)] -pub struct OpenSearchSinkSingleBuilder { - address: String, - connect_timeout: Duration, -} - -impl OpenSearchSinkSingleBuilder { - pub fn new(address: String, _chain_name: String, connect_timeout_ms: u64) -> Self { - let connect_timeout = Duration::from_millis(connect_timeout_ms); - - Self { - address, - connect_timeout, - } - } -} - -impl TransformBuilder for OpenSearchSinkSingleBuilder { - fn build(&self) -> Transforms { - Transforms::OpenSearchSinkSingle(OpenSearchSinkSingle { - address: self.address.clone(), - connect_timeout: self.connect_timeout, - codec_builder: OpenSearchCodecBuilder::new(Direction::Sink), - connection: None, - }) - } - - fn get_name(&self) -> &'static str { - "OpenSearchSinkSingle" - } - - fn is_terminating(&self) -> bool { - true - } -} - -pub struct OpenSearchSinkSingle { - address: String, - connection: Option, - connect_timeout: Duration, - codec_builder: OpenSearchCodecBuilder, -} - -#[async_trait] -impl Transform for OpenSearchSinkSingle { - async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { - // Return immediately if we have no messages. - // If we tried to send no messages we would block forever waiting for a reply that will never come. 
- if requests_wrapper.requests.is_empty() { - return Ok(requests_wrapper.requests); - } - - if self.connection.is_none() { - trace!("creating outbound connection {:?}", self.address); - - let tcp_stream = tcp::tcp_stream(self.connect_timeout, self.address.clone()).await?; - let (rx, tx) = tcp_stream.into_split(); - self.connection = Some(spawn_read_write_tasks(&self.codec_builder, rx, tx)); - } - - let connection = self.connection.as_mut().unwrap(); - - let messages_len = requests_wrapper.requests.len(); - - let mut result = Vec::with_capacity(messages_len); - for message in requests_wrapper.requests { - let (tx, rx) = oneshot::channel(); - - connection - .send(Request { - message, - return_chan: Some(tx), - }) - .map_err(|_| anyhow!("Failed to send"))?; - - let message = rx.await?.response?; - result.push(message); - } - - Ok(result) - } -} +pub mod sink_single; diff --git a/shotover/src/transforms/opensearch/sink_single.rs b/shotover/src/transforms/opensearch/sink_single.rs new file mode 100644 index 000000000..1a84d10c2 --- /dev/null +++ b/shotover/src/transforms/opensearch/sink_single.rs @@ -0,0 +1,119 @@ +use crate::tcp; +use crate::transforms::{ + Messages, Transform, TransformBuilder, TransformConfig, Transforms, Wrapper, +}; +use crate::{ + codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}, + transforms::util::{ + cluster_connection_pool::{spawn_read_write_tasks, Connection}, + Request, + }, +}; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use tokio::sync::oneshot; +use tracing::trace; + +#[derive(Serialize, Deserialize, Debug)] +pub struct OpenSearchSinkSingleConfig { + #[serde(rename = "remote_address")] + address: String, + connect_timeout_ms: u64, +} + +#[typetag::serde(name = "OpenSearchSinkSingle")] +#[async_trait(?Send)] +impl TransformConfig for OpenSearchSinkSingleConfig { + async fn get_builder(&self, chain_name: String) -> Result<Box<dyn TransformBuilder>> { + Ok(Box::new(OpenSearchSinkSingleBuilder::new( + self.address.clone(), + chain_name, + self.connect_timeout_ms, + ))) + } +} + +#[derive(Clone)] +pub struct OpenSearchSinkSingleBuilder { + address: String, + connect_timeout: Duration, +} + +impl OpenSearchSinkSingleBuilder { + pub fn new(address: String, _chain_name: String, connect_timeout_ms: u64) -> Self { + let connect_timeout = Duration::from_millis(connect_timeout_ms); + + Self { + address, + connect_timeout, + } + } +} + +impl TransformBuilder for OpenSearchSinkSingleBuilder { + fn build(&self) -> Transforms { + Transforms::OpenSearchSinkSingle(OpenSearchSinkSingle { + address: self.address.clone(), + connect_timeout: self.connect_timeout, + codec_builder: OpenSearchCodecBuilder::new(Direction::Sink), + connection: None, + }) + } + + fn get_name(&self) -> &'static str { + "OpenSearchSinkSingle" + } + + fn is_terminating(&self) -> bool { + true + } +} + +pub struct OpenSearchSinkSingle { + address: String, + connection: Option<Connection>, + connect_timeout: Duration, + codec_builder: OpenSearchCodecBuilder, +} + +#[async_trait] +impl Transform for OpenSearchSinkSingle { + async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> { + // Return immediately if we have no messages. + // If we tried to send no messages we would block forever waiting for a reply that will never come. 
+ if requests_wrapper.requests.is_empty() { + return Ok(requests_wrapper.requests); + } + + if self.connection.is_none() { + trace!("creating outbound connection {:?}", self.address); + + let tcp_stream = tcp::tcp_stream(self.connect_timeout, self.address.clone()).await?; + let (rx, tx) = tcp_stream.into_split(); + self.connection = Some(spawn_read_write_tasks(&self.codec_builder, rx, tx)); + } + + let connection = self.connection.as_mut().unwrap(); + + let messages_len = requests_wrapper.requests.len(); + + let mut result = Vec::with_capacity(messages_len); + for message in requests_wrapper.requests { + let (tx, rx) = oneshot::channel(); + + connection + .send(Request { + message, + return_chan: Some(tx), + }) + .map_err(|_| anyhow!("Failed to send"))?; + + let message = rx.await?.response?; + result.push(message); + } + + Ok(result) + } +} diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs index baf43bffa..dd761483d 100644 --- a/test-helpers/src/docker_compose.rs +++ b/test-helpers/src/docker_compose.rs @@ -81,5 +81,10 @@ pub fn get_image_waiters() -> &'static [Image] { log_regex_to_wait_for: r"Node started", timeout: 120, }, + Image { + name: "opensearchproject/opensearch-dashboards:2.9.0", + log_regex_to_wait_for: r"Server running", + timeout: 120, + }, ] }