Skip to content

Commit

Permalink
WebSocket server now indefinitely keeps track of non-data RPC commands (
Browse files Browse the repository at this point in the history
#8146)

* Fixes #5344
  • Loading branch information
teh-cmc authored Nov 14, 2024
1 parent 1a30934 commit 00309df
Showing 1 changed file with 27 additions and 1 deletion.
28 changes: 27 additions & 1 deletion crates/store/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ use crate::{server_url, RerunServerError, RerunServerPort};
struct MessageQueue {
server_memory_limit: MemoryLimit,
messages: VecDeque<Vec<u8>>,

/// Never garbage collected.
messages_static: VecDeque<Vec<u8>>,
}

impl MessageQueue {
pub fn new(server_memory_limit: MemoryLimit) -> Self {
Self {
server_memory_limit,
messages: Default::default(),
messages_static: Default::default(),
}
}

Expand All @@ -43,6 +47,15 @@ impl MessageQueue {
self.messages.push_back(msg);
}

/// Messages pushed using this method will stay around indefinitely.
///
/// Useful e.g. for `SetStoreInfo` messages, so that clients late to the party actually get a
/// chance of receiving them.
pub fn push_static(&mut self, msg: Vec<u8>) {
self.gc_if_using_too_much_ram();
self.messages_static.push_back(msg);
}

fn gc_if_using_too_much_ram(&mut self) {
re_tracing::profile_function!();

Expand Down Expand Up @@ -365,7 +378,13 @@ impl ReceiveSetBroadcaster {
}
});

inner.history.push(msg);
let msg_is_data = matches!(data, LogMsg::ArrowMsg(_, _));
if msg_is_data {
inner.history.push(msg);
} else {
// Keep non-data commands around for clients late to the party.
inner.history.push_static(msg);
}
}

re_smart_channel::SmartMessagePayload::Flush { on_flush_done } => {
Expand Down Expand Up @@ -395,6 +414,13 @@ impl ReceiveSetBroadcaster {
// Meaning that if a new one connects, we stall the old connections until we have sent all messages to this one.
let mut inner = self.inner.lock();

for msg in &inner.history.messages_static {
if let Err(err) = client.send(tungstenite::Message::Binary(msg.clone())) {
re_log::warn!("Error sending static message to web socket client: {err}");
return;
}
}

for msg in &inner.history.messages {
if let Err(err) = client.send(tungstenite::Message::Binary(msg.clone())) {
re_log::warn!("Error sending message to web socket client: {err}");
Expand Down

0 comments on commit 00309df

Please sign in to comment.