Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(torii-grpc): parallelize queries #2443

Merged
merged 12 commits into from
Sep 19, 2024
66 changes: 50 additions & 16 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use proto::world::{
RetrieveEventsRequest, RetrieveEventsResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest,
};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohayo sensei! Introducing Rayon Requires Caution in Async Context

The addition of Rayon imports (line 24) indicates the use of data parallelism. However, using Rayon within asynchronous functions can lead to unexpected behavior due to its blocking nature and potential thread pool conflicts with Tokio's async runtime. It's important to ensure that integrating Rayon doesn't cause deadlocks or hinder async performance.

Consider using async-aware concurrency patterns provided by Tokio or the futures crate to achieve parallelism that's compatible with the async runtime.

use sqlx::prelude::FromRow;
use sqlx::sqlite::SqliteRow;
use sqlx::{Pool, Row, Sqlite};
Expand Down Expand Up @@ -265,7 +266,7 @@ impl DojoWorld {
return Ok((Vec::new(), 0));
}

// query to filter with limit and offset
// Query to get entity IDs and their model IDs
let mut query = format!(
r#"
SELECT {table}.id, group_concat({model_relation_table}.model_id) as model_ids
Expand All @@ -288,36 +289,63 @@ impl DojoWorld {
let db_entities: Vec<(String, String)> =
sqlx::query_as(&query).bind(limit).bind(offset).fetch_all(&self.pool).await?;

let mut entities = Vec::with_capacity(db_entities.len());
// Group entities by their model combinations
let mut model_groups: HashMap<Vec<Felt>, Vec<String>> = HashMap::new();
for (entity_id, models_str) in db_entities {
let model_ids: Vec<Felt> = models_str
.split(',')
.map(Felt::from_str)
.collect::<Result<_, _>>()
.map_err(ParseError::FromStr)?;
model_groups.entry(model_ids).or_default().push(entity_id);
}
Larkooo marked this conversation as resolved.
Show resolved Hide resolved

let mut entities = Vec::new();
for (model_ids, entity_ids) in model_groups {
let schemas =
self.model_cache.models(&model_ids).await?.into_iter().map(|m| m.schema).collect();

let (entity_query, arrays_queries, _) = build_sql_query(
&schemas,
table,
entity_relation_column,
Some(&format!("{table}.id = ?")),
Some(&format!("{table}.id = ?")),
Some(&format!(
"{table}.id IN ({})",
entity_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",")
)),
Some(&format!(
"{table}.id IN ({})",
entity_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",")
)),
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
None,
None,
)?;

let row =
sqlx::query(&entity_query).bind(entity_id.clone()).fetch_one(&self.pool).await?;
let mut query = sqlx::query(&entity_query);
for id in &entity_ids {
query = query.bind(id);
}
let rows = query.fetch_all(&self.pool).await?;

let mut arrays_rows = HashMap::new();
for (name, query) in arrays_queries {
let rows =
sqlx::query(&query).bind(entity_id.clone()).fetch_all(&self.pool).await?;
arrays_rows.insert(name, rows);
for (name, array_query) in arrays_queries {
let mut query = sqlx::query(&array_query);
for id in &entity_ids {
query = query.bind(id);
}
let array_rows = query.fetch_all(&self.pool).await?;
arrays_rows.insert(name, array_rows);
}

entities.push(map_row_to_entity(&row, &arrays_rows, schemas.clone())?);
let arrays_rows = Arc::new(arrays_rows);
let schemas = Arc::new(schemas);

let group_entities: Result<Vec<_>, Error> = rows
.par_iter()
.map(|row| map_row_to_entity(row, &arrays_rows, (*schemas).clone()))
.collect();

entities.extend(group_entities?);
}

Ok((entities, total_count))
Expand Down Expand Up @@ -582,11 +610,17 @@ impl DojoWorld {
arrays_rows.insert(name, rows);
}

let entities_collection = db_entities
.iter()
.map(|row| map_row_to_entity(row, &arrays_rows, schemas.clone()))
.collect::<Result<Vec<_>, Error>>()?;
Ok((entities_collection, total_count))
// Use Rayon to parallelize the mapping of rows to entities
Larkooo marked this conversation as resolved.
Show resolved Hide resolved
let arrays_rows = Arc::new(arrays_rows);
let entities_collection: Result<Vec<_>, Error> = db_entities
.par_iter()
.map(|row| {
let schemas_clone = schemas.clone();
let arrays_rows_clone = arrays_rows.clone();
map_row_to_entity(row, &arrays_rows_clone, schemas_clone)
})
.collect();
Ok((entities_collection?, total_count))
glihm marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) async fn query_by_composite(
Expand Down
Loading