Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Sep 14, 2023
1 parent c58e10b commit 3f3453a
Show file tree
Hide file tree
Showing 9 changed files with 492 additions and 61 deletions.
52 changes: 52 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion shotover-proxy/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
---
main_log_level: "info,shotover_proxy=info"
main_log_level: "debug,shotover_proxy=debug"
observability_interface: "0.0.0.0:9001"
178 changes: 168 additions & 10 deletions shotover-proxy/tests/opensearch_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@ use crate::shotover_process;
use opensearch::{
auth::Credentials,
cert::CertificateValidation,
http::response::Response,
cluster::ClusterHealthParts,
http::{
response::Response,
transport::{SingleNodeConnectionPool, TransportBuilder},
Method, StatusCode, Url,
},
indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts},
indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts, IndicesGetParts},
params::Refresh,
params::WaitForStatus,
BulkOperation, BulkParts, DeleteParts, Error, IndexParts, OpenSearch, SearchParts,
};
use serde_json::{json, Value};
use test_helpers::docker_compose::docker_compose;
// use tokio::{
// sync::oneshot,
// task::JoinHandle,
// time::{interval, Duration},
// };

async fn assert_ok_and_get_json(response: Result<Response, Error>) -> Value {
let response = response.unwrap();
Expand Down Expand Up @@ -226,6 +233,24 @@ async fn opensearch_test_suite(client: &OpenSearch) {
test_delete_index(client).await;
}

fn create_client(addr: &str) -> OpenSearch {
let url = Url::parse(addr).unwrap();
let credentials = Credentials::Basic("admin".into(), "admin".into());
let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url))
.cert_validation(CertificateValidation::None)
.auth(credentials)
.build()
.unwrap();
let client = OpenSearch::new(transport);

client
.cluster()
.health(ClusterHealthParts::None)
.wait_for_status(WaitForStatus::Green);

client
}

#[tokio::test(flavor = "multi_thread")]
async fn passthrough_standard() {
let _compose = docker_compose("tests/test-configs/opensearch-passthrough/docker-compose.yaml");
Expand All @@ -235,17 +260,150 @@ async fn passthrough_standard() {
.await;

let addr = "http://localhost:9201";
let client = create_client(addr);

let url = Url::parse(addr).unwrap();
let credentials = Credentials::Basic("admin".into(), "admin".into());
let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url))
.cert_validation(CertificateValidation::None)
.auth(credentials)
.build()
opensearch_test_suite(&client).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn dual_write_basic() {
let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml");

let addr1 = "http://localhost:9201";
let client1 = create_client(addr1);
let addr2 = "http://localhost:9202";
let client2 = create_client(addr2);

let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml")
.start()
.await;

let shotover_client = create_client("http://localhost:9200");

shotover_client
.indices()
.create(IndicesCreateParts::Index("test-index"))
.send()
.await
.unwrap();
let client = OpenSearch::new(transport);

opensearch_test_suite(&client).await;
let exists_response = shotover_client
.indices()
.exists(IndicesExistsParts::Index(&["test-index"]))
.send()
.await
.unwrap();

assert_eq!(exists_response.status_code(), StatusCode::OK);

shotover_client
.index(IndexParts::Index("test-index"))
.body(json!({
"name": "John",
"age": 30
}))
.refresh(Refresh::WaitFor)
.send()
.await
.unwrap();

for client in &[shotover_client, client1, client2] {
let response = client
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(10)
.body(json!({
"query": {
"match": {
"name": "John",
}
}
}))
.send()
.await
.unwrap();

let results = response.json::<Value>().await.unwrap();
assert!(results["took"].as_i64().is_some());
assert_eq!(
results["hits"].as_object().unwrap()["hits"]
.as_array()
.unwrap()
.len(),
1
);
}

shotover.shutdown_and_then_consume_events(&[]).await;
}

// async fn start_writer_thread(
// client: OpenSearch,
// mut shutdown_notification_rx: oneshot::Receiver<()>,
// ) -> tokio::task::JoinHandle<()> {
// tokio::spawn(async move {
// let mut i = 0;
// let mut interval = interval(Duration::from_millis(100));
//
// loop {
// tokio::select! {
// _ = interval.tick() => {
// // TODO send the message to opensearch
// },
// _ = &mut shutdown_notification_rx => {
// println!("shutting down writer thread");
// break;
// }
// }
// }
// })
// }

#[tokio::test(flavor = "multi_thread")]
async fn dual_write() {
// let shotover_addr = "http://localhost:9200";
let source_addr = "http://localhost:9201";
// let target_addr = "http://localhost:9202";

let _compose = docker_compose("tests/test-configs/opensearch-dual-write/docker-compose.yaml");

let shotover = shotover_process("tests/test-configs/opensearch-dual-write/topology.yaml")
.start()
.await;

// let shotover_client = create_client(shotover_addr);
let source_client = create_client(source_addr);
// let target_client = create_client(target_addr);

// Create indexes in source cluster
assert_ok_and_get_json(
source_client
.indices()
.create(IndicesCreateParts::Index("test-index"))
.send()
.await,
)
.await;

// Get index info from source cluster and create in target cluster

let index_info = assert_ok_and_get_json(
source_client
.indices()
.get(IndicesGetParts::Index(&["test-index"]))
.send()
.await,
)
.await;

println!("{:?}", index_info);

// Begin dual writes and verify data ends up in both clusters
// Begin reindex operations
// Continue dual writing until reindex operation complete
// verify both clusters end up in the same state

shotover.shutdown_and_then_consume_events(&[]).await;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
version: '3'
services:
opensearch-node1:
image: opensearchproject/opensearch:2.9.0
container_name: opensearch-node1
environment:
- cluster.name=opensearch-cluster-1
- node.name=opensearch-node1
- bootstrap.memory_lock=true
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
- plugins.security.disabled=true
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- type: volume
target: /usr/share/opensearch/data
ports:
- 9201:9200
- 9601:9600

opensearch-node2:
image: opensearchproject/opensearch:2.9.0
container_name: opensearch-node2
environment:
- cluster.name=opensearch-cluster-2
- node.name=opensearch-node2
- bootstrap.memory_lock=true
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
- plugins.security.disabled=true
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- type: volume
target: /usr/share/opensearch/data
ports:
- 9202:9200
- 9602:9600

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
sources:
opensearch_prod:
OpenSearch:
listen_addr: "127.0.0.1:9200"
chain_config:
main_chain:
- Tee:
behavior: FailOnMismatch
buffer_size: 10000
chain:
- OpenSearchSinkSingle:
remote_address: "127.0.0.1:9202"
connect_timeout_ms: 3000
- OpenSearchSinkSingle:
remote_address: "127.0.0.1:9201"
connect_timeout_ms: 3000
source_to_chain_mapping:
opensearch_prod: main_chain
1 change: 1 addition & 0 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ string = "0.3.0"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
dashmap = "5.4.0"
atoi = "2.0.0"
libflate = "2.0.0"

[dev-dependencies]
criterion = { version = "0.5.0", features = ["async_tokio"] }
Expand Down
Loading

0 comments on commit 3f3453a

Please sign in to comment.