Skip to content

Commit

Permalink
opt(torii): use hashmap instead of vector of event processors
Browse files Browse the repository at this point in the history
commit-id:7303cc72
  • Loading branch information
lambda-0x committed Sep 2, 2024
1 parent 9c2746c commit 13d8020
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 74 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ starknet-crypto = "0.7.0"
# we need this <https://github.com/starknet-io/types-rs/pull/75>. So we put strict
# requirement here to prevent from being downgraded.
# We can remove this requirement once `starknet-rs` is using >=0.1.4
ahash = "0.8"
hashlink = "0.9.1"
starknet-types-core = "~0.1.4"
starknet_api = "0.11.0"
Expand Down
11 changes: 6 additions & 5 deletions bin/torii/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ahash.workspace = true
anyhow.workspace = true
async-trait.workspace = true
base64.workspace = true
Expand All @@ -19,21 +20,21 @@ dojo-utils.workspace = true
dojo-world.workspace = true
either = "1.9.0"
futures.workspace = true
http.workspace = true
http-body = "0.4.5"
hyper.workspace = true
http.workspace = true
hyper-reverse-proxy = { git = "https://github.com/tarrencev/hyper-reverse-proxy" }
hyper.workspace = true
indexmap.workspace = true
lazy_static.workspace = true
scarb.workspace = true
serde.workspace = true
serde_json.workspace = true
sqlx.workspace = true
starknet.workspace = true
starknet-crypto.workspace = true
tokio.workspace = true
starknet.workspace = true
tokio-stream = "0.1.11"
tokio-util = "0.7.7"
tokio.workspace = true
torii-core.workspace = true
torii-graphql.workspace = true
torii-grpc = { workspace = true, features = [ "server" ] }
Expand All @@ -42,8 +43,8 @@ torii-server.workspace = true
tower.workspace = true

tower-http.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
url.workspace = true
webbrowser = "0.8"

Expand Down
84 changes: 75 additions & 9 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use ahash::AHashMap;
use clap::{ArgAction, Parser};
use dojo_metrics::{metrics_process, prometheus_exporter};
use dojo_utils::parse::{parse_socket_address, parse_url};
use dojo_world::contracts::world::WorldContractReader;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::SqlitePool;
use starknet::core::types::Felt;
use starknet::core::utils::get_selector_from_name;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tokio::sync::broadcast;
Expand All @@ -36,6 +38,7 @@ use torii_core::processors::store_set_record::StoreSetRecordProcessor;
use torii_core::processors::store_transaction::StoreTransactionProcessor;
use torii_core::processors::store_update_member::StoreUpdateMemberProcessor;
use torii_core::processors::store_update_record::StoreUpdateRecordProcessor;
use torii_core::processors::EventProcessor;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::Sql;
use torii_core::types::Model;
Expand Down Expand Up @@ -172,16 +175,79 @@ async fn main() -> anyhow::Result<()> {
let world = WorldContractReader::new(args.world_address, &provider);

let db = Sql::new(pool.clone(), args.world_address).await?;

let event_processors: AHashMap<Felt, Box<dyn EventProcessor<_>>> =
AHashMap::from(
[
(
get_selector_from_name(
<RegisterModelProcessor as EventProcessor<
&Arc<JsonRpcClient<HttpTransport>>,
>>::event_key(&RegisterModelProcessor)
.as_str(),
)?,
Box::new(RegisterModelProcessor)
as Box<dyn EventProcessor<&Arc<JsonRpcClient<HttpTransport>>>>,
),
(
get_selector_from_name(
<StoreSetRecordProcessor as EventProcessor<
&Arc<JsonRpcClient<HttpTransport>>,
>>::event_key(&StoreSetRecordProcessor)
.as_str(),
)?,
Box::new(StoreSetRecordProcessor),
),
(
get_selector_from_name(
<MetadataUpdateProcessor as EventProcessor<
&Arc<JsonRpcClient<HttpTransport>>,
>>::event_key(&MetadataUpdateProcessor)
.as_str(),
)?,
Box::new(MetadataUpdateProcessor),
),
(
get_selector_from_name(
<StoreDelRecordProcessor as EventProcessor<
&Arc<JsonRpcClient<HttpTransport>>,
>>::event_key(&StoreDelRecordProcessor)
.as_str(),
)?,
Box::new(StoreDelRecordProcessor),
),
(
get_selector_from_name(
<EventMessageProcessor as EventProcessor<
&Arc<JsonRpcClient<HttpTransport>>,
>>::event_key(&EventMessageProcessor)
.as_str(),
)?,
Box::new(EventMessageProcessor),
),
(
get_selector_from_name(
<StoreUpdateRecordProcessor as EventProcessor<
&Arc<JsonRpcClient<HttpTransport>>,
>>::event_key(&StoreUpdateRecordProcessor)
.as_str(),
)?,
Box::new(StoreUpdateRecordProcessor),
),
(
get_selector_from_name(
<StoreUpdateMemberProcessor as EventProcessor<
&Arc<JsonRpcClient<HttpTransport>>,
>>::event_key(&StoreUpdateMemberProcessor)
.as_str(),
)?,
Box::new(StoreUpdateMemberProcessor),
),
],
);

Check warning on line 248 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L178-L248

Added lines #L178 - L248 were not covered by tests
let processors = Processors {
event: vec![
Box::new(RegisterModelProcessor),
Box::new(StoreSetRecordProcessor),
Box::new(MetadataUpdateProcessor),
Box::new(StoreDelRecordProcessor),
Box::new(EventMessageProcessor),
Box::new(StoreUpdateRecordProcessor),
Box::new(StoreUpdateMemberProcessor),
],
event: event_processors,

Check warning on line 250 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L250

Added line #L250 was not covered by tests
transaction: vec![Box::new(StoreTransactionProcessor)],
..Processors::default()
};
Expand Down
3 changes: 2 additions & 1 deletion crates/torii/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ahash.workspace = true
anyhow.workspace = true
async-trait.workspace = true
base64.workspace = true
Expand All @@ -32,8 +33,8 @@ serde_json.workspace = true
slab = "0.4.2"
sozo-ops.workspace = true
sqlx.workspace = true
starknet.workspace = true
starknet-crypto.workspace = true
starknet.workspace = true
thiserror.workspace = true
tokio = { version = "1.32.0", features = [ "sync" ], default-features = true }
tokio-stream = "0.1.11"
Expand Down
75 changes: 45 additions & 30 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::BTreeMap;
use std::fmt::Debug;
use std::time::Duration;

use ahash::AHashMap;
use anyhow::Result;
use dojo_world::contracts::world::WorldContractReader;
use hashlink::LinkedHashMap;
Expand All @@ -10,26 +11,32 @@ use starknet::core::types::{
MaybePendingBlockWithTxs, PendingBlockWithTxs, ReceiptBlock, TransactionReceipt,
TransactionReceiptWithBlockInfo,
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Sender as BoundedSender;
use tokio::time::sleep;
use tracing::{debug, error, info, trace, warn};

use crate::processors::event_message::EventMessageProcessor;
use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor};
use crate::sql::Sql;

#[allow(missing_debug_implementations)]
pub struct Processors<P: Provider + Sync> {
pub struct Processors<P: Provider + Send + Sync + std::fmt::Debug> {
pub block: Vec<Box<dyn BlockProcessor<P>>>,
pub transaction: Vec<Box<dyn TransactionProcessor<P>>>,
pub event: Vec<Box<dyn EventProcessor<P>>>,
pub event: AHashMap<Felt, Box<dyn EventProcessor<P>>>,
pub catch_all_event: Box<dyn EventProcessor<P>>,
}

impl<P: Provider + Sync> Default for Processors<P> {
impl<P: Provider + Send + Sync + std::fmt::Debug> Default for Processors<P> {
fn default() -> Self {
Self { block: vec![], event: vec![], transaction: vec![] }
Self {
block: vec![],
event: AHashMap::new(),
transaction: vec![],
catch_all_event: Box::new(EventMessageProcessor) as Box<dyn EventProcessor<P>>,
}
}
}

Expand Down Expand Up @@ -78,7 +85,7 @@ pub struct FetchPendingResult {
}

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Sync> {
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug> {
world: WorldContractReader<P>,
db: Sql,
provider: Box<P>,
Expand All @@ -93,7 +100,7 @@ struct UnprocessedEvent {
data: Vec<String>,
}

impl<P: Provider + Sync> Engine<P> {
impl<P: Provider + Send + Sync + std::fmt::Debug> Engine<P> {
pub fn new(
world: WorldContractReader<P>,
db: Sql,
Expand Down Expand Up @@ -574,16 +581,14 @@ impl<P: Provider + Sync> Engine<P> {
transaction_hash: Felt,
) -> Result<()> {
self.db.store_event(event_id, event, transaction_hash, block_timestamp);
let mut unprocessed = true;
for processor in &self.processors.event {
// If the processor has no event_key, it means it's a catch-all processor.
// We also validate the event
if (processor.event_key().is_empty()
|| get_selector_from_name(&processor.event_key())? == event.keys[0])
&& processor.validate(event)
{
unprocessed = false;
if let Err(e) = processor
let event_key = event.keys[0];

let Some(processor) = self.processors.event.get(&event_key) else {
// If we don't have a processor for this event, try the catch-all processor.
if self.processors.catch_all_event.validate(event) {
if let Err(e) = self
.processors
.catch_all_event
.process(
&self.world,
&mut self.db,
Expand All @@ -594,24 +599,34 @@ impl<P: Provider + Sync> Engine<P> {
)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
error!(target: LOG_TARGET, error = %e, "Processing catch all event processor.");

Check warning on line 602 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L602

Added line #L602 was not covered by tests
}
} else {
let unprocessed_event = UnprocessedEvent {
keys: event.keys.iter().map(|k| format!("{:#x}", k)).collect(),
data: event.data.iter().map(|d| format!("{:#x}", d)).collect(),
};

trace!(
target: LOG_TARGET,
keys = ?unprocessed_event.keys,
data = ?unprocessed_event.data,
"Unprocessed event.",

Check warning on line 614 in crates/torii/core/src/engine.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/engine.rs#L614

Added line #L614 was not covered by tests
);
}
}

if unprocessed {
let unprocessed_event = UnprocessedEvent {
keys: event.keys.iter().map(|k| format!("{:#x}", k)).collect(),
data: event.data.iter().map(|d| format!("{:#x}", d)).collect(),
};
return Ok(());
};

trace!(
target: LOG_TARGET,
keys = ?unprocessed_event.keys,
data = ?unprocessed_event.data,
"Unprocessed event.",
);
// if processor.validate(event) {
if let Err(e) = processor
.process(&self.world, &mut self.db, block_number, block_timestamp, event_id, event)
.await
{
error!(target: LOG_TARGET, event_name = processor.event_key(), error = %e, "Processing event.");
}
// }

Ok(())
}
}
Loading

0 comments on commit 13d8020

Please sign in to comment.