Skip to content

Commit

Permalink
[LogServer] GET_LOGLET_INFO is high priority and drain workers on shu…
Browse files Browse the repository at this point in the history
…tdown
  • Loading branch information
AhmedSoliman committed Feb 6, 2025
1 parent 3329fb1 commit 3ee5fff
Showing 1 changed file with 37 additions and 12 deletions.
49 changes: 37 additions & 12 deletions crates/log-server/src/loglet_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,25 @@ impl<S: LogStore> LogletWorker<S> {
// this might include sending notifications of shutdown to allow graceful
// handoff
trace!(loglet_id = %self.loglet_id, "Loglet writer shutting down");
return;
break;
}
// GET_DIGEST
Some(msg) = get_digest_rx.recv() => {
self.loglet_state.notify_known_global_tail(msg.body().header.known_global_tail);
// digest responses are spawned as tasks
self.process_get_digest(msg);
}
// GET_LOGLET_INFO
Some(msg) = get_loglet_info_rx.recv() => {
self.loglet_state.notify_known_global_tail(msg.body().header.known_global_tail);
// drop response if connection is lost/congested
let peer = msg.peer();
if let Err(e) = msg.to_rpc_response(LogletInfo::new(self.loglet_state.local_tail(), self.loglet_state.trim_point(), self.loglet_state.known_global_tail())).try_send() {
debug!(?e.source, peer = %peer, "Failed to respond to GetLogletInfo message due to peer channel capacity being full");
} else {
tracing::trace!(%peer, %self.loglet_id, local_tail = ?self.loglet_state.local_tail(), known_global_tail = %self.loglet_state.known_global_tail(), "GetLogletInfo response");
}
}
Some(_) = in_flight_stores.next() => {}
// The in-flight seal (if any)
Some(Ok(_)) = &mut in_flight_seal => {
Expand Down Expand Up @@ -231,17 +242,6 @@ impl<S: LogStore> LogletWorker<S> {
}

}
// GET_LOGLET_INFO
Some(msg) = get_loglet_info_rx.recv() => {
self.loglet_state.notify_known_global_tail(msg.body().header.known_global_tail);
// drop response if connection is lost/congested
let peer = msg.peer();
if let Err(e) = msg.to_rpc_response(LogletInfo::new(self.loglet_state.local_tail(), self.loglet_state.trim_point(), self.loglet_state.known_global_tail())).try_send() {
debug!(?e.source, peer = %peer, "Failed to respond to GetLogletInfo message due to peer channel capacity being full");
} else {
tracing::trace!(%peer, %self.loglet_id, local_tail = ?self.loglet_state.local_tail(), known_global_tail = %self.loglet_state.known_global_tail(), "GetLogletInfo response");
}
}
// GET_RECORDS
Some(msg) = get_records_rx.recv() => {
self.loglet_state.notify_known_global_tail(msg.body().header.known_global_tail);
Expand Down Expand Up @@ -302,6 +302,31 @@ impl<S: LogStore> LogletWorker<S> {
}
}
}

// draining in-flight operations
drop(store_rx);
drop(release_rx);
drop(seal_rx);
drop(get_loglet_info_rx);
drop(get_records_rx);
drop(trim_rx);
drop(wait_for_tail_rx);
drop(get_digest_rx);
tracing::debug!(loglet_id = %self.loglet_id, "Draining loglet worker");
loop {
tokio::select! {
Some(Ok(_)) = &mut in_flight_seal => {
self.loglet_state.get_local_tail_watch().notify_seal();
debug!(loglet_id = %self.loglet_id, "Loglet is now sealed on this log-server node");
in_flight_seal.set(None.into());
}
Some(_) = in_flight_stores.next() => {}
Some(_) = in_flight_network_sends.next() => {}
Some(_) = waiting_for_seal.next() => {}
else => break,
}
}
tracing::debug!(loglet_id = %self.loglet_id, "loglet worker drained");
}

async fn process_store(
Expand Down

0 comments on commit 3ee5fff

Please sign in to comment.