Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catalogue extender #26

Merged
merged 2 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions src/catalogue.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{collections::BTreeMap, time::Duration};
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use reqwest::Url;
use serde_json::{json, Value};
use tracing::{debug, info};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};

pub type Criteria = BTreeMap<String, u64>;

Expand All @@ -16,39 +17,56 @@ fn get_element<'a>(count: &'a CriteriaGroups, key1: &'a str, key2: &'a str, key3
.and_then(|criteria| criteria.get(key3))
}

pub fn spawn_thing(catalogue_url: Url, prism_url: Url) -> Arc<Mutex<Value>> {
let thing: Arc<Mutex<Value>> = Arc::default();
let thing1 = thing.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
loop {
match get_extended_json(catalogue_url.clone(), prism_url.clone()).await {
Ok(new_value) => {
*thing1.lock().await = new_value;
info!("Updated Catalogue!");
tokio::time::sleep(Duration::from_secs(60 * 60)).await;
},
Err(err) => {
warn!("Failed to get thing: {err}.\n Retrying in 5s.");
tokio::time::sleep(Duration::from_secs(5)).await;
},
}
}
});

thing
}

pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Value {
pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Result<Value, reqwest::Error> {
debug!("Fetching catalogue from {catalogue_url} ...");

let resp = reqwest::Client::new()
.get(catalogue_url)
.timeout(Duration::from_secs(30))
.send()
.await
.expect("Unable to fetch catalogue from upstream; please check URL specified in config.");
.await?;

let mut json: Value = resp.json().await
.expect("Unable to parse catalogue from upstream; please check URL specified in config.");
let mut json: Value = resp.json().await?;

// tokio::time::sleep(Duration::from_secs(10)).await;

let prism_resp = reqwest::Client::new()
.post(format!("{}criteria", prism_url))
.header("Content-Type", "application/json")
.body("{\"sites\": []}")
.timeout(Duration::from_secs(300))
.send()
.await
.expect("Unable to fetch response from Prism; please check it's running.");
.await?;

let mut counts: CriteriaGroups = prism_resp.json().await
.expect("Unable to parse response from Prism into CriteriaGroups");
let mut counts: CriteriaGroups = prism_resp.json().await?;

recurse(&mut json, &mut counts); //TODO remove from counts once copied into catalogue to make it O(n log n)

info!("Catalogue built successfully.");

json
Ok(json)
}

/// Key order: group key (e.g. patient)
Expand Down Expand Up @@ -119,4 +137,4 @@ fn recurse(json: &mut Value, counts: &mut CriteriaGroups) {
},
_ => {}
}
}
}
17 changes: 14 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use axum::{
extract::{Json, Path, Query, State},
http::HeaderValue,
Expand All @@ -13,6 +15,7 @@ use once_cell::sync::Lazy;
use reqwest::{header, Method, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::Mutex;
use tower_http::cors::CorsLayer;
use tracing::{info, warn, Level};
use tracing_subscriber::{util::SubscriberInitExt, EnvFilter};
Expand All @@ -34,7 +37,7 @@ static BEAM_CLIENT: Lazy<BeamClient> = Lazy::new(|| {

#[derive(Clone)]
struct SharedState {
extended_json: Value,
extended_json: Arc<Mutex<Value>>,
}

#[tokio::main]
Expand All @@ -45,6 +48,13 @@ async fn main() {
.finish()
.init();

// TODO: Remove this workaround once clap manages to not choke on URL "".
if let Ok(var) = std::env::var("CATALOGUE_URL") {
if var.is_empty() {
std::env::remove_var("CATALOGUE_URL");
}
}

info!("{:#?}", Lazy::force(&CONFIG));

let cors = CorsLayer::new()
Expand All @@ -53,7 +63,7 @@ async fn main() {
.allow_headers([header::CONTENT_TYPE]);

let make_service = if let Some(url) = CONFIG.catalogue_url.clone() {
let extended_json = catalogue::get_extended_json(url, CONFIG.prism_url.clone()).await;
let extended_json = catalogue::spawn_thing(url, CONFIG.prism_url.clone());
let state = SharedState { extended_json };

let app = Router::new()
Expand Down Expand Up @@ -161,5 +171,6 @@ fn convert_response(response: reqwest::Response) -> axum::response::Response {
}

async fn handle_get_catalogue(State(state): State<SharedState>) -> Json<Value> {
Json(state.extended_json)
// TODO: We can totally avoid this clone by using axum_extra ErasedJson
Json(state.extended_json.lock().await.clone())
}
Loading