add display types to nodes based on rarest type property
jamesamcl committed Sep 11, 2024
1 parent b736230 commit 65ff063
Showing 8 changed files with 89 additions and 46 deletions.
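
In outline (a simplified sketch, not the pipeline code itself; the type names and counts below are invented for illustration): grebi_index counts how many entities carry each grebi:type value, and grebi_materialise then tags each node with its rarest, and therefore most specific, type as grebi:displayType.

use std::collections::HashMap;

/// Pick the type with the lowest global count (the rarest one).
fn pick_display_type<'a>(
    node_types: &[&'a str],
    type_counts: &HashMap<String, i64>,
) -> Option<&'a str> {
    node_types
        .iter()
        .filter_map(|t| type_counts.get(*t).map(|c| (*t, *c)))
        .min_by_key(|&(_, count)| count)
        .map(|(t, _)| t)
}

fn main() {
    // Hypothetical global counts, as grebi_index would accumulate them.
    let mut type_counts: HashMap<String, i64> = HashMap::new();
    type_counts.insert("biolink:NamedThing".to_string(), 1_000_000);
    type_counts.insert("biolink:Gene".to_string(), 20_000);

    // A node tagged with both a generic and a specific type.
    let node_types = ["biolink:NamedThing", "biolink:Gene"];
    assert_eq!(pick_display_type(&node_types, &type_counts), Some("biolink:Gene"));
}
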
16 changes: 14 additions & 2 deletions 04_index/grebi_index/src/main.rs
@@ -53,6 +53,7 @@ fn main() {

let mut entity_props_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
let mut edge_props_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
let mut types_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
let mut all_names:BTreeSet<Vec<u8>> = BTreeSet::new();
let mut all_ids:BTreeSet<Vec<u8>> = BTreeSet::new();

@@ -106,7 +107,9 @@ fn main() {
w_count = entity_props_to_count.get_mut(prop_key);
let count:&mut i64 = w_count.unwrap();

if prop_key.eq(b"grebi:type") || prop_key.eq(b"grebi:datasources") || prop_key.eq(b"id") {
let is_type = prop_key.eq(b"grebi:type");

if is_type || prop_key.eq(b"grebi:datasources") || prop_key.eq(b"id") {
metadata_writer.write_all(r#",""#.as_bytes()).unwrap();
metadata_writer.write_all(&prop_key).unwrap();
metadata_writer.write_all(r#"":["#.as_bytes()).unwrap();
@@ -118,6 +121,10 @@ fn main() {
} else {
metadata_writer.write_all(r#","#.as_bytes()).unwrap();
}
if is_type && val.kind == JsonTokenType::StartString {
let typecount = types_to_count.entry(val.value[1..val.value.len()-1].to_vec()).or_insert(0);
*typecount += 1;
}
metadata_writer.write_all(val.value).unwrap();
}
}
@@ -218,7 +225,12 @@ fn main() {
return (String::from_utf8(k.to_vec()).unwrap(), json!({
"count": v
}))
}).collect::<HashMap<String,serde_json::Value>>()
}).collect::<HashMap<String,serde_json::Value>>(),
"types": types_to_count.iter().map(|(k,v)| {
return (String::from_utf8(k.to_vec()).unwrap(), json!({
"count": v
}))
}).collect::<HashMap<String,serde_json::Value>>(),
})).unwrap().as_bytes()).unwrap();

for name in all_names {
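
The summary.json contract implied by this commit (a hedged sketch; the field layout is inferred from the diff, and the type name and count are made up): grebi_index now emits a "types" object alongside the existing property counts, and grebi_materialise, in the next file, reads it back to rank types by rarity.

use serde_json::json;

fn main() {
    // Assumed shape of the "types" section written by grebi_index.
    let summary = json!({
        "types": {
            "biolink:Gene": { "count": 20000 }
        }
    });

    // The same lookup grebi_materialise performs when loading the summary.
    let count: i64 = summary["types"]["biolink:Gene"]["count"].as_i64().unwrap();
    assert_eq!(count, 20000);
}
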
47 changes: 37 additions & 10 deletions 05_materialise/grebi_materialise/src/main.rs
@@ -46,6 +46,9 @@ struct Args {
#[arg(long)]
in_metadata_jsonl: String,

#[arg(long)]
in_summary_json: String,

#[arg(long)]
out_edges_jsonl: String,

@@ -111,6 +114,14 @@ fn main() -> std::io::Result<()> {

let node_metadata = load_metadata_mapping_table::load_metadata_mapping_table(&args.in_metadata_jsonl);

let mut types_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
{
let summary_json:Map<String, Value> = serde_json::from_reader(File::open(&args.in_summary_json).unwrap()).unwrap();
for (k, v) in summary_json["types"].as_object().unwrap() {
types_to_count.insert(k.as_bytes().to_vec(), v.as_object().unwrap()["count"].as_i64().unwrap());
}
}

let stdin = io::stdin().lock();
let mut reader = BufReader::new(stdin);

@@ -149,20 +160,31 @@ fn main() -> std::io::Result<()> {
eprintln!("... written {} nodes", n_nodes);
}

let mut rarest_type:Option<Vec<u8>> = None;
let mut rarest_type_count:i64 = std::i64::MAX;

sliced.props.iter().for_each(|prop| {

let prop_key = prop.key;

if prop_key.eq(b"grebi:type") {
for val in &prop.values {
if val.kind == JsonTokenType::StartString {
let buf = &val.value.to_vec();
let str = JsonParser::parse(&buf).string();
all_types.insert(str.to_vec());
}
}
if prop_key.eq(b"grebi:type") {
for val in &prop.values {
if val.kind == JsonTokenType::StartString {
let buf = &val.value.to_vec();
let str = JsonParser::parse(&buf).string();
all_types.insert(str.to_vec());

let count = types_to_count.get(str);
if count.is_some() {
if *count.unwrap() < rarest_type_count {
rarest_type = Some(str.to_vec());
rarest_type_count = *count.unwrap();
}
}
}
}

}
}

all_entity_props.insert(prop_key.to_vec());

@@ -187,6 +209,11 @@ fn main() -> std::io::Result<()> {
};

nodes_writer.write_all(&line[0..line.len()-1] /* skip closing bracket */).unwrap();
if rarest_type.is_some() {
nodes_writer.write_all(b",\"grebi:displayType\":\"").unwrap();
nodes_writer.write_all(&rarest_type.unwrap()).unwrap();
nodes_writer.write_all(b"\"").unwrap();
}
nodes_writer.write_all(b",\"_refs\":").unwrap();
nodes_writer.write_all(serde_json::to_string(&_refs).unwrap().as_bytes()).unwrap();
nodes_writer.write_all(b"}\n").unwrap();
@@ -225,7 +252,7 @@ fn main() -> std::io::Result<()> {
summary_writer.write_all(serde_json::to_string_pretty(&json!({
"entity_prop_defs": entity_prop_defs,
"edge_prop_defs": edge_prop_defs,
"type_defs": type_defs,
"types": type_defs,
"edges": edge_summary
})).unwrap().as_bytes()).unwrap();

13 changes: 12 additions & 1 deletion 06_prepare_db_import/grebi_make_neo_csv/src/main.rs
@@ -92,7 +92,7 @@ fn main() -> std::io::Result<()> {



nodes_writer.write_all("grebi:nodeId:ID,:LABEL,grebi:datasources:string[],grebi:subgraph:string".as_bytes()).unwrap();
nodes_writer.write_all("grebi:nodeId:ID,:LABEL,grebi:datasources:string[],grebi:subgraph:string,grebi:displayType:string".as_bytes()).unwrap();
for prop in &all_entity_props {
nodes_writer.write_all(b",").unwrap();
nodes_writer.write_all(prop.as_bytes()).unwrap();
@@ -204,8 +204,16 @@ fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet<Stri
nodes_writer.write_all(ds).unwrap();
});

// grebi:subgraph
nodes_writer.write_all(b"\",\"").unwrap();
nodes_writer.write_all(entity.subgraph).unwrap();
nodes_writer.write_all(b"\",").unwrap();

// grebi:displayType
nodes_writer.write_all(b"\"").unwrap();
if entity.display_type.is_some() {
write_escaped_value(entity.display_type.unwrap(), nodes_writer);
}
nodes_writer.write_all(b"\"").unwrap();

for header_prop in all_node_props {
@@ -218,6 +226,9 @@
if row_prop.key == "grebi:type".as_bytes() {
continue; // already put in :LABEL column
}
if row_prop.key == "grebi:displayType".as_bytes() {
continue; // already written above
}
if header_prop.as_bytes() == row_prop.key {
if row_prop.key == "id".as_bytes() {
for val in row_prop.values.iter() {
1 change: 1 addition & 0 deletions 06_prepare_db_import/grebi_make_solr/src/main.rs
@@ -133,6 +133,7 @@ fn write_solr_object(line:&Vec<u8>, nodes_writer:&mut BufWriter<&File>) {
k.eq("grebi:from") ||
k.eq("grebi:to") ||
k.eq("grebi:subgraph") ||
k.eq("grebi:displayType") ||
( k.eq("grebi:type") && !v.is_array() /* edge types are singular */ )
{
out_json.insert(escape_key(k), v.clone());
17 changes: 1 addition & 16 deletions configs/subgraph_configs/ebi_full_monarch.json
@@ -61,21 +61,6 @@
"ols:ontologyId"
],
"datasource_configs": [
"./configs/datasource_configs/gwas.json",
"./configs/datasource_configs/hgnc.json",
"./configs/datasource_configs/impc.json",
"./configs/datasource_configs/sssom.json",
"./configs/datasource_configs/ols.json",
"./configs/datasource_configs/reactome.json",
"./configs/datasource_configs/ubergraph.json",
"./configs/datasource_configs/otar.json",
"./configs/datasource_configs/monarch.json",
"./configs/datasource_configs/metabolights.json",
"./configs/datasource_configs/mondo_efo.json",
"./configs/datasource_configs/hett_pesticides_appril.json",
"./configs/datasource_configs/hett_pesticides_eu.json",
"./configs/datasource_configs/hett_pesticides_gb.json",
"./configs/datasource_configs/aopwiki.json",
"./configs/datasource_configs/chembl.json"
"./configs/datasource_configs/reactome.json"
]
}
11 changes: 9 additions & 2 deletions grebi_shared/src/slice_merged_entity.rs
@@ -23,7 +23,8 @@ pub struct SlicedEntity<'a> {
pub datasources:Vec<&'a [u8]>,
pub subgraph:&'a [u8],
pub props:Vec<SlicedProperty<'a>>,
pub _refs:Option<&'a [u8]>
pub _refs:Option<&'a [u8]>,
pub display_type:Option<&'a [u8]>
}

impl<'a> SlicedEntity<'a> {
@@ -34,6 +35,7 @@ impl<'a> SlicedEntity<'a> {

let mut props:Vec<SlicedProperty> = Vec::new();
let mut entity_datasources:Vec<&[u8]> = Vec::new();
let mut display_type:Option<&[u8]> = None;
let mut _refs:Option<&[u8]> = None;

// {
@@ -62,6 +64,11 @@ impl<'a> SlicedEntity<'a> {

let prop_key = parser.name();

if prop_key == b"grebi:displayType" {
display_type = Some(&parser.value());
continue;
}

if prop_key == b"_refs" {
_refs = Some(&parser.value());
continue;
@@ -106,7 +113,7 @@



return SlicedEntity { id, datasources: entity_datasources, subgraph, props, _refs };
return SlicedEntity { id, datasources: entity_datasources, subgraph, props, display_type, _refs };

}

28 changes: 14 additions & 14 deletions nextflow/01_create_subgraph.nf
@@ -27,14 +27,15 @@ workflow {
Channel.value(config.bytes_per_merged_file))

indexed = index(merged.collect())
materialise(merged.flatten(), indexed.metadata_jsonl, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt)
merge_summary_jsons(indexed.prop_summary_json.collect() + materialise.out.edge_summary.collect())

materialise(merged.flatten(), indexed.metadata_jsonl, indexed.summary_json, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt)
merge_summary_jsons(indexed.summary_json.collect() + materialise.out.mat_summary.collect())

materialised_nodes_and_edges = materialise.out.nodes.collect() + materialise.out.edges.collect()

rocks_db = create_rocks(materialised_nodes_and_edges)

neo_input_dir = prepare_neo(indexed.prop_summary_json, materialise.out.nodes, materialise.out.edges)
neo_input_dir = prepare_neo(indexed.summary_json, materialise.out.nodes, materialise.out.edges)

ids_csv = create_neo_ids_csv(indexed.ids_txt)
neo_db = create_neo(
@@ -208,7 +209,7 @@ process index {

output:
path("metadata.jsonl"), emit: metadata_jsonl
path("prop_summary.json"), emit: prop_summary_json
path("summary.json"), emit: summary_json
path("names.txt"), emit: names_txt
path("ids_${params.subgraph}.txt"), emit: ids_txt

Expand All @@ -220,7 +221,7 @@ process index {
| ${params.home}/target/release/grebi_index \
--subgraph-name ${params.subgraph} \
--out-metadata-jsonl-path metadata.jsonl \
--out-summary-json-path prop_summary.json \
--out-summary-json-path summary.json \
--out-names-txt names.txt \
--out-ids-txt ids_${params.subgraph}.txt
"""
@@ -237,14 +238,15 @@ process materialise {
input:
path(merged_filename)
path(metadata_jsonl)
path(index_summary_json)
val(exclude)
val(exclude_self_referential)
path(groups_txt)

output:
path("materialised_nodes_${task.index}.jsonl"), emit: nodes
path("materialised_edges_${task.index}.jsonl"), emit: edges
path("edge_summary_${task.index}.json"), emit: edge_summary
path("mat_summary_${task.index}.json"), emit: mat_summary

script:
"""
Expand All @@ -253,9 +255,10 @@ process materialise {
cat ${merged_filename} \
| ${params.home}/target/release/grebi_materialise \
--in-metadata-jsonl ${metadata_jsonl} \
--in-summary-json ${index_summary_json} \
--groups-txt ${groups_txt} \
--out-edges-jsonl materialised_edges_${task.index}.jsonl \
--out-summary-json edge_summary_${task.index}.json \
--out-summary-json mat_summary_${task.index}.json \
--exclude ${exclude.iterator().join(",")} \
--exclude-self-referential ${exclude_self_referential.iterator().join(",")} \
> materialised_nodes_${task.index}.jsonl
@@ -266,9 +269,6 @@ process merge_summary_jsons {
cache "lenient"
memory "8 GB"
time "1h"
//time { 1.hour + 8.hour * (task.attempt-1) }
//errorStrategy { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
//maxRetries 5

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true

@@ -321,7 +321,7 @@ process prepare_neo {
publishDir "${params.tmp}/${params.config}/${params.subgraph}/neo4j_csv", overwrite: true

input:
path(prop_summary_json)
path(summary_json)
path(nodes_jsonl)
path(edges_jsonl)

Expand All @@ -335,7 +335,7 @@ process prepare_neo {
#!/usr/bin/env bash
set -Eeuo pipefail
${params.home}/target/release/grebi_make_neo_csv \
--in-summary-jsons ${prop_summary_json} \
--in-summary-jsons ${summary_json} \
--in-nodes-jsonl ${nodes_jsonl} \
--in-edges-jsonl ${edges_jsonl} \
--out-nodes-csv-path neo_nodes_${params.subgraph}_${task.index}.csv \
@@ -435,7 +435,7 @@ process create_solr_nodes_core {
set -Eeuo pipefail
python3 ${params.home}/06_prepare_db_import/make_solr_config.py \
--subgraph-name ${params.subgraph} \
--in-summary-json ${params.tmp}/${params.config}/${params.subgraph}/prop_summary.json \
--in-summary-json ${params.tmp}/${params.config}/${params.subgraph}/summary.json \
--in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \
--out-config-dir solr_config
python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \
@@ -464,7 +464,7 @@ process create_solr_edges_core {
set -Eeuo pipefail
python3 ${params.home}/06_prepare_db_import/make_solr_config.py \
--subgraph-name ${params.subgraph} \
--in-summary-json ${params.tmp}/${params.config}/${params.subgraph}/prop_summary.json \
--in-summary-json ${params.tmp}/${params.config}/${params.subgraph}/summary.json \
--in-template-config-dir ${params.home}/06_prepare_db_import/solr_config_template \
--out-config-dir solr_config
python3 ${params.home}/07_create_db/solr/solr_import.slurm.py \
2 changes: 1 addition & 1 deletion scripts/dataload_saturos.sh
@@ -1,7 +1,7 @@
#!/bin/bash
export GREBI_HOME=/home/james/grebi
export GREBI_TMP=/data/grebi_tmp
export GREBI_CONFIG=hett_only
export GREBI_CONFIG=ebi
export GREBI_IS_EBI=false
export GREBI_TIMESTAMP=$(date +%Y_%m_%d__%H_%M)
cd $GREBI_TMP
