Health in TaskCenter
AhmedSoliman committed Feb 4, 2025
1 parent df6e6f5 commit a8bbd8a
Showing 14 changed files with 113 additions and 109 deletions.
7 changes: 4 additions & 3 deletions crates/admin/src/cluster_controller/service.rs
@@ -295,13 +295,14 @@ impl<T: TransportConnect> Service<T> {
)?;

tokio::select! {
_ = self.run_inner() => {
unreachable!("Cluster controller service has terminated unexpectedly.");
}
biased;
_ = cancellation_watcher() => {
health_status.update(AdminStatus::Unknown);
Ok(())
}
_ = self.run_inner() => {
unreachable!("Cluster controller service has terminated unexpectedly.");
}
}
}

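
The hunk above flips the order of the select! arms and adds `biased;`, so the cancellation watcher is always polled before the controller loop, and the admin health status is set to `AdminStatus::Unknown` as soon as shutdown starts. A minimal sketch of the same pattern outside the Restate codebase (the `run_inner` stand-in and the status comment are illustrative, not part of this commit):

    use tokio_util::sync::CancellationToken;

    // Hypothetical stand-in for the long-running service loop.
    async fn run_inner() {
        std::future::pending::<()>().await;
    }

    // `biased;` disables random arm selection, so the shutdown arm is
    // checked first on every poll.
    async fn run(cancel: CancellationToken) {
        tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                // the real service updates its health status to Unknown here
            }
            _ = run_inner() => {
                unreachable!("service loop terminated unexpectedly");
            }
        }
    }
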
3 changes: 2 additions & 1 deletion crates/bifrost/src/appender.rs
@@ -13,7 +13,7 @@ use std::time::{Duration, Instant};

use tracing::{debug, info, instrument, trace, warn};

use restate_core::{Metadata, TargetVersion};
use restate_core::{Metadata, TargetVersion, TaskCenter};
use restate_futures_util::overdue::OverdueLoggingExt;
use restate_types::config::Configuration;
use restate_types::live::Live;
@@ -219,6 +219,7 @@ impl Appender {
let log_metadata_version = Metadata::with_current(|m| m.logs_version());
if start.elapsed() > auto_recovery_threshold
&& error_recovery_strategy >= ErrorRecoveryStrategy::ExtendChainAllowed
&& !TaskCenter::is_shutdown_requested()
{
// taking the matter into our own hands
let admin = BifrostAdmin::new(bifrost_inner);
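
The appender change above adds one more condition to the auto-recovery guard: once the task center has been asked to shut down, the appender no longer tries to extend the chain on its own. A hedged sketch of the guard shape, with the threshold and recovery strategy collapsed into plain parameters for illustration:

    use std::time::{Duration, Instant};

    use restate_core::TaskCenter;

    // Sketch only: `extend_allowed` stands in for the ErrorRecoveryStrategy
    // comparison in the real code.
    fn should_attempt_recovery(start: Instant, threshold: Duration, extend_allowed: bool) -> bool {
        start.elapsed() > threshold
            && extend_allowed
            // never kick off a new recovery while the node is shutting down
            && !TaskCenter::is_shutdown_requested()
    }
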
16 changes: 10 additions & 6 deletions crates/core/src/network/message_router.rs
@@ -14,19 +14,18 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::Stream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{trace, warn};

use restate_types::net::codec::{Targeted, WireDecode};
use restate_types::net::CodecError;
use restate_types::net::ProtocolVersion;
use restate_types::net::TargetName;
use restate_types::protobuf::node::message::BinaryMessage;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;

use crate::is_cancellation_requested;

use super::{Incoming, RouterError};
use crate::TaskCenter;

pub type MessageStream<T> = Pin<Box<dyn Stream<Item = Incoming<T>> + Send + Sync + 'static>>;

@@ -247,12 +246,17 @@
message.try_map(|mut m| <M as WireDecode>::decode(&mut m.payload, protocol_version))?;
if let Err(e) = self.sender.send(message).await {
// Can be benign if we are shutting down
if !is_cancellation_requested() {
if !TaskCenter::is_shutdown_requested() {
warn!(
"Failed to send message for target {} to stream: {}",
M::TARGET,
e
);
} else {
trace!(
"Blackholed message {} since handler stream has been dropped",
M::TARGET
);
}
}
Ok(())
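
With the router now consulting `TaskCenter::is_shutdown_requested()`, a send failure on the handler stream is only worth a warning while the node is healthy; during shutdown the message is quietly blackholed at trace level. A condensed sketch of that branch (the `forward` helper and the `target` label are illustrative, not the router's actual signature):

    use restate_core::TaskCenter;
    use tokio::sync::mpsc;
    use tracing::{trace, warn};

    // Sketch: forward a decoded message to the per-target handler stream.
    async fn forward<M>(sender: &mpsc::Sender<M>, target: &str, message: M) {
        if let Err(e) = sender.send(message).await {
            // Can be benign if we are shutting down
            if !TaskCenter::is_shutdown_requested() {
                warn!("Failed to send message for target {} to stream: {}", target, e);
            } else {
                trace!("Blackholed message {} since handler stream has been dropped", target);
            }
        }
    }
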
15 changes: 14 additions & 1 deletion crates/core/src/task_center.rs
@@ -20,6 +20,7 @@ pub use builder::*;
pub use extensions::*;
pub use handle::*;
pub use monitoring::*;
use restate_types::health::{Health, NodeStatus};
pub use runtime::*;
pub use task::*;
pub use task_kind::*;
@@ -122,6 +123,16 @@ impl TaskCenter {
Self::with_current(|tc| tc.try_set_global_metadata(metadata))
}

/// Returns true if the task center was requested to shutdown
pub fn is_shutdown_requested() -> bool {
Self::with_current(|tc| tc.is_shutdown_requested())
}

/// Returns an error if a shutdown has been requested.
pub fn check_shutdown() -> Result<(), ShutdownError> {
Self::with_current(|tc| tc.check_shutdown())
}

/// Launch a new task
#[track_caller]
pub fn spawn<F>(kind: TaskKind, name: &'static str, future: F) -> Result<TaskId, ShutdownError>
@@ -277,6 +288,7 @@ struct TaskCenterInner {
current_exit_code: AtomicI32,
managed_tasks: Mutex<HashMap<TaskId, Arc<Task>>>,
global_metadata: OnceLock<Metadata>,
health: Health,
root_task_context: TaskContext,
}

@@ -313,6 +325,7 @@ impl TaskCenterInner {
managed_runtimes: Mutex::new(HashMap::with_capacity(64)),
root_task_context,
pause_time,
health: Health::default(),
}
}

@@ -591,7 +604,6 @@ impl TaskCenterInner {
root_future(),
)));

debug!("Runtime {} completed", runtime_name);
drop(rt_handle);
tc.drop_runtime(runtime_name);

@@ -776,6 +788,7 @@ impl TaskCenterInner {
// already shutting down....
return;
}
self.health.node_status().merge(NodeStatus::ShuttingDown);
let start = Instant::now();
self.current_exit_code.store(exit_code, Ordering::Relaxed);

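
The two new statics delegate to the current task center, and `TaskCenterInner` now owns the `Health` handle, merging `NodeStatus::ShuttingDown` the moment shutdown is triggered. A small sketch of how a background loop might use `check_shutdown()` (the loop body is a placeholder; `ShutdownError` is assumed to be re-exported from `restate_core` as in the signatures above):

    use std::time::Duration;

    use restate_core::{ShutdownError, TaskCenter};

    // Sketch: a periodic task that exits cleanly once shutdown has been requested.
    async fn refresh_loop() -> Result<(), ShutdownError> {
        loop {
            // propagates ShutdownError as soon as the task center starts shutting down
            TaskCenter::check_shutdown()?;

            // ... one unit of work goes here ...
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
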
25 changes: 24 additions & 1 deletion crates/core/src/task_center/handle.rs
@@ -13,10 +13,12 @@ use std::future::Future;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use restate_types::identifiers::PartitionId;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};

use restate_types::health::{Health, NodeStatus};
use restate_types::identifiers::PartitionId;

use crate::{Metadata, ShutdownError};

use super::{
@@ -57,9 +59,30 @@ impl Handle {
/// Attempt to set the global metadata handle. This should be called once
/// at the startup of the node.
pub fn try_set_global_metadata(&self, metadata: Metadata) -> bool {
self.inner
.health
.node_status()
.merge(NodeStatus::StartingUp);
self.inner.try_set_global_metadata(metadata)
}

pub fn health(&self) -> &Health {
&self.inner.health
}

/// Returns true if the task center was requested to shutdown
pub fn is_shutdown_requested(&self) -> bool {
self.inner.shutdown_requested.load(Ordering::Relaxed)
}

/// Returns an error if a shutdown has been requested.
pub fn check_shutdown(&self) -> Result<(), ShutdownError> {
if self.is_shutdown_requested() {
return Err(ShutdownError);
}
Ok(())
}

/// Sets the current task_center but doesn't create a task. Use this when you need to run a
/// closure within task_center scope.
pub fn run_sync<F, O>(&self, f: F) -> O
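
On the handle side, `try_set_global_metadata` now also records `NodeStatus::StartingUp`, and the same shutdown checks plus the `Health` accessor are exposed to callers that already run inside a task-center scope. A minimal sketch of reaching them through `with_current`, mirroring how node/src/lib.rs uses the accessor later in this commit:

    use restate_core::{ShutdownError, TaskCenter};
    use restate_types::protobuf::common::NodeRpcStatus;

    // Sketch: bail out early if shutdown already started, otherwise wait for
    // node-to-node RPC to come up via the health handle owned by the task center.
    async fn wait_for_rpc_or_shutdown() -> Result<(), ShutdownError> {
        TaskCenter::with_current(|handle| handle.check_shutdown())?;
        let rpc_status = TaskCenter::with_current(|handle| handle.health().node_rpc_status());
        rpc_status.wait_for_value(NodeRpcStatus::Ready).await;
        Ok(())
    }
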
2 changes: 2 additions & 0 deletions crates/core/src/task_center/task_kind.rs
@@ -93,6 +93,8 @@ pub enum TaskKind {
#[strum(props(OnCancel = "abort", runtime = "ingress"))]
IngressServer,
RoleRunner,
/// Cluster controller is the first thing that gets stopped when the server is shut down
ClusterController,
SystemService,
#[strum(props(OnCancel = "abort", runtime = "ingress"))]
Ingress,
51 changes: 17 additions & 34 deletions crates/node/src/lib.rs
@@ -32,16 +32,15 @@ use restate_core::network::{
GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking,
};
use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher};
use restate_core::{cancellation_watcher, Metadata, TaskKind};
use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter};
use restate_core::{Metadata, TaskKind};
#[cfg(feature = "replicated-loglet")]
use restate_log_server::LogServerService;
use restate_metadata_server::{
BoxedMetadataStoreService, MetadataServer, MetadataStoreClient, ReadModifyWriteError,
};
use restate_types::config::{CommonOptions, Configuration};
use restate_types::errors::GenericError;
use restate_types::health::Health;
use restate_types::live::Live;
use restate_types::logs::metadata::{Logs, LogsConfiguration, ProviderConfiguration};
#[cfg(feature = "replicated-loglet")]
@@ -52,7 +51,7 @@ use restate_types::nodes_config::{
};
use restate_types::partition_table::{PartitionReplication, PartitionTable, PartitionTableBuilder};
use restate_types::protobuf::common::{
AdminStatus, IngressStatus, LogServerStatus, NodeRpcStatus, NodeStatus, WorkerStatus,
AdminStatus, IngressStatus, LogServerStatus, NodeRpcStatus, WorkerStatus,
};
use restate_types::storage::StorageEncode;
use restate_types::{GenerationalNodeId, Version, Versioned};
@@ -115,7 +114,6 @@ pub enum BuildError {
}

pub struct Node {
health: Health,
server_builder: NetworkServerBuilder,
updateable_config: Live<Configuration>,
metadata_manager: MetadataManager,
@@ -134,8 +132,6 @@

impl Node {
pub async fn create(updateable_config: Live<Configuration>) -> Result<Self, BuildError> {
let health = Health::default();
health.node_status().update(NodeStatus::StartingUp);
let mut server_builder = NetworkServerBuilder::default();
let config = updateable_config.pinned();

@@ -164,7 +160,7 @@
.clone()
.map(|config| &config.metadata_server.rocksdb)
.boxed(),
health.metadata_server_status(),
TaskCenter::with_current(|tc| tc.health().metadata_server_status()),
Some(metadata_writer),
&mut server_builder,
)
@@ -214,7 +210,7 @@ impl Node {
let log_server = if config.has_role(Role::LogServer) {
Some(
LogServerService::create(
health.log_server_status(),
TaskCenter::with_current(|tc| tc.health().log_server_status()),
updateable_config.clone(),
metadata.clone(),
record_cache,
@@ -230,7 +226,7 @@ impl Node {
let worker_role = if config.has_role(Role::Worker) {
Some(
WorkerRole::create(
health.worker_status(),
TaskCenter::with_current(|tc| tc.health().worker_status()),
metadata.clone(),
partition_routing_refresher.partition_routing(),
updateable_config.clone(),
@@ -260,7 +256,7 @@ impl Node {
.clone()
.map(|config| &config.ingress)
.boxed(),
health.ingress_status(),
TaskCenter::with_current(|tc| tc.health().ingress_status()),
networking.clone(),
metadata.updateable_schema(),
metadata.updateable_partition_table(),
@@ -274,7 +270,7 @@ impl Node {
let admin_role = if config.has_role(Role::Admin) {
Some(
AdminRole::create(
health.admin_status(),
TaskCenter::with_current(|tc| tc.health().admin_status()),
bifrost.clone(),
updateable_config.clone(),
partition_routing_refresher.partition_routing(),
@@ -308,7 +304,6 @@ impl Node {
.set_message_router(message_router);

Ok(Node {
health,
updateable_config,
metadata_manager,
partition_routing_refresher,
@@ -336,13 +331,11 @@ impl Node {

// spawn the node rpc server first to enable connecting to the metadata store
TaskCenter::spawn(TaskKind::RpcServer, "node-rpc-server", {
let health = self.health.clone();
let common_options = config.common.clone();
let connection_manager = self.networking.connection_manager().clone();
let metadata_store_client = self.metadata_store_client.clone();
async move {
NetworkServer::run(
health,
connection_manager,
self.server_builder,
common_options,
@@ -354,10 +347,8 @@ impl Node {
})?;

// wait until the node rpc server is up and running before continuing
self.health
.node_rpc_status()
.wait_for_value(NodeRpcStatus::Ready)
.await;
let node_rpc_status = TaskCenter::with_current(|tc| tc.health().node_rpc_status());
node_rpc_status.wait_for_value(NodeRpcStatus::Ready).await;

if let Some(metadata_store) = self.metadata_store_role {
TaskCenter::spawn(
@@ -462,55 +453,53 @@ impl Node {
}

if let Some(ingress_role) = self.ingress_role {
TaskCenter::spawn_child(TaskKind::Ingress, "ingress-http", ingress_role.run())?;
TaskCenter::spawn(TaskKind::IngressServer, "ingress-http", ingress_role.run())?;
}

self.base_role.start()?;

let node_status = self.health.node_status();
node_status.update(NodeStatus::Alive);

let my_roles = my_node_config.roles;
// Report that the node is running when all roles are ready
let _ = TaskCenter::spawn(TaskKind::Disposable, "status-report", async move {
self.health
let health = TaskCenter::with_current(|tc| tc.health().clone());
health
.node_rpc_status()
.wait_for_value(NodeRpcStatus::Ready)
.await;
trace!("Node-to-node networking is ready");
for role in my_roles {
match role {
Role::Worker => {
self.health
health
.worker_status()
.wait_for_value(WorkerStatus::Ready)
.await;
trace!("Worker role is reporting ready");
}
Role::Admin => {
self.health
health
.admin_status()
.wait_for_value(AdminStatus::Ready)
.await;
trace!("Worker role is reporting ready");
}
Role::MetadataServer => {
self.health
health
.metadata_server_status()
.wait_for(|status| status.is_running())
.await;
trace!("Metadata role is reporting ready");
}

Role::LogServer => {
self.health
health
.log_server_status()
.wait_for_value(LogServerStatus::Ready)
.await;
trace!("Log-server is reporting ready");
}
Role::HttpIngress => {
self.health
health
.ingress_status()
.wait_for_value(IngressStatus::Ready)
.await;
Expand All @@ -522,12 +511,6 @@ impl Node {
Ok(())
});

let _ = TaskCenter::spawn_child(TaskKind::Background, "node-status", async move {
cancellation_watcher().await;
node_status.update(NodeStatus::ShuttingDown);
Ok(())
});

Ok(())
}

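
The net effect in node/src/lib.rs is that `Node` no longer owns a `Health` value: every role pulls its status handle from the task center at creation time, and the readiness report clones the `Health` out of the task center instead of reading `self.health`. A hedged sketch of a role-side readiness report under this layout (the function is illustrative; `update()` on the worker status is assumed to behave like the `AdminStatus`/`NodeStatus` updates shown elsewhere in this diff):

    use restate_core::TaskCenter;
    use restate_types::protobuf::common::WorkerStatus;

    // Sketch: once a role finishes starting up, it flips its own status so the
    // "status-report" task above can observe it and declare the node ready.
    fn report_worker_ready() {
        let worker_status = TaskCenter::with_current(|tc| tc.health().worker_status());
        worker_status.update(WorkerStatus::Ready);
    }
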