From 6aaa1033de5d160d6fe92354b68a9f0115ab26fb Mon Sep 17 00:00:00 2001 From: jprochazk Date: Thu, 9 Jan 2025 18:03:04 +0100 Subject: [PATCH] Remove `Queue`, rename `QueueState` to `EventLoop`, add some comments --- crates/store/re_grpc_server/src/lib.rs | 41 ++++++++----------- .../re_protos/src/v0/rerun.log_msg.v0.rs | 1 + .../re_protos/src/v0/rerun.sdk_comms.v0.rs | 4 ++ 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/crates/store/re_grpc_server/src/lib.rs b/crates/store/re_grpc_server/src/lib.rs index 17011ddc4b11..020ba2a4559e 100644 --- a/crates/store/re_grpc_server/src/lib.rs +++ b/crates/store/re_grpc_server/src/lib.rs @@ -24,7 +24,10 @@ enum Event { Message(LogMsgProto), } -struct QueueState { +/// Main event loop for the server, which runs in its own task. +/// +/// Handles message history, and broadcasts messages to clients. +struct EventLoop { server_memory_limit: MemoryLimit, /// New messages are broadcast to all clients. @@ -43,7 +46,7 @@ struct QueueState { persistent_message_queue: VecDeque, } -impl QueueState { +impl EventLoop { fn new(server_memory_limit: MemoryLimit, event_rx: mpsc::Receiver) -> Self { Self { server_memory_limit, @@ -165,25 +168,25 @@ fn message_size(msg: &LogMsgProto) -> u64 { msg.total_size_bytes() } -struct Queue { - _task_handle: tokio::task::JoinHandle<()>, +pub struct MessageProxy { + _queue_task_handle: tokio::task::JoinHandle<()>, event_tx: mpsc::Sender, } -impl Queue { - fn spawn(server_memory_limit: MemoryLimit) -> Self { +impl MessageProxy { + pub fn new(server_memory_limit: MemoryLimit) -> Self { // Channel capacity is completely arbitrary. // We just want something large enough to handle bursts of messages. let (event_tx, event_rx) = mpsc::channel(1024); let task_handle = tokio::spawn(async move { - QueueState::new(server_memory_limit, event_rx) + EventLoop::new(server_memory_limit, event_rx) .run_in_place() .await; }); Self { - _task_handle: task_handle, + _queue_task_handle: task_handle, event_tx, } } @@ -192,7 +195,7 @@ impl Queue { self.event_tx.send(Event::Message(msg)).await.ok(); } - async fn new_client_stream(&self) -> MessageStream { + async fn new_client_stream(&self) -> LogMsgStream { let (sender, receiver) = oneshot::channel(); if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await { re_log::error!("Error initializing new client: {err}"); @@ -218,19 +221,7 @@ impl Queue { } } -pub struct MessageProxy { - queue: Queue, -} - -impl MessageProxy { - pub fn new(server_memory_limit: MemoryLimit) -> Self { - Self { - queue: Queue::spawn(server_memory_limit), - } - } -} - -type MessageStream = Pin> + Send>>; +type LogMsgStream = Pin> + Send>>; #[tonic::async_trait] impl message_proxy_server::MessageProxy for MessageProxy { @@ -242,7 +233,7 @@ impl message_proxy_server::MessageProxy for MessageProxy { loop { match stream.message().await { Ok(Some(msg)) => { - self.queue.push(msg).await; + self.push(msg).await; } Ok(None) => { // Connection was closed @@ -258,13 +249,13 @@ impl message_proxy_server::MessageProxy for MessageProxy { Ok(tonic::Response::new(Empty {})) } - type ReadMessagesStream = MessageStream; + type ReadMessagesStream = LogMsgStream; async fn read_messages( &self, _: tonic::Request, ) -> tonic::Result> { - Ok(tonic::Response::new(self.queue.new_client_stream().await)) + Ok(tonic::Response::new(self.new_client_stream().await)) } } diff --git a/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs b/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs index 19744af04354..88dcae5534a3 100644 --- a/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.log_msg.v0.rs @@ -1,4 +1,5 @@ // This file is @generated by prost-build. +/// TODO(#8631): Remove `LogMsg` #[derive(Clone, PartialEq, ::prost::Message)] pub struct LogMsg { #[prost(oneof = "log_msg::Msg", tags = "1, 2, 3")] diff --git a/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs b/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs index aa77fa3666e9..ac4c4e6082f4 100644 --- a/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs +++ b/crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs @@ -99,6 +99,8 @@ pub mod message_proxy_client { self.inner = self.inner.max_encoding_message_size(limit); self } + /// TODO(jan): Would it be more efficient to send a "message batch" instead of individual messages? + /// It may allow us to amortize the overhead of the gRPC protocol. pub async fn write_messages( &mut self, request: impl tonic::IntoStreamingRequest< @@ -155,6 +157,8 @@ pub mod message_proxy_server { /// Generated trait containing gRPC methods that should be implemented for use with MessageProxyServer. #[async_trait] pub trait MessageProxy: std::marker::Send + std::marker::Sync + 'static { + /// TODO(jan): Would it be more efficient to send a "message batch" instead of individual messages? + /// It may allow us to amortize the overhead of the gRPC protocol. async fn write_messages( &self, request: tonic::Request>,