Skip to content

Commit

Permalink
Merge pull request #63 from TranslatorSRI/misc
Browse files Browse the repository at this point in the history
Misc
  • Loading branch information
jdr0887 authored Nov 15, 2024
2 parents 2356a13 + 0134eda commit 1500170
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 97 deletions.
7 changes: 1 addition & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,11 @@ lazy_static! {
pub static ref WHITELISTED_TEMPLATE_QUERIES: Vec<Box<dyn template::CQSTemplate>> = vec![
Box::new(template::ClinicalKPs::new()),
Box::new(template::OpenPredict::new()),
Box::new(template::ServiceProviderAeolus::new()),
Box::new(template::SpokeChembl::new()),
Box::new(template::MoleProChembl::new()),
Box::new(template::RTXKG2SemMed::new()),
Box::new(template::ServiceProviderSemMed::new()),
Box::new(template::ServiceProviderChembl::new()),
Box::new(template::ServiceProviderTMKPTargeted::new()),
Box::new(template::MultiomicsCTKP::new()),
Box::new(template::MultiomicsDrugApprovalsFAERS::new()),
Box::new(template::CAMKP::new()),
];
pub static ref DB_POOL: AsyncOnce<bb8::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>> = AsyncOnce::new(async {
Expand Down Expand Up @@ -89,7 +86,6 @@ lazy_static! {
#[openapi]
#[post("/asyncquery", data = "<data>")]
async fn asyncquery(data: Json<AsyncQuery>) -> Result<Json<AsyncQueryResponse>, status::Custom<Json<AsyncQuery>>> {
info!("ENTERING asyncquery(Json<AsyncQuery>)");
let query: AsyncQuery = data.clone().into_inner();

if let Some(query_graph) = &query.message.query_graph {
Expand All @@ -105,7 +101,6 @@ async fn asyncquery(data: Json<AsyncQuery>) -> Result<Json<AsyncQueryResponse>,
let job_id = job_actions::insert(&job).await.expect("Could not insert Job into DB");
let mut ret = AsyncQueryResponse::new(job_id.to_string());
ret.status = Some(JobStatus::Queued.to_string());
info!("LEAVING asyncquery(Json<AsyncQuery>) - OK");
return Ok(Json(ret));
} else {
let mut message = query.message.clone();
Expand Down
42 changes: 7 additions & 35 deletions src/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,41 +60,13 @@ impl_wrapper!(
"n0", //template_disease_node_id
util::compute_composite_score
);
// impl_wrapper!(
// ConnectionHypothesis,
// "mvp1-template2-connections-hypothesis/mvp1-template2-connection-hypothesis.json",
// "n0", //template_drug_node_id
// "n1", //template_disease_node_id
// compute_composite_score
// );
impl_wrapper!(
OpenPredict,
"mvp1-templates/mvp1-template3-openpredict/mvp1-template3-openpredict.json",
"n0", //template_drug_node_id
"n1", //template_disease_node_id
util::compute_composite_score
);
impl_wrapper!(
ServiceProviderAeolus,
"mvp1-templates/mvp1-template4-service-provider-aeolus/mvp1-template4-service-provider-aeolus.json",
"n0", //template_drug_node_id
"n1", //template_disease_node_id
util::compute_composite_score
);
impl_wrapper!(
SpokeChembl,
"mvp1-templates/mvp1-template5-spoke-chembl/mvp1-template5-spoke-chembl.json",
"n00", //template_drug_node_id
"n01", //template_disease_node_id
util::compute_composite_score
);
impl_wrapper!(
MoleProChembl,
"mvp1-templates/mvp1-template6-molepro-chembl/mvp1-template6-molepro-chembl.json",
"n00", //template_drug_node_id
"n01", //template_disease_node_id
util::compute_composite_score
);
impl_wrapper!(
RTXKG2SemMed,
"mvp1-templates/mvp1-template7-rtxkg2-semmed/mvp1-template7-rtxkg2-semmed.json",
Expand All @@ -109,13 +81,6 @@ impl_wrapper!(
"n1", //template_disease_node_id
util::compute_composite_score
);
impl_wrapper!(
ServiceProviderChembl,
"mvp1-templates/mvp1-template9-service-provider-chembl/mvp1-template9-service-provider-chembl.json",
"n00", //template_drug_node_id
"n01", //template_disease_node_id
util::compute_composite_score
);
impl_wrapper!(
ServiceProviderTMKPTargeted,
"mvp1-templates/mvp1-template10-service-provider-tmkp-targeted/mvp1-template10-service-provider-tmkp-targeted.json",
Expand All @@ -130,6 +95,13 @@ impl_wrapper!(
"n01", //template_disease_node_id
util::compute_composite_score
);
impl_wrapper!(
MultiomicsDrugApprovalsFAERS,
"mvp1-templates/mvp-template-12-multiomics-drugapprovals-faers/mvp-template-12-multiomics-drugapprovals-faers.json",
"n0", //template_drug_node_id
"n1", //template_disease_node_id
util::compute_composite_score
);
impl_wrapper!(
CAMKP,
"mvp2-templates/mvp2-template1-clinical-kps-cam-kp/mvp2-template1-clinical-kps-cam-kp.json",
Expand Down
92 changes: 44 additions & 48 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use futures::future::join_all;
use itertools::Itertools;
use merge_hashmap::Merge;
use rayon::prelude::*;
use rocket::form::validate::Contains;
use serde_json::Value;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
Expand Down Expand Up @@ -384,7 +383,7 @@ pub fn compute_composite_score(entry_values: Vec<CQSCompositeScoreValue>) -> f64
}
}

pub async fn process(query_graph: &QueryGraph, cqs_query: &Box<dyn template::CQSTemplate>, ids: &Vec<trapi_model_rs::CURIE>) -> Option<Response> {
pub async fn send_to_wfr(cqs_query: &Box<dyn template::CQSTemplate>, query: &trapi_model_rs::Query) -> Option<Response> {
let request_client = REQWEST_CLIENT.get().await;

let workflow_runner_url = format!(
Expand All @@ -395,14 +394,6 @@ pub async fn process(query_graph: &QueryGraph, cqs_query: &Box<dyn template::CQS
let backoff_multiplier = 2;
let retries = 3;

let mut query_template: QueryTemplate = cqs_query.render_query_template(ids.clone());

let attribute_constraint = query_template.first_edge_attribute_constraint();

query_template.remove_edge_attribute_constraints();
let query = query_template.to_query();
info!("cqs_query {} being sent to WFR: {}", cqs_query.name(), serde_json::to_string(&query).unwrap());

let mut trapi_response = None;
for attempt in 1..=retries {
debug!("attempt: {} for cqs_query.name(): {}", attempt, cqs_query.name());
Expand Down Expand Up @@ -434,10 +425,20 @@ pub async fn process(query_graph: &QueryGraph, cqs_query: &Box<dyn template::CQS
tokio::time::sleep(Duration::from_secs(retry_backoff_sleep_duration)).await;
}
}
trapi_response
}

if let Some(mut tr) = trapi_response {
pub async fn process(query_graph: &QueryGraph, cqs_query: &Box<dyn template::CQSTemplate>, ids: &Vec<trapi_model_rs::CURIE>) -> Option<Response> {
let mut query_template: QueryTemplate = cqs_query.render_query_template(ids.clone());

let attribute_constraint = query_template.first_edge_attribute_constraint();

query_template.remove_edge_attribute_constraints();
let query = query_template.to_query();
info!("cqs_query {} being sent to WFR: {}", cqs_query.name(), serde_json::to_string(&query).unwrap());

if let Some(mut tr) = send_to_wfr(cqs_query, &query).await {
let uuid = uuid::Uuid::new_v4().to_string();
// intended for debugging...writes
write_wfr_response("pre", &tr, &uuid, &cqs_query.name());

if let (Some(ac), Some(kg)) = (attribute_constraint, &mut tr.message.knowledge_graph) {
Expand Down Expand Up @@ -472,47 +473,42 @@ pub async fn process(query_graph: &QueryGraph, cqs_query: &Box<dyn template::CQS
if let Some(results) = &mut tr.message.results {
if let Some(limit) = query_template.cqs.results_limit {
let truncate_size = (crate::TRAPI_MESSAGE_RESULT_LIMIT.clone() as f32).div(limit).round() as usize;
info!("cqs_query: {} - results.len(): {}, truncate_size: {}", cqs_query.name(), results.len(), truncate_size);
results.truncate(truncate_size);
}
let query_graph_edge_sub_and_obj_keys = query_graph
.edges
.iter()
.map(|(_k, v)| (v.subject.clone(), v.object.clone()))
.collect::<Vec<(String, String)>>();
if let Some((subject_key, object_key)) = query_graph_edge_sub_and_obj_keys.first() {
let result_node_bindings_subject_object_pairs: Vec<(String, String)> = results
.iter()
.map(|result| {
let nb_subjects = result.node_bindings.get(subject_key).unwrap();
let nb_subject = nb_subjects.first().unwrap();

let nb_objects = result.node_bindings.get(object_key).unwrap();
let nb_object = nb_objects.first().unwrap();

(nb_subject.id.clone(), nb_object.id.clone())
if let Some(knowledge_graph) = &mut tr.message.knowledge_graph {
let edge_binding_ids: Vec<String> = results
.iter()
.map(|r| {
r.analyses
.iter()
.map(|a| {
a.edge_bindings
.iter()
.map(|(_eb_key, eb_value)| eb_value.iter().map(|eb| eb.id.clone()).collect_vec())
.flatten()
.collect_vec()
})
.flatten()
.collect_vec()
})
.flatten()
.collect();
if let Some(knowledge_graph) = &mut tr.message.knowledge_graph {
let kg_keys_to_remove = knowledge_graph
.edges
.iter()
.filter_map(|(k, v)| {
let kg_edge = (v.subject.clone(), v.object.clone());
match !result_node_bindings_subject_object_pairs.contains(kg_edge) {
true => Some(k.clone()),
false => None,
}
})
.collect_vec();
kg_keys_to_remove.iter().for_each(|k| {
knowledge_graph.edges.remove(k);
});
if let Some(aux_graphs) = &mut tr.message.auxiliary_graphs {
kg_keys_to_remove.iter().for_each(|k| {
if aux_graphs.contains_key(k) {
aux_graphs.remove(k);
}
});
debug!("cqs_query: {} - edge_binding_ids: {:?}", cqs_query.name(), edge_binding_ids);
debug!("cqs_query: {} - knowledge_graph.edges.keys(): {:?}", cqs_query.name(), knowledge_graph.edges.keys());

for key in knowledge_graph.edges.keys().cloned().collect_vec() {
if !edge_binding_ids.contains(&key) {
knowledge_graph.edges.remove(&key);
}
}

if let Some(aux_graphs) = &mut tr.message.auxiliary_graphs {
for key in aux_graphs.keys().cloned().collect_vec() {
if !edge_binding_ids.contains(&key) {
aux_graphs.remove(&key);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
}
},
"cqs": {
"results_limit": 5.6,
"results_limit": 8.4,
"edge_sources": [
{
"resource_id": "infores:cqs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
}
},
"cqs": {
"results_limit": 5.6,
"results_limit": 8.4,
"edge_sources": [
{
"resource_id": "infores:cqs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
}
},
"cqs": {
"results_limit": 16,
"results_limit": 17,
"edge_sources": [
{
"resource_id": "infores:cqs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
}
},
"cqs": {
"results_limit": 5.6,
"results_limit": 8.4,
"edge_sources": [
{
"resource_id": "infores:cqs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
}
},
"cqs": {
"results_limit": 16,
"results_limit": 17,
"edge_sources": [
{
"resource_id": "infores:cqs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
}
},
"cqs": {
"results_limit": 5.6,
"results_limit": 8.4,
"edge_sources": [
{
"resource_id": "infores:cqs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
}
},
"cqs": {
"results_limit": 5.6,
"results_limit": 8.4,
"edge_sources": [
{
"resource_id": "infores:cqs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
}
},
"cqs": {
"results_limit": 16,
"results_limit": 17,
"edge_sources": [
{
"resource_id": "infores:cqs",
Expand Down

0 comments on commit 1500170

Please sign in to comment.