Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Oct 31, 2023
1 parent a625d21 commit b3730f1
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 12 deletions.
86 changes: 76 additions & 10 deletions shotover-proxy/tests/opensearch_int_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@
use crate::shotover_process;
use hyper::{Body as HyperBody, Client, Method as HyperMethod, Request, Response as HyperResponse};
use opensearch::{
auth::Credentials,
cert::CertificateValidation,
cluster::ClusterHealthParts,
http::{
headers::{HeaderName, HeaderValue},
response::Response,
response::Response,
transport::{SingleNodeConnectionPool, TransportBuilder},
Method, StatusCode, Url,
},
indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts},
indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts},
nodes::NodesInfoParts,
indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts, IndicesGetParts},
params::Refresh,
params::WaitForStatus,
params::{Refresh, WaitForStatus},
BulkOperation, BulkParts, DeleteParts, Error, IndexParts, OpenSearch, SearchParts,
};
use serde_json::{json, Value};
use test_helpers::docker_compose::docker_compose;
use test_helpers::shotover_process::{EventMatcher, Level};
use tokio::time::Duration;

async fn hyper_request(
uri: String,
method: HyperMethod,
body: HyperBody,
) -> HyperResponse<HyperBody> {
let client = Client::new();

let req = Request::builder()
.method(method)
.uri(uri)
.body(body)
.expect("request builder");

client.request(req).await.unwrap()
}

async fn assert_ok_and_get_json(response: Result<Response, Error>) -> Value {
let response = response.unwrap();
let status = response.status_code();
Expand All @@ -41,9 +54,23 @@ async fn assert_ok_and_get_json(response: Result<Response, Error>) -> Value {
}
}

async fn get_cluster_name(client: &OpenSearch) -> String {
let response = assert_ok_and_get_json(
client
.cluster()
.stats(opensearch::cluster::ClusterStatsParts::None)
.send()
.await,
)
.await;

response["cluster_name"].as_str().unwrap().to_owned()
}

async fn assert_ok_and_same_data(
response_a: Result<Response, Error>,
response_b: Result<Response, Error>,
warn: bool,
) {
let mut response_a = assert_ok_and_get_json(response_a).await["hits"]["hits"]
.as_array()
Expand All @@ -54,7 +81,12 @@ async fn assert_ok_and_same_data(
.unwrap()
.clone();

assert_eq!(response_a.len(), response_b.len());
if !warn {
assert_eq!(response_a.len(), response_b.len());
} else if response_a.len() != response_b.len() {
println!("Response A len: {:#?}", response_a.len());
println!("Response B len: {:#?}", response_b.len());
}

response_a.sort_by(|a, b| {
let a_age = a["_source"]["age"].as_i64().unwrap();
Expand All @@ -68,7 +100,12 @@ async fn assert_ok_and_same_data(
a_age.cmp(&b_age)
});

assert_eq!(response_a, response_b,);
if !warn {
assert_eq!(response_a, response_b);
} else if response_a != response_b {
println!("Response A: {:#?}", response_a);
println!("Response B: {:#?}", response_b);
}
}

pub async fn test_bulk(client: &OpenSearch) {
Expand Down Expand Up @@ -308,6 +345,19 @@ async fn passthrough_standard() {
let addr = "http://localhost:9201";
let client = create_client(addr);

let res = client
.nodes()
.info(NodesInfoParts::None)
.header(
HeaderName::from_lowercase(b"accept-encoding").unwrap(),
HeaderValue::from_str("").unwrap(),
)
.send()
.await
.unwrap();

println!("{:#?}", res);

opensearch_test_suite(&client).await;

shotover.shutdown_and_then_consume_events(&[]).await;
Expand Down Expand Up @@ -420,9 +470,14 @@ async fn dual_write_reindex() {
.start()
.await;

let shotover_client = create_client(shotover_addr);
let source_client = create_client(source_addr);
let target_client = create_client(target_addr);
let shotover_client = create_client(shotover_addr);

// verify that shotover is returning responses from the source clusters
let cluster_name = get_cluster_name(&shotover_client).await;
println!("Shotover is returning responses from {:?}", cluster_name);
assert_eq!(cluster_name, "source-cluster");

// Create indexes in source cluster
assert_ok_and_get_json(
Expand Down Expand Up @@ -548,7 +603,7 @@ async fn dual_write_reindex() {
}))
.send();

assert_ok_and_same_data(target.await, source.await).await;
assert_ok_and_same_data(target.await, source.await, true).await;

// verify both clusters end up in the same state
let target = target_client
Expand Down Expand Up @@ -577,7 +632,18 @@ async fn dual_write_reindex() {
}))
.send();

assert_ok_and_same_data(target.await, source.await).await;
assert_ok_and_same_data(target.await, source.await, true).await;

// switch shotover to the target cluster
let _ = hyper_request(
format!("http://localhost:{}/transform/tee/result-source", 1234),
HyperMethod::PUT,
HyperBody::from("tee-chain"),
)
.await;
let cluster_name = get_cluster_name(&shotover_client).await;
println!("Shotover is returning responses from {:?}", cluster_name);
assert_eq!(cluster_name, "target-cluster");

shotover
.shutdown_and_then_consume_events(&[EventMatcher::new()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ services:
ipv4_address: 172.16.1.2
environment:
&environment
cluster.name: opensearch-cluster-1
cluster.name: source-cluster
node.name: opensearch-node1
bootstrap.memory_lock: "true"
OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m"
Expand Down Expand Up @@ -47,7 +47,7 @@ services:
ipv4_address: 172.16.1.3
environment:
<<: *environment
cluster.name: opensearch-cluster-2
cluster.name: target-cluster
node.name: opensearch-node2
ulimits:
memlock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ sources:
listen_addr: "127.0.0.1:9200"
chain:
- Tee:
switch_port: 1234
behavior: LogWarningOnMismatch
buffer_size: 10000
chain:
Expand Down

0 comments on commit b3730f1

Please sign in to comment.