Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Jan 15, 2025
1 parent a7cf967 commit f461cea
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 16 deletions.
22 changes: 10 additions & 12 deletions crates/torii/indexer/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,11 +600,11 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

async fn process_tasks(&mut self) -> Result<()> {
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));

// Process each priority level sequentially
for (priority, task_group) in std::mem::take(&mut self.tasks) {
let mut handles = Vec::new();

// Process all tasks within this priority level concurrently
for (task_id, events) in task_group {
let db = self.db.clone();
Expand All @@ -616,12 +616,13 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();

// Process all events for this task sequentially
for (contract_type, event) in events {
let contract_processors = processors.get_event_processor(contract_type);
if let Some(processors) = contract_processors.get(&event.event.keys[0]) {
let processor = processors.iter()
let processor = processors
.iter()
.find(|p| p.validate(&event.event))
.expect("Must find at least one processor for the event");

Expand Down Expand Up @@ -893,24 +894,21 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
let hash = hasher.finish();
(2usize, hash) // Priority 2 (lower) for store operations
}
_ => (0, 0) // No parallelization for other events
_ => (0, 0), // No parallelization for other events
};

if task_identifier != 0 {
self.tasks
.entry(task_priority)
.or_default()
.entry(task_identifier)
.or_default()
.push((
self.tasks.entry(task_priority).or_default().entry(task_identifier).or_default().push(
(
contract_type,
ParallelizedEvent {
event_id: event_id.to_string(),
event: event.clone(),
block_number,
block_timestamp,
},
));
),
);
} else {
// Process non-parallelized events immediately
// if we dont have a task identifier, we process the event immediately
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/indexer/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ where
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
))
));
}
};

Expand Down
2 changes: 1 addition & 1 deletion crates/torii/indexer/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ where
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
))
));
}
};

Expand Down
2 changes: 1 addition & 1 deletion crates/torii/indexer/src/processors/store_update_member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ where
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
))
));
}
};

Expand Down
2 changes: 1 addition & 1 deletion crates/torii/indexer/src/processors/store_update_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where
"Failed to retrieve model with selector {:#x}: {}",
event.selector,
e
))
));
}
};

Expand Down

0 comments on commit f461cea

Please sign in to comment.