Skip to content

Commit

Permalink
Remove Queue, rename QueueState to EventLoop, add some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jprochazk committed Jan 9, 2025
1 parent 8c1a78e commit 6aaa103
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 25 deletions.
41 changes: 16 additions & 25 deletions crates/store/re_grpc_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ enum Event {
Message(LogMsgProto),
}

struct QueueState {
/// Main event loop for the server, which runs in its own task.
///
/// Handles message history, and broadcasts messages to clients.
struct EventLoop {
server_memory_limit: MemoryLimit,

/// New messages are broadcast to all clients.
Expand All @@ -43,7 +46,7 @@ struct QueueState {
persistent_message_queue: VecDeque<LogMsgProto>,
}

impl QueueState {
impl EventLoop {
fn new(server_memory_limit: MemoryLimit, event_rx: mpsc::Receiver<Event>) -> Self {
Self {
server_memory_limit,
Expand Down Expand Up @@ -165,25 +168,25 @@ fn message_size(msg: &LogMsgProto) -> u64 {
msg.total_size_bytes()
}

struct Queue {
_task_handle: tokio::task::JoinHandle<()>,
pub struct MessageProxy {
_queue_task_handle: tokio::task::JoinHandle<()>,
event_tx: mpsc::Sender<Event>,
}

impl Queue {
fn spawn(server_memory_limit: MemoryLimit) -> Self {
impl MessageProxy {
pub fn new(server_memory_limit: MemoryLimit) -> Self {
// Channel capacity is completely arbitrary.
// We just want something large enough to handle bursts of messages.
let (event_tx, event_rx) = mpsc::channel(1024);

let task_handle = tokio::spawn(async move {
QueueState::new(server_memory_limit, event_rx)
EventLoop::new(server_memory_limit, event_rx)
.run_in_place()
.await;
});

Self {
_task_handle: task_handle,
_queue_task_handle: task_handle,
event_tx,
}
}
Expand All @@ -192,7 +195,7 @@ impl Queue {
self.event_tx.send(Event::Message(msg)).await.ok();
}

async fn new_client_stream(&self) -> MessageStream {
async fn new_client_stream(&self) -> LogMsgStream {
let (sender, receiver) = oneshot::channel();
if let Err(err) = self.event_tx.send(Event::NewClient(sender)).await {
re_log::error!("Error initializing new client: {err}");
Expand All @@ -218,19 +221,7 @@ impl Queue {
}
}

pub struct MessageProxy {
queue: Queue,
}

impl MessageProxy {
pub fn new(server_memory_limit: MemoryLimit) -> Self {
Self {
queue: Queue::spawn(server_memory_limit),
}
}
}

type MessageStream = Pin<Box<dyn Stream<Item = tonic::Result<LogMsgProto>> + Send>>;
type LogMsgStream = Pin<Box<dyn Stream<Item = tonic::Result<LogMsgProto>> + Send>>;

#[tonic::async_trait]
impl message_proxy_server::MessageProxy for MessageProxy {
Expand All @@ -242,7 +233,7 @@ impl message_proxy_server::MessageProxy for MessageProxy {
loop {
match stream.message().await {
Ok(Some(msg)) => {
self.queue.push(msg).await;
self.push(msg).await;
}
Ok(None) => {
// Connection was closed
Expand All @@ -258,13 +249,13 @@ impl message_proxy_server::MessageProxy for MessageProxy {
Ok(tonic::Response::new(Empty {}))
}

type ReadMessagesStream = MessageStream;
type ReadMessagesStream = LogMsgStream;

async fn read_messages(
&self,
_: tonic::Request<Empty>,
) -> tonic::Result<tonic::Response<Self::ReadMessagesStream>> {
Ok(tonic::Response::new(self.queue.new_client_stream().await))
Ok(tonic::Response::new(self.new_client_stream().await))
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/store/re_protos/src/v0/rerun.log_msg.v0.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/store/re_protos/src/v0/rerun.sdk_comms.v0.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6aaa103

Please sign in to comment.