Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/materialise improvement #1773

Merged
merged 21 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9865b88
wip parallel materialize
ljeub-pometry Sep 10, 2024
b12efae
working implementation
ljeub-pometry Sep 11, 2024
f1eda37
refactor the benchmark crate so the common parts can be imported norm…
ljeub-pometry Sep 11, 2024
2789dbb
make edge iterator return references
ljeub-pometry Sep 11, 2024
25de963
fix issues after rebase
ljeub-pometry Sep 11, 2024
45104b3
don't reorder the node ids
ljeub-pometry Sep 11, 2024
5d0ef86
add protobuf decoding and encoding benchmarks
ljeub-pometry Sep 12, 2024
fd92d1e
implement protobuf decoding using sharded readers
ljeub-pometry Sep 12, 2024
664f91f
add encode/decode benchmark to base
ljeub-pometry Sep 12, 2024
3ccc4fa
add missing calls to update_time
ljeub-pometry Sep 13, 2024
f9d957e
make assert_graph_eq check more things and fix related bugs
ljeub-pometry Sep 13, 2024
0bd049e
make sure we don't mess with the earliest time of the graph when rese…
ljeub-pometry Sep 13, 2024
dbf6e08
fix warnings
ljeub-pometry Sep 13, 2024
f4b92f9
Merge master into feature/MaterialiseImprovement
ljeub-pometry Sep 13, 2024
6e7c0c4
improve materialise implementation and add more tests
ljeub-pometry Sep 16, 2024
89550cf
fix graph properties for disk graph
ljeub-pometry Sep 16, 2024
62dd976
clean up
ljeub-pometry Sep 16, 2024
b7505bd
Merge master into feature/MaterialiseImprovement
ljeub-pometry Sep 16, 2024
d3778c0
Merge master into feature/MaterialiseImprovement
ljeub-pometry Sep 18, 2024
74c2bbe
move proto impl to its own module and add proptest
ljeub-pometry Sep 18, 2024
b0b7b0a
remove unused method
ljeub-pometry Sep 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions js-raphtory/src/graph/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ impl Node {

#[wasm_bindgen(js_name = edges)]
pub fn edges(&self) -> js_sys::Array {
self.0.edges().iter().map(Edge).map(JsValue::from).collect()
self.0
.edges()
.iter()
.map(|e| JsValue::from(Edge(e.cloned())))
.collect()
}

// out_edges
Expand All @@ -90,8 +94,7 @@ impl Node {
self.0
.out_edges()
.iter()
.map(Edge)
.map(JsValue::from)
.map(|e| JsValue::from(Edge(e.cloned())))
.collect()
}

Expand All @@ -101,8 +104,7 @@ impl Node {
self.0
.in_edges()
.iter()
.map(Edge)
.map(JsValue::from)
.map(|e| JsValue::from(Edge(e.cloned())))
.collect()
}

Expand Down
2 changes: 1 addition & 1 deletion pometry-storage-private
4 changes: 2 additions & 2 deletions raphtory-api/src/core/entities/edges/edge_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ impl EdgeRef {
}

#[inline(always)]
pub fn layer(&self) -> Option<&usize> {
self.layer_id.as_ref()
pub fn layer(&self) -> Option<usize> {
self.layer_id
}

#[inline(always)]
Expand Down
9 changes: 8 additions & 1 deletion raphtory-api/src/core/entities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl From<EdgeRef> for ELID {
fn from(value: EdgeRef) -> Self {
ELID {
edge: value.pid(),
layer: value.layer().copied(),
layer: value.layer(),
}
}
}
Expand Down Expand Up @@ -186,6 +186,13 @@ impl GID {
GID::Str(v) => parse_u64_strict(v),
}
}

/// Returns a borrowed [`GidRef`] view of this id.
///
/// The `U64` payload is copied into the view, while the `Str` payload is
/// borrowed from `self`.
pub fn as_ref(&self) -> GidRef {
    match *self {
        GID::U64(id) => GidRef::U64(id),
        GID::Str(ref name) => GidRef::Str(name),
    }
}
}

impl From<u64> for GID {
Expand Down
8 changes: 8 additions & 0 deletions raphtory-api/src/core/storage/dict_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ impl<Index> BorrowMut<Index> for MaybeNew<Index> {
}

impl DictMapper {
/// Builds a copy of this mapper whose reverse map is stored behind a newly
/// allocated `Arc<RwLock<_>>` rather than sharing the one held by `self`.
///
/// The reverse map is snapshotted first (the read guard is released at the
/// end of that statement), and `map` is cloned afterwards.
///
/// NOTE(review): whether `map.clone()` yields an independent copy depends on
/// the field's type, which is not visible here — confirm.
pub fn deep_clone(&self) -> Self {
    let reverse_snapshot = self.reverse_map.read().clone();
    let map = self.map.clone();

    Self {
        map,
        reverse_map: Arc::new(RwLock::new(reverse_snapshot)),
    }
}
pub fn get_or_create_id<Q, T>(&self, name: &Q) -> MaybeNew<usize>
where
Q: Hash + Eq + ?Sized + ToOwned<Owned = T> + Borrow<str>,
Expand Down
4 changes: 2 additions & 2 deletions raphtory-api/src/core/storage/timeindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{fmt, ops::Range};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct TimeIndexEntry(pub i64, pub usize);

pub trait AsTime: fmt::Debug + Copy + Ord + Eq + Send + Sync + 'static {
Expand Down Expand Up @@ -52,7 +52,7 @@ impl TimeIndexEntry {
}

pub fn end(t: i64) -> Self {
Self(t.saturating_add(1), 0)
Self(t, usize::MAX)
}
}

Expand Down
14 changes: 13 additions & 1 deletion raphtory-benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"

[dependencies]
criterion = { workspace = true }
raphtory = { path = "../raphtory", features = ["io"], version = "0.11.3" }
raphtory = { path = "../raphtory", features = ["io", "proto"], version = "0.11.3" }
raphtory-api = { path = "../raphtory-api", version = "0.11.3" }
pometry-storage.workspace = true
sorted_vector_map = { workspace = true }
Expand Down Expand Up @@ -42,6 +42,18 @@ harness = false
name = "edge_add"
harness = false

[[bench]]
name = "materialise"
harness = false

[[bench]]
name = "proto_encode"
harness = false

[[bench]]
name = "proto_decode"
harness = false

# [[bench]]
# name = "arrow_algobench"
# harness = false
3 changes: 1 addition & 2 deletions raphtory-benchmark/benches/algobench.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::common::bench;
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode};
use raphtory::{
algorithms::{
Expand All @@ -16,9 +15,9 @@ use raphtory::{
graphgen::random_attachment::random_attachment,
prelude::*,
};
use raphtory_benchmark::common::bench;
use rayon::prelude::*;

mod common;
pub fn local_triangle_count_analysis(c: &mut Criterion) {
let mut group = c.benchmark_group("local_triangle_count");
group.sample_size(10);
Expand Down
2 changes: 0 additions & 2 deletions raphtory-benchmark/benches/arrow_algobench.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
mod common;

#[cfg(feature = "storage")]
pub mod arrow_bench {

Expand Down
14 changes: 9 additions & 5 deletions raphtory-benchmark/benches/base.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::common::{bootstrap_graph, run_large_ingestion_benchmarks};
use common::run_graph_ops_benches;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use raphtory::{graph_loader::lotr_graph::lotr_graph, prelude::*};

mod common;
use raphtory_benchmark::common::{
bootstrap_graph, run_graph_ops_benches, run_large_ingestion_benchmarks,
run_proto_decode_benchmark, run_proto_encode_benchmark,
};

pub fn base(c: &mut Criterion) {
// let mut ingestion_group = c.benchmark_group("ingestion");
Expand Down Expand Up @@ -48,7 +48,11 @@ pub fn base(c: &mut Criterion) {
}
}

run_graph_ops_benches(c, "lotr_graph", graph, layered_graph)
run_graph_ops_benches(c, "lotr_graph", graph.clone(), layered_graph);
let mut proto_group = c.benchmark_group("lotr_graph");
run_proto_decode_benchmark(&mut proto_group, graph.clone());
run_proto_encode_benchmark(&mut proto_group, graph.clone());
proto_group.finish();
}

criterion_group!(benches, base);
Expand Down
4 changes: 1 addition & 3 deletions raphtory-benchmark/benches/edge_add.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use raphtory::prelude::*;

mod common;
use rand::{
distributions::{Alphanumeric, DistString},
thread_rng, Rng,
};
use raphtory::prelude::*;

fn random_string(n: usize) -> String {
Alphanumeric.sample_string(&mut thread_rng(), n)
Expand Down
4 changes: 1 addition & 3 deletions raphtory-benchmark/benches/graph_ops.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use common::run_graph_ops_benches;
use criterion::{criterion_group, criterion_main, Criterion};
use raphtory::{
graph_loader::sx_superuser_graph::{sx_superuser_file, sx_superuser_graph, TEdge},
io::csv_loader::CsvLoader,
prelude::*,
};
use raphtory_api::core::utils::hashing::calculate_hash;

mod common;
use raphtory_benchmark::common::run_graph_ops_benches;

pub fn graph(c: &mut Criterion) {
let group_name = "analysis_graph";
Expand Down
11 changes: 11 additions & 0 deletions raphtory-benchmark/benches/materialise.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use criterion::{criterion_group, criterion_main, Criterion};
use raphtory::{graph_loader::sx_superuser_graph::sx_superuser_graph, prelude::Graph};
use raphtory_benchmark::common::bench_materialise;

/// Criterion entry point: loads the `sx_superuser` graph once and hands
/// `bench_materialise` a factory that clones it on demand.
///
/// Panics if the graph cannot be loaded.
pub fn bench(c: &mut Criterion) {
    let source_graph = sx_superuser_graph().unwrap();
    bench_materialise("materialise", c, || source_graph.clone());
}

criterion_group!(benches, bench);
criterion_main!(benches);
4 changes: 1 addition & 3 deletions raphtory-benchmark/benches/parameterized.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::common::{bootstrap_graph, run_large_ingestion_benchmarks};
use criterion::{
criterion_group, criterion_main, AxisScale, Criterion, PlotConfiguration, Throughput,
};
use raphtory_api::core::entities::GID;

mod common;
use raphtory_benchmark::common::{bootstrap_graph, run_large_ingestion_benchmarks};

pub fn parameterized(c: &mut Criterion) {
let nodes_exponents = 1..6;
Expand Down
20 changes: 20 additions & 0 deletions raphtory-benchmark/benches/proto_decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use criterion::{criterion_group, criterion_main, Criterion};
use raphtory::graph_loader::{lotr_graph::lotr_graph, sx_superuser_graph::sx_superuser_graph};
use raphtory_benchmark::common::run_proto_decode_benchmark;

/// Criterion entry point for the protobuf decode benchmarks.
///
/// Runs `run_proto_decode_benchmark` twice: on the `sx_superuser` graph
/// (group `proto_sx_superuser`, 10 samples) and on the LOTR graph
/// (group `proto_lotr`, 100 samples).
///
/// Panics if the `sx_superuser` graph cannot be loaded.
fn bench(c: &mut Criterion) {
    {
        let sx_superuser = sx_superuser_graph().unwrap();
        let mut sx_group = c.benchmark_group("proto_sx_superuser");
        sx_group.sample_size(10);
        run_proto_decode_benchmark(&mut sx_group, sx_superuser);
        sx_group.finish();
    }

    {
        let mut lotr_group = c.benchmark_group("proto_lotr");
        let lotr = lotr_graph();
        lotr_group.sample_size(100);
        run_proto_decode_benchmark(&mut lotr_group, lotr);
        lotr_group.finish();
    }
}

criterion_group!(benches, bench);
criterion_main!(benches);
14 changes: 14 additions & 0 deletions raphtory-benchmark/benches/proto_encode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use criterion::{criterion_group, criterion_main, Criterion};
use raphtory::graph_loader::sx_superuser_graph::sx_superuser_graph;
use raphtory_benchmark::common::run_proto_encode_benchmark;

/// Criterion entry point for the protobuf encode benchmark.
///
/// Loads the `sx_superuser` graph and runs `run_proto_encode_benchmark` on it
/// in the `proto_sx_superuser` group with 10 samples.
///
/// Panics if the graph cannot be loaded.
fn bench(c: &mut Criterion) {
    let sx_superuser = sx_superuser_graph().unwrap();

    let mut sx_group = c.benchmark_group("proto_sx_superuser");
    sx_group.sample_size(10);
    run_proto_encode_benchmark(&mut sx_group, sx_superuser);
    sx_group.finish();
}

criterion_group!(benches, bench);
criterion_main!(benches);
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use criterion::{
use rand::{distributions::Uniform, seq::*, Rng, SeedableRng};
use raphtory::{db::api::view::StaticGraphViewOps, prelude::*};
use std::collections::HashSet;
use tempfile::NamedTempFile;

fn make_index_gen() -> Box<dyn Iterator<Item = u64>> {
let rng = rand::thread_rng();
Expand Down Expand Up @@ -570,7 +571,24 @@ pub fn run_graph_ops_benches(
);
}

fn bench_materialise<F, G>(name: &str, c: &mut Criterion, make_graph: F)
/// Registers a `proto_encode` benchmark in `group` that repeatedly encodes
/// `graph` to the same temporary file.
///
/// Prints the graph before benchmarking. Panics if the temporary file cannot
/// be created or if an encode call fails.
pub fn run_proto_encode_benchmark(group: &mut BenchmarkGroup<WallTime>, graph: Graph) {
    println!("graph: {graph}");
    // A single temp file is reused as the encode target for every iteration.
    let target = NamedTempFile::new().unwrap();
    bench(group, "proto_encode", None, |bencher: &mut Bencher| {
        bencher.iter(|| graph.encode(target.path()).unwrap())
    });
}

/// Registers a `proto_decode` benchmark in `group` that repeatedly decodes a
/// graph from a temporary file.
///
/// `graph` is printed and encoded to the temporary file once, before the
/// benchmark is registered. Panics if the temporary file cannot be created,
/// or if the initial encode or any decode call fails.
pub fn run_proto_decode_benchmark(group: &mut BenchmarkGroup<WallTime>, graph: Graph) {
    println!("graph: {graph}");
    let encoded = NamedTempFile::new().unwrap();
    graph.encode(encoded.path()).unwrap();
    bench(group, "proto_decode", None, |bencher| {
        bencher.iter(|| Graph::decode(encoded.path()).unwrap())
    });
}

pub fn bench_materialise<F, G>(name: &str, c: &mut Criterion, make_graph: F)
where
F: Fn() -> G,
G: StaticGraphViewOps,
Expand Down
1 change: 1 addition & 0 deletions raphtory-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod common;
11 changes: 11 additions & 0 deletions raphtory-graphql/src/model/graph/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ impl<G: StaticGraphViewOps + IntoDynamic, GH: StaticGraphViewOps + IntoDynamic>
}
}

impl Edge {
    /// Builds an `Edge` from an edge view that only borrows its graphs, by
    /// calling `cloned()` on the view and converting the result.
    pub(crate) fn from_ref<G, GH>(value: EdgeView<&G, &GH>) -> Self
    where
        G: StaticGraphViewOps + IntoDynamic,
        GH: StaticGraphViewOps + IntoDynamic,
    {
        let owned_view = value.cloned();
        Self::from(owned_view)
    }
}

#[ResolvedObjectFields]
impl Edge {
////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion raphtory-graphql/src/model/graph/edges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl GqlEdges {
}

fn iter(&self) -> Box<dyn Iterator<Item = Edge> + '_> {
let iter = self.ee.iter().map(Edge::from);
let iter = self.ee.iter().map(Edge::from_ref);
Box::new(iter)
}
}
Expand Down
10 changes: 5 additions & 5 deletions raphtory-graphql/src/model/schema/edge_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use dynamic_graphql::{ResolvedObject, ResolvedObjectFields};
use itertools::Itertools;
use raphtory::{
db::{api::view::StaticGraphViewOps, graph::edge::EdgeView},
prelude::EdgeViewOps,
prelude::{EdgeViewOps, GraphViewOps},
};
use std::collections::{HashMap, HashSet};

Expand Down Expand Up @@ -40,13 +40,13 @@ impl<G: StaticGraphViewOps> EdgeSchema<G> {

/// Returns the list of property schemas for edges connecting these types of nodes
async fn properties(&self) -> Vec<PropertySchema> {
let filter_types = |edge: &EdgeView<G>| {
let filter_types = |edge: &EdgeView<&G>| {
let src_type = get_node_type(edge.src());
let dst_type = get_node_type(edge.dst());
src_type == self.src_type && dst_type == self.dst_type
};

let filtered_edges = self.graph.edges().iter().filter(filter_types);
let edges = self.graph.edges();
let filtered_edges = edges.iter().filter(filter_types);

let schema: SchemaAggregate = filtered_edges
.map(collect_edge_schema)
Expand All @@ -57,7 +57,7 @@ impl<G: StaticGraphViewOps> EdgeSchema<G> {
}
}

fn collect_edge_schema<G: StaticGraphViewOps>(edge: EdgeView<G>) -> SchemaAggregate {
fn collect_edge_schema<'graph, G: GraphViewOps<'graph>>(edge: EdgeView<G>) -> SchemaAggregate {
edge.properties()
.iter()
.map(|(key, value)| (key.to_string(), HashSet::from([value.to_string()])))
Expand Down
6 changes: 3 additions & 3 deletions raphtory-graphql/src/model/schema/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use raphtory::{
db::{api::view::StaticGraphViewOps, graph::node::NodeView},
prelude::NodeViewOps,
db::graph::node::NodeView,
prelude::{GraphViewOps, NodeViewOps},
};
use std::collections::{HashMap, HashSet};

Expand All @@ -12,7 +12,7 @@ pub(crate) mod property_schema;

const ENUM_BOUNDARY: usize = 20;

fn get_node_type<G: StaticGraphViewOps>(node: NodeView<G>) -> String {
fn get_node_type<'graph, G: GraphViewOps<'graph>>(node: NodeView<G>) -> String {
let prop = node.properties().get("type");
prop.map(|prop| prop.to_string())
.unwrap_or_else(|| "NONE".to_string())
Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/algorithms/community_detection/modularity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl ModularityFunction for ModularityUnDir {
let w = weight_prop
.map(|w| e.properties().get(w).unwrap_f64())
.unwrap_or(1.0);
let dst_id = local_id_map[&e.nbr()];
let dst_id = local_id_map[&e.nbr().cloned()];
(dst_id, w)
})
.filter(|(_, w)| w >= &tol)
Expand Down
Loading
Loading