Skip to content

Commit

Permalink
refactor: subscription updates ordered (#2507)
Browse files Browse the repository at this point in the history
* refactor: subscription updates ordered

* fmt

* clippy
  • Loading branch information
Larkooo authored Oct 9, 2024
1 parent 975f3e4 commit 42b1568
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 50 deletions.
43 changes: 29 additions & 14 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -77,20 +79,36 @@ impl EntityManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EntityManager>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>,
entity_sender: UnboundedSender<OptimisticEntity>,
}

impl Service {
pub fn new(subs_manager: Arc<EntityManager>) -> Self {
Self {
subs_manager,
let (entity_sender, entity_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<OptimisticEntity>::subscribe()),
}
entity_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, entity_receiver));

service
}

async fn publish_updates(
subs: Arc<EntityManager>,
mut entity_receiver: UnboundedReceiver<OptimisticEntity>,
) {
while let Some(entity) = entity_receiver.recv().await {
if let Err(e) = Self::process_entity_update(&subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Processing entity update.");
}
}
}

async fn process_entity_update(
subs: &Arc<EntityManager>,
entity: &OptimisticEntity,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
Expand Down Expand Up @@ -217,16 +235,13 @@ impl Service {
impl Future for Service {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.entity_sender.send(entity) {
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor.");
}
}

Poll::Pending
Expand Down
36 changes: 26 additions & 10 deletions crates/torii/grpc/src/server/subscriptions/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -62,16 +64,33 @@ impl EventManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventManager>,
simple_broker: Pin<Box<dyn Stream<Item = Event> + Send>>,
event_sender: UnboundedSender<Event>,
}

impl Service {
pub fn new(subs_manager: Arc<EventManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()) }
let (event_sender, event_receiver) = unbounded_channel();
let service =
Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), event_sender };

tokio::spawn(Self::publish_updates(subs_manager, event_receiver));

service
}

async fn publish_updates(
subs: Arc<EventManager>,
mut event_receiver: UnboundedReceiver<Event>,
) {
while let Some(event) = event_receiver.recv().await {
if let Err(e) = Self::process_event(&subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Processing event update.");
}
}
}

async fn publish_updates(subs: Arc<EventManager>, event: &Event) -> Result<(), Error> {
async fn process_event(subs: &Arc<EventManager>, event: &Event) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let keys = event
.keys
Expand Down Expand Up @@ -151,12 +170,9 @@ impl Future for Service {
let pin = self.get_mut();

while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing events update.");
}
});
if let Err(e) = pin.event_sender.send(event) {
error!(target = LOG_TARGET, error = %e, "Sending event to processor.");
}
}

Poll::Pending
Expand Down
41 changes: 27 additions & 14 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::mpsc::{channel, unbounded_channel, Receiver, UnboundedReceiver, UnboundedSender};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -71,20 +71,36 @@ impl EventMessageManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventMessageManager>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEventMessage> + Send>>,
event_sender: UnboundedSender<OptimisticEventMessage>,
}

impl Service {
pub fn new(subs_manager: Arc<EventMessageManager>) -> Self {
Self {
subs_manager,
let (event_sender, event_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<OptimisticEventMessage>::subscribe()),
}
event_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, event_receiver));

service
}

async fn publish_updates(
subs: Arc<EventMessageManager>,
mut event_receiver: UnboundedReceiver<OptimisticEventMessage>,
) {
while let Some(event) = event_receiver.recv().await {
if let Err(e) = Self::process_event_update(&subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Processing event update.");
}
}
}

async fn process_event_update(
subs: &Arc<EventMessageManager>,
entity: &OptimisticEventMessage,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
Expand Down Expand Up @@ -195,16 +211,13 @@ impl Service {
impl Future for Service {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(event)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.event_sender.send(event) {
error!(target = LOG_TARGET, error = %e, "Sending event update to processor.");
}
}

Poll::Pending
Expand Down
42 changes: 30 additions & 12 deletions crates/torii/grpc/src/server/subscriptions/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use futures::{Stream, StreamExt};
use rand::Rng;
use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -81,17 +83,36 @@ impl IndexerManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<IndexerManager>,
simple_broker: Pin<Box<dyn Stream<Item = ContractUpdated> + Send>>,
update_sender: UnboundedSender<ContractUpdated>,
}

impl Service {
pub fn new(subs_manager: Arc<IndexerManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<ContractUpdated>::subscribe()) }
let (update_sender, update_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<ContractUpdated>::subscribe()),
update_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, update_receiver));

service
}

async fn publish_updates(
subs: Arc<IndexerManager>,
mut update_receiver: UnboundedReceiver<ContractUpdated>,
) {
while let Some(update) = update_receiver.recv().await {
if let Err(e) = Self::process_update(&subs, &update).await {
error!(target = LOG_TARGET, error = %e, "Processing indexer update.");
}
}
}

async fn process_update(
subs: &Arc<IndexerManager>,
update: &ContractUpdated,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
Expand Down Expand Up @@ -127,16 +148,13 @@ impl Service {
impl Future for Service {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing indexer update.");
}
});
while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.update_sender.send(update) {
error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor.");
}
}

Poll::Pending
Expand Down

0 comments on commit 42b1568

Please sign in to comment.