Skip to content

Commit

Permalink
opt-fix(torii-core): fix and optimize partial updates (#2427)
Browse files Browse the repository at this point in the history
* opt-fix(torii-core): fix and optimize partial updates

* fmt

* values

* fix: store update whne recursive set

* fmt

* test from lambda

* remove get entity keys test

* fix clippy
  • Loading branch information
Larkooo authored Sep 16, 2024
1 parent a96c4c9 commit b039f53
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 141 deletions.
3 changes: 2 additions & 1 deletion crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ where
let mut entity = model.schema;
entity.deserialize(&mut keys_and_unpacked)?;

db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, &keys_str).await?;
db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, Some(&keys_str))
.await?;
Ok(())
}
}
9 changes: 5 additions & 4 deletions crates/torii/core/src/processors/store_update_member.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{Context, Error, Result};
use async_trait::async_trait;
use dojo_types::schema::{Struct, Ty};
use dojo_world::contracts::naming;
use dojo_world::contracts::world::WorldContractReader;
use num_traits::ToPrimitive;
Expand Down Expand Up @@ -50,11 +51,11 @@ where
event_id: &str,
event: &Event,
) -> Result<(), Error> {
let selector = event.data[MODEL_INDEX];
let model_id = event.data[MODEL_INDEX];
let entity_id = event.data[ENTITY_ID_INDEX];
let member_selector = event.data[MEMBER_INDEX];

let model = db.model(selector).await?;
let model = db.model(model_id).await?;
let schema = model.schema;

let mut member = schema
Expand Down Expand Up @@ -98,9 +99,9 @@ where
}

member.ty.deserialize(&mut values)?;
let wrapped_ty = Ty::Struct(Struct { name: schema.name(), children: vec![member] });

db.set_model_member(&schema.name(), entity_id, false, &member, event_id, block_timestamp)
.await?;
db.set_entity(wrapped_ty, event_id, block_timestamp, entity_id, model_id, None).await?;
Ok(())
}
}
28 changes: 14 additions & 14 deletions crates/torii/core/src/processors/store_update_record.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{Context, Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::naming;
use dojo_types::schema::Ty;
use dojo_world::contracts::world::WorldContractReader;
use num_traits::ToPrimitive;
use starknet::core::types::Event;
Expand All @@ -9,7 +9,7 @@ use tracing::info;

use super::EventProcessor;
use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX};
use crate::sql::{felts_sql_string, Sql};
use crate::sql::Sql;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_update_record";

Expand Down Expand Up @@ -64,21 +64,21 @@ where
values_start + event.data[values_start].to_usize().context("invalid usize")?;

// Skip the length to only get the values as they will be deserialized.
let values = event.data[values_start + 1..=values_end].to_vec();

let tag = naming::get_tag(&model.namespace, &model.name);

// Keys are read from the db, since we don't have access to them when only
// the entity id is passed.
let keys = db.get_entity_keys(entity_id, &tag).await?;

let keys_str = felts_sql_string(&keys);
let mut keys_and_unpacked = [keys, values].concat();
let mut values = event.data[values_start + 1..=values_end].to_vec();

let mut entity = model.schema;
entity.deserialize(&mut keys_and_unpacked)?;
match entity {
Ty::Struct(ref mut struct_) => {
// we do not need the keys. the entity Ty has the keys in its schema
// so we should get rid of them to avoid trying to deserialize them
struct_.children.retain(|field| !field.key);
}
_ => return Err(anyhow::anyhow!("Expected struct")),
}

entity.deserialize(&mut values)?;

db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, &keys_str).await?;
db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, None).await?;
Ok(())
}
}
129 changes: 27 additions & 102 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use chrono::Utc;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{EnumOption, Member, Struct, Ty};
use dojo_types::schema::{EnumOption, Member, Ty};
use dojo_world::contracts::abi::model::Layout;
use dojo_world::contracts::naming::{compute_selector_from_names, compute_selector_from_tag};
use dojo_world::metadata::WorldMetadata;
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Row, Sqlite};
use sqlx::{Pool, Sqlite};
use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction};
use starknet_crypto::poseidon_hash_many;
use tracing::debug;
Expand All @@ -24,7 +24,7 @@ use crate::types::{
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp};

type IsEventMessage = bool;
type IsStoreUpdateMember = bool;
type IsStoreUpdate = bool;

pub const WORLD_CONTRACT_TYPE: &str = "WORLD";
pub const FELT_DELIMITER: &str = "/";
Expand Down Expand Up @@ -169,28 +169,35 @@ impl Sql {
block_timestamp: u64,
entity_id: Felt,
model_id: Felt,
keys_str: &str,
keys_str: Option<&str>,
) -> Result<()> {
let namespaced_name = entity.name();

let entity_id = format!("{:#x}", entity_id);
let model_id = format!("{:#x}", model_id);

let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
let insert_entities = if keys_str.is_some() {
"INSERT INTO entities (id, event_id, executed_at, keys) VALUES (?, ?, ?, ?) ON \
CONFLICT(id) DO UPDATE SET updated_at=CURRENT_TIMESTAMP, \
executed_at=EXCLUDED.executed_at, event_id=EXCLUDED.event_id, keys=EXCLUDED.keys \
RETURNING *"
} else {
"INSERT INTO entities (id, event_id, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \
UPDATE SET updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *"
};

self.query_queue.enqueue(
insert_entities,
vec![
Argument::String(entity_id.clone()),
Argument::String(keys_str.to_string()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
QueryType::SetEntity(entity.clone()),
);
let mut arguments = vec![
Argument::String(entity_id.clone()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

if let Some(keys) = keys_str {
arguments.push(Argument::String(keys.to_string()));
}

self.query_queue.enqueue(insert_entities, arguments, QueryType::SetEntity(entity.clone()));

self.query_queue.enqueue(
"INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
Expand All @@ -204,7 +211,7 @@ impl Sql {
path,
event_id,
(&entity_id, false),
(&entity, false),
(&entity, keys_str.is_none()),
block_timestamp,
&vec![],
);
Expand Down Expand Up @@ -271,48 +278,6 @@ impl Sql {
Ok(())
}

pub async fn set_model_member(
&mut self,
model_tag: &str,
entity_id: Felt,
is_event_message: bool,
member: &Member,
event_id: &str,
block_timestamp: u64,
) -> Result<()> {
let entity_id = format!("{:#x}", entity_id);
let path = vec![model_tag.to_string()];

let wrapped_ty =
Ty::Struct(Struct { name: model_tag.to_string(), children: vec![member.clone()] });

// update model member
self.build_set_entity_queries_recursive(
path,
event_id,
(&entity_id, is_event_message),
(&wrapped_ty, true),
block_timestamp,
&vec![],
);
self.execute().await?;

let update_query = "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \
event_id=? WHERE id = ? RETURNING *";

self.query_queue.enqueue(
update_query.to_string(),
vec![
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
Argument::String(event_id.to_string()),
Argument::String(entity_id.clone()),
],
QueryType::SetEntity(wrapped_ty),
);

Ok(())
}

pub async fn delete_entity(
&mut self,
entity_id: Felt,
Expand Down Expand Up @@ -419,46 +384,6 @@ impl Sql {
self.model_cache.model(&selector).await.map_err(|e| e.into())
}

/// Retrieves the keys definition for a given model.
/// The key definition is currently implemented as (`name`, `type`).
pub async fn get_entity_keys_def(&self, model_tag: &str) -> Result<Vec<(String, String)>> {
let query = sqlx::query_as::<_, (String, String)>(
"SELECT name, type FROM model_members WHERE id = ? AND key = true",
)
.bind(model_tag);

let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
let rows: Vec<(String, String)> = query.fetch_all(&mut *conn).await?;
Ok(rows.iter().map(|(name, ty)| (name.to_string(), ty.to_string())).collect())
}

/// Retrieves the keys for a given entity.
/// The keys are returned in the same order as the keys definition.
pub async fn get_entity_keys(&self, entity_id: Felt, model_tag: &str) -> Result<Vec<Felt>> {
let entity_id = format!("{:#x}", entity_id);
let keys_def = self.get_entity_keys_def(model_tag).await?;

let keys_names =
keys_def.iter().map(|(name, _)| format!("external_{}", name)).collect::<Vec<String>>();

let sql = format!("SELECT {} FROM [{}] WHERE id = ?", keys_names.join(", "), model_tag);
let query = sqlx::query(sql.as_str()).bind(entity_id);

let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;

let mut keys: Vec<Felt> = vec![];
let result = query.fetch_all(&mut *conn).await?;

for row in result {
for (i, _) in row.columns().iter().enumerate() {
let value: String = row.try_get(i)?;
keys.push(Felt::from_hex(&value)?);
}
}

Ok(keys)
}

pub async fn does_entity_exist(&self, model: String, key: Felt) -> Result<bool> {
let sql = format!("SELECT COUNT(*) FROM [{model}] WHERE id = ?");

Expand Down Expand Up @@ -641,7 +566,7 @@ impl Sql {
event_id: &str,
// The id of the entity and if the entity is an event message
entity_id: (&str, IsEventMessage),
entity: (&Ty, IsStoreUpdateMember),
entity: (&Ty, IsStoreUpdate),
block_timestamp: u64,
indexes: &Vec<i64>,
) {
Expand Down
49 changes: 32 additions & 17 deletions crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use crate::processors::generate_event_processors_map;
use crate::processors::register_model::RegisterModelProcessor;
use crate::processors::store_del_record::StoreDelRecordProcessor;
use crate::processors::store_set_record::StoreSetRecordProcessor;
use crate::processors::store_update_member::StoreUpdateMemberProcessor;
use crate::processors::store_update_record::StoreUpdateRecordProcessor;
use crate::sql::Sql;

pub async fn bootstrap_engine<P>(
Expand All @@ -42,6 +44,8 @@ where
event: generate_event_processors_map(vec![
Box::new(RegisterModelProcessor),
Box::new(StoreSetRecordProcessor),
Box::new(StoreUpdateRecordProcessor),
Box::new(StoreUpdateMemberProcessor),
Box::new(StoreDelRecordProcessor),
])?,
..Processors::default()
Expand Down Expand Up @@ -292,24 +296,31 @@ async fn test_load_from_remote_del() {
db.execute().await.unwrap();
}

// Start of Selection
#[tokio::test(flavor = "multi_thread")]
async fn test_get_entity_keys() {
async fn test_update_with_set_record() {
// Initialize the SQLite in-memory database
let options =
SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true);
let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap();
sqlx::migrate!("../migrations").run(&pool).await.unwrap();

// Set up the compiler test environment
let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/");
let config = setup.build_test_config("spawn-and-move", Profile::DEV);

let ws = scarb::ops::read_workspace(config.manifest_path(), &config).unwrap();
let manifest_path = Utf8PathBuf::from(config.manifest_path().parent().unwrap());
let target_dir = Utf8PathBuf::from(ws.target_dir().to_string()).join("dev");

// Configure and start the KatanaRunner
let seq_config = KatanaRunnerConfig { n_accounts: 10, ..Default::default() }
.with_db_dir(copy_spawn_and_move_db().as_str());

let sequencer = KatanaRunner::new_with_config(seq_config).expect("Failed to start runner.");
let account = sequencer.account(0);

// Prepare migration with world and seed
let (strat, _) = prepare_migration_with_world_and_seed(
manifest_path,
target_dir,
Expand All @@ -327,10 +338,9 @@ async fn test_get_entity_keys() {
strat.world_address,
);

let account = sequencer.account(0);

let world = WorldContract::new(strat.world_address, &account);

// Grant writer permissions
let res = world
.grant_writer(&compute_bytearray_hash("dojo_examples"), &ContractAddress(actions_address))
.send_with_cfg(&TxnConfig::init_wait())
Expand All @@ -339,8 +349,8 @@ async fn test_get_entity_keys() {

TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap();

// spawn
let res = account
// Send spawn transaction
let spawn_res = account
.execute_v1(vec![Call {
to: actions_address,
selector: get_selector_from_name("spawn").unwrap(),
Expand All @@ -350,23 +360,28 @@ async fn test_get_entity_keys() {
.await
.unwrap();

TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap();

let world_reader = WorldContractReader::new(strat.world_address, account.provider());

let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap();
TransactionWaiter::new(spawn_res.transaction_hash, &account.provider()).await.unwrap();

let _ = bootstrap_engine(world_reader, db.clone(), account.provider()).await;
// Send move transaction
let move_res = account
.execute_v1(vec![Call {
to: actions_address,
selector: get_selector_from_name("move").unwrap(),
calldata: vec![Felt::ZERO],
}])
.send_with_cfg(&TxnConfig::init_wait())
.await
.unwrap();

let keys = db.get_entity_keys_def("dojo_examples-Moves").await.unwrap();
assert_eq!(keys, vec![("player".to_string(), "ContractAddress".to_string()),]);
TransactionWaiter::new(move_res.transaction_hash, &account.provider()).await.unwrap();

let entity_id = poseidon_hash_many(&[account.address()]);
let world_reader = WorldContractReader::new(strat.world_address, account.provider());

let keys = db.get_entity_keys(entity_id, "dojo_examples-Moves").await.unwrap();
assert_eq!(keys, vec![account.address()]);
let db = Sql::new(pool.clone(), world_reader.address).await.unwrap();

db.execute().await.unwrap();
// Expect bootstrap_engine to error out due to the existing bug
let result = bootstrap_engine(world_reader, db.clone(), account.provider()).await;
assert!(result.is_ok(), "bootstrap_engine should not fail");
}

/// Count the number of rows in a table.
Expand Down
Loading

0 comments on commit b039f53

Please sign in to comment.