From 6fedca53cd615f879f451831c7fef380da25befe Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Wed, 5 Feb 2025 13:28:35 +0100 Subject: [PATCH] Implement Telemetry Metrics for Metadata Server Summary: This pull request introduces telemetry metrics to the metadata server, enhancing monitoring and performance analysis capabilities. Fixes #2605 --- Cargo.lock | 1 + .../replicated_loglet/metric_definitions.rs | 7 - crates/core/src/metadata_store.rs | 107 ++++++-- crates/core/src/metric_definitions.rs | 69 +++++- crates/core/src/task_center.rs | 10 +- crates/metadata-server/Cargo.toml | 1 + crates/metadata-server/src/grpc/handler.rs | 234 +++++++++++------- crates/metadata-server/src/lib.rs | 3 + .../metadata-server/src/metric_definitions.rs | 166 +++++++++++++ .../src/raft/network/connection_manager.rs | 32 ++- crates/metadata-server/src/raft/store.rs | 41 ++- 11 files changed, 529 insertions(+), 142 deletions(-) create mode 100644 crates/metadata-server/src/metric_definitions.rs diff --git a/Cargo.lock b/Cargo.lock index ee5b99df99..2925c17448 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6914,6 +6914,7 @@ dependencies = [ "futures", "googletest", "http 1.2.0", + "metrics", "parking_lot", "prost", "prost-dto", diff --git a/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs index ab86263bfa..cbb660df33 100644 --- a/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs +++ b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs @@ -12,7 +12,6 @@ /// the metrics' sink. 
use metrics::{describe_counter, describe_histogram, Unit}; -pub(crate) const BIFROST_REPLICATED_APPEND: &str = "restate.bifrost.replicatedloglet.appends.total"; pub(crate) const BIFROST_REPLICATED_READ_CACHE_HIT: &str = "restate.bifrost.replicatedloglet.read_record_cache_hit.total"; pub(crate) const BIFROST_REPLICATED_READ_CACHE_FILTERED: &str = @@ -31,12 +30,6 @@ pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_BYTES: &str = pub(crate) const BIFROST_SEQ_STORE_DURATION: &str = "restate.bifrost.sequencer.store_duration"; pub(crate) fn describe_metrics() { - describe_counter!( - BIFROST_REPLICATED_APPEND, - Unit::Count, - "Number of append requests to bifrost's replicated loglet" - ); - describe_counter!( BIFROST_REPLICATED_READ_CACHE_HIT, Unit::Count, diff --git a/crates/core/src/metadata_store.rs b/crates/core/src/metadata_store.rs index 1679583827..77c2361689 100644 --- a/crates/core/src/metadata_store.rs +++ b/crates/core/src/metadata_store.rs @@ -11,11 +11,14 @@ pub mod providers; mod test_util; -#[cfg(any(test, feature = "test-util"))] -use crate::metadata_store::test_util::InMemoryMetadataStore; +use std::future::Future; +use std::sync::Arc; + use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use bytestring::ByteString; +use tracing::debug; + use restate_types::errors::{ BoxedMaybeRetryableError, GenericError, IntoMaybeRetryable, MaybeRetryableError, }; @@ -24,9 +27,14 @@ use restate_types::nodes_config::NodesConfiguration; use restate_types::retries::RetryPolicy; use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode, StorageEncodeError}; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; -use std::future::Future; -use std::sync::Arc; -use tracing::debug; + +#[cfg(any(test, feature = "test-util"))] +use crate::metadata_store::test_util::InMemoryMetadataStore; +use crate::metric_definitions::{ + METADATA_CLIENT_DELETE_DURATION, METADATA_CLIENT_DELETE_TOTAL, METADATA_CLIENT_GET_DURATION, + 
METADATA_CLIENT_GET_TOTAL, METADATA_CLIENT_GET_VERSION_DURATION, + METADATA_CLIENT_GET_VERSION_TOTAL, METADATA_CLIENT_PUT_DURATION, METADATA_CLIENT_PUT_TOTAL, +}; #[derive(Debug, thiserror::Error)] pub enum ReadError { @@ -263,6 +271,23 @@ impl MetadataStore for T { } } +macro_rules! metrics_helper { + (@counter=$counter:ident, @duration=$dur:ident, @key=$key:ident, $block:block) => {{ + let start_time = tokio::time::Instant::now(); + let key_str = $key.to_string(); + let result = async { $block }.await; + metrics::histogram!($dur).record(start_time.elapsed()); + let status = if result.is_ok() { + $crate::metric_definitions::STATUS_COMPLETED + } else { + $crate::metric_definitions::STATUS_FAILED + }; + metrics::counter!($counter, "key" => key_str, "status" => status).increment(1); + + result + }}; +} + /// Metadata store client which allows storing [`Versioned`] values into a [`MetadataStore`]. #[derive(Clone)] pub struct MetadataStoreClient { @@ -299,28 +324,42 @@ impl MetadataStoreClient { &self, key: ByteString, ) -> Result, ReadError> { - let value = self.inner.get(key).await?; - - if let Some(mut versioned_value) = value { - let value = StorageCodec::decode::(&mut versioned_value.value) - .map_err(|err| ReadError::Codec(err.into()))?; - - assert_eq!( - versioned_value.version, - value.version(), - "versions must align" - ); - - Ok(Some(value)) - } else { - Ok(None) - } + metrics_helper!( + @counter=METADATA_CLIENT_GET_TOTAL, + @duration=METADATA_CLIENT_GET_DURATION, + @key=key, + { + let value = self.inner.get(key).await?; + + if let Some(mut versioned_value) = value { + let value = StorageCodec::decode::(&mut versioned_value.value) + .map_err(|err| ReadError::Codec(err.into()))?; + + assert_eq!( + versioned_value.version, + value.version(), + "versions must align" + ); + + Ok(Some(value)) + } else { + Ok(None) + } + } + ) } /// Gets the current version for the given key. If key-value pair is not present, then return /// [`None`].
pub async fn get_version(&self, key: ByteString) -> Result, ReadError> { - self.inner.get_version(key).await + metrics_helper!( + @counter=METADATA_CLIENT_GET_VERSION_TOTAL, + @duration=METADATA_CLIENT_GET_VERSION_DURATION, + @key=key, + { + self.inner.get_version(key).await + } + ) } /// Puts the versioned value under the given key following the provided precondition. If the @@ -334,10 +373,17 @@ impl MetadataStoreClient { where T: Versioned + StorageEncode, { - let versioned_value = - serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?; - - self.inner.put(key, versioned_value, precondition).await + metrics_helper!( + @counter=METADATA_CLIENT_PUT_TOTAL, + @duration=METADATA_CLIENT_PUT_DURATION, + @key=key, + { + let versioned_value = + serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?; + + self.inner.put(key, versioned_value, precondition).await + } + ) } /// Deletes the key-value pair for the given key following the provided precondition. If the @@ -347,7 +393,14 @@ impl MetadataStoreClient { key: ByteString, precondition: Precondition, ) -> Result<(), WriteError> { - self.inner.delete(key, precondition).await + metrics_helper!( + @counter=METADATA_CLIENT_DELETE_TOTAL, + @duration=METADATA_CLIENT_DELETE_DURATION, + @key=key, + { + self.inner.delete(key, precondition).await + } + ) } /// Gets the value under the specified key or inserts a new value if it is not present into the diff --git a/crates/core/src/metric_definitions.rs b/crates/core/src/metric_definitions.rs index 599fa0d632..0ddf78f1f2 100644 --- a/crates/core/src/metric_definitions.rs +++ b/crates/core/src/metric_definitions.rs @@ -8,15 +8,27 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use metrics::{describe_counter, Unit}; +use metrics::{describe_counter, describe_histogram, Unit}; // value of label `kind` in TC_SPAWN are defined in [`crate::TaskKind`]. 
pub const TC_SPAWN: &str = "restate.task_center.spawned.total"; pub const TC_FINISHED: &str = "restate.task_center.finished.total"; -// values of label `status` in TC_FINISHED -pub const TC_STATUS_COMPLETED: &str = "completed"; -pub const TC_STATUS_FAILED: &str = "failed"; +// values of label `status` in METRICS +pub const STATUS_COMPLETED: &str = "completed"; +pub const STATUS_FAILED: &str = "failed"; + +pub(crate) const METADATA_CLIENT_GET_DURATION: &str = "restate.metadata_client.get.duration"; +pub(crate) const METADATA_CLIENT_GET_VERSION_DURATION: &str = + "restate.metadata_client.get_version.duration"; +pub(crate) const METADATA_CLIENT_PUT_DURATION: &str = "restate.metadata_client.put.duration"; +pub(crate) const METADATA_CLIENT_DELETE_DURATION: &str = "restate.metadata_client.delete.duration"; + +pub(crate) const METADATA_CLIENT_GET_TOTAL: &str = "restate.metadata_client.get.total"; +pub(crate) const METADATA_CLIENT_GET_VERSION_TOTAL: &str = + "restate.metadata_client.get_version.total"; +pub(crate) const METADATA_CLIENT_PUT_TOTAL: &str = "restate.metadata_client.put.total"; +pub(crate) const METADATA_CLIENT_DELETE_TOTAL: &str = "restate.metadata_client.delete.total"; pub fn describe_metrics() { describe_counter!( @@ -24,9 +36,58 @@ pub fn describe_metrics() { Unit::Count, "Total tasks spawned by the task center" ); + describe_counter!( TC_FINISHED, Unit::Count, "Number of tasks that finished with 'status'" ); + + describe_histogram!( + METADATA_CLIENT_GET_DURATION, + Unit::Seconds, + "Metadata client get request duration in seconds" + ); + + describe_histogram!( + METADATA_CLIENT_GET_VERSION_DURATION, + Unit::Seconds, + "Metadata client get_version request duration in seconds" + ); + + describe_histogram!( + METADATA_CLIENT_PUT_DURATION, + Unit::Seconds, + "Metadata client put request duration in seconds" + ); + + describe_histogram!( + METADATA_CLIENT_DELETE_DURATION, + Unit::Seconds, + "Metadata client delete request duration in seconds" + ); + + 
describe_counter!( + METADATA_CLIENT_GET_TOTAL, + Unit::Count, + "Number of metadata client get requests that finished with 'status'" + ); + + describe_counter!( + METADATA_CLIENT_GET_VERSION_TOTAL, + Unit::Count, + "Number of metadata client get_version requests that finished with 'status'" + ); + + describe_counter!( + METADATA_CLIENT_PUT_TOTAL, + Unit::Count, + "Number of metadata client put requests that finished with 'status'" + ); + + describe_counter!( + METADATA_CLIENT_DELETE_TOTAL, + Unit::Count, + "Number of metadata client delete requests that finished with 'status'" + ); } diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index 146c8f9d0b..1a18a74574 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -43,9 +43,7 @@ use tracing::{debug, error, info, trace, warn}; use restate_types::identifiers::PartitionId; use restate_types::GenerationalNodeId; -use crate::metric_definitions::{ - self, TC_FINISHED, TC_SPAWN, TC_STATUS_COMPLETED, TC_STATUS_FAILED, -}; +use crate::metric_definitions::{self, STATUS_COMPLETED, STATUS_FAILED, TC_FINISHED, TC_SPAWN}; use crate::{Metadata, ShutdownError, ShutdownSourceErr}; const EXIT_CODE_FAILURE: i32 = 1; @@ -717,7 +715,7 @@ impl TaskCenterInner { match result { Ok(Ok(())) => { trace!(kind = ?task.kind(), name = ?task.name(), "Task {} exited normally", task_id); - counter!(TC_FINISHED, "kind" => kind_str, "status" => TC_STATUS_COMPLETED) + counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_COMPLETED) .increment(1); } Ok(Err(err)) => { @@ -740,11 +738,11 @@ impl TaskCenterInner { } else { error!(kind = ?task.kind(), name = ?task.name(), "Task {} failed with: {:?}", task_id, err); } - counter!(TC_FINISHED, "kind" => kind_str, "status" => TC_STATUS_FAILED) + counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_FAILED) .increment(1); } Err(err) => { - counter!(TC_FINISHED, "kind" => kind_str, "status" => TC_STATUS_FAILED) + counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_FAILED) .increment(1); if should_shutdown_on_error { error!(kind =
?task.kind(), name = ?task.name(), "Shutting down: task {} panicked: {:?}", task_id, err); diff --git a/crates/metadata-server/Cargo.toml b/crates/metadata-server/Cargo.toml index 6376f2324c..94b6fdcb86 100644 --- a/crates/metadata-server/Cargo.toml +++ b/crates/metadata-server/Cargo.toml @@ -28,6 +28,7 @@ derive_more = { workspace = true } flexbuffers = { workspace = true } futures = { workspace = true } http = { workspace = true } +metrics = { workspace = true } parking_lot = { workspace = true} prost = { workspace = true } prost-dto = { workspace = true } diff --git a/crates/metadata-server/src/grpc/handler.rs b/crates/metadata-server/src/grpc/handler.rs index fcdf8eba9f..cbfbc4c4cc 100644 --- a/crates/metadata-server/src/grpc/handler.rs +++ b/crates/metadata-server/src/grpc/handler.rs @@ -8,26 +8,33 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::ops::Deref; + +use async_trait::async_trait; +use tokio::sync::{oneshot, watch}; +use tonic::{Request, Response, Status}; + +use restate_core::metadata_store::{serialize_value, Precondition}; +use restate_types::config::Configuration; +use restate_types::metadata_store::keys::NODES_CONFIG_KEY; +use restate_types::nodes_config::NodesConfiguration; +use restate_types::storage::StorageCodec; + use crate::grpc::metadata_server_svc_server::MetadataServerSvc; use crate::grpc::pb_conversions::ConversionError; use crate::grpc::{ DeleteRequest, GetRequest, GetResponse, GetVersionResponse, ProvisionRequest as ProtoProvisionRequest, ProvisionResponse, PutRequest, StatusResponse, }; +use crate::metric_definitions::{ + METADATA_SERVER_DELETE_DURATION, METADATA_SERVER_DELETE_TOTAL, METADATA_SERVER_GET_DURATION, + METADATA_SERVER_GET_TOTAL, METADATA_SERVER_GET_VERSION_DURATION, + METADATA_SERVER_GET_VERSION_TOTAL, METADATA_SERVER_PUT_DURATION, METADATA_SERVER_PUT_TOTAL, +}; use crate::{ prepare_initial_nodes_configuration, MetadataStoreRequest, 
MetadataStoreSummary, ProvisionError, ProvisionRequest, ProvisionSender, RequestError, RequestSender, StatusWatch, }; -use async_trait::async_trait; -use restate_core::metadata_store::{serialize_value, Precondition}; -use restate_types::config::Configuration; -use restate_types::metadata_store::keys::NODES_CONFIG_KEY; -use restate_types::nodes_config::NodesConfiguration; -use restate_types::storage::StorageCodec; -use std::ops::Deref; -use tokio::sync::{oneshot, watch}; -use tonic::{Request, Response, Status}; - /// Grpc svc handler for the metadata store. #[derive(Debug)] pub struct MetadataStoreHandler { @@ -50,104 +57,145 @@ impl MetadataStoreHandler { } } +macro_rules! metrics_helper { + (@counter=$counter:ident, @duration=$dur:ident, $block:block) => {{ + let start_time = tokio::time::Instant::now(); + let result = async { $block }.await; + metrics::histogram!($dur).record(start_time.elapsed()); + let status = if result.is_ok() { + $crate::metric_definitions::STATUS_COMPLETED + } else { + $crate::metric_definitions::STATUS_FAILED + }; + + metrics::counter!($counter, "status" => status).increment(1); + + result + }}; +} + #[async_trait] impl MetadataServerSvc for MetadataStoreHandler { async fn get(&self, request: Request) -> Result, Status> { - let (result_tx, result_rx) = oneshot::channel(); - - let request = request.into_inner(); - self.request_tx - .send(MetadataStoreRequest::Get { - key: request.key.into(), - result_tx, - }) - .await - .map_err(|_| Status::unavailable("metadata store is shut down"))?; - - let result = result_rx - .await - .map_err(|_| Status::unavailable("metadata store is shut down"))??; - - Ok(Response::new(GetResponse { - value: result.map(Into::into), - })) + metrics_helper!( + @counter=METADATA_SERVER_GET_TOTAL, + @duration=METADATA_SERVER_GET_DURATION, + { + let (result_tx, result_rx) = oneshot::channel(); + + let request = request.into_inner(); + self.request_tx + .send(MetadataStoreRequest::Get { + key: request.key.into(), + result_tx, + }) +
.await + .map_err(|_| Status::unavailable("metadata store is shut down"))?; + + let result = result_rx + .await + .map_err(|_| Status::unavailable("metadata store is shut down"))??; + + Ok(Response::new(GetResponse { + value: result.map(Into::into), + })) + } + ) } async fn get_version( &self, request: Request, ) -> Result, Status> { - let (result_tx, result_rx) = oneshot::channel(); - - let request = request.into_inner(); - self.request_tx - .send(MetadataStoreRequest::GetVersion { - key: request.key.into(), - result_tx, - }) - .await - .map_err(|_| Status::unavailable("metadata store is shut down"))?; - - let result = result_rx - .await - .map_err(|_| Status::unavailable("metadata store is shut down"))??; - - Ok(Response::new(GetVersionResponse { - version: result.map(Into::into), - })) + metrics_helper!( + @counter=METADATA_SERVER_GET_VERSION_TOTAL, + @duration=METADATA_SERVER_GET_VERSION_DURATION, + { + let (result_tx, result_rx) = oneshot::channel(); + + let request = request.into_inner(); + self.request_tx + .send(MetadataStoreRequest::GetVersion { + key: request.key.into(), + result_tx, + }) + .await + .map_err(|_| Status::unavailable("metadata store is shut down"))?; + + let result = result_rx + .await + .map_err(|_| Status::unavailable("metadata store is shut down"))??; + + Ok(Response::new(GetVersionResponse { + version: result.map(Into::into), + })) + } + ) } async fn put(&self, request: Request) -> Result, Status> { - let (result_tx, result_rx) = oneshot::channel(); - - let request = request.into_inner(); - self.request_tx - .send(MetadataStoreRequest::Put { - key: request.key.into(), - value: request - .value - .ok_or_else(|| Status::invalid_argument("missing value field"))? - .try_into() - .map_err(|err: ConversionError| Status::invalid_argument(err.to_string()))?, - precondition: request - .precondition - .ok_or_else(|| Status::invalid_argument("missing precondition field"))? 
- .try_into() - .map_err(|err: ConversionError| Status::invalid_argument(err.to_string()))?, - result_tx, - }) - .await - .map_err(|_| Status::unavailable("metadata store is shut down"))?; - - result_rx - .await - .map_err(|_| Status::unavailable("metadata store is shut down"))??; - - Ok(Response::new(())) + metrics_helper!( + @counter=METADATA_SERVER_PUT_TOTAL, + @duration=METADATA_SERVER_PUT_DURATION, + { + let (result_tx, result_rx) = oneshot::channel(); + + let request = request.into_inner(); + self.request_tx + .send(MetadataStoreRequest::Put { + key: request.key.into(), + value: request + .value + .ok_or_else(|| Status::invalid_argument("missing value field"))? + .try_into() + .map_err(|err: ConversionError| Status::invalid_argument(err.to_string()))?, + precondition: request + .precondition + .ok_or_else(|| Status::invalid_argument("missing precondition field"))? + .try_into() + .map_err(|err: ConversionError| Status::invalid_argument(err.to_string()))?, + result_tx, + }) + .await + .map_err(|_| Status::unavailable("metadata store is shut down"))?; + + result_rx + .await + .map_err(|_| Status::unavailable("metadata store is shut down"))??; + + Ok(Response::new(())) + } + ) } async fn delete(&self, request: Request) -> Result, Status> { - let (result_tx, result_rx) = oneshot::channel(); - - let request = request.into_inner(); - self.request_tx - .send(MetadataStoreRequest::Delete { - key: request.key.into(), - precondition: request - .precondition - .ok_or_else(|| Status::invalid_argument("missing precondition field"))? 
- .try_into() - .map_err(|err: ConversionError| Status::invalid_argument(err.to_string()))?, - result_tx, - }) - .await - .map_err(|_| Status::unavailable("metadata store is shut down"))?; - - result_rx - .await - .map_err(|_| Status::unavailable("metadata store is shut down"))??; - - Ok(Response::new(())) + metrics_helper!( + @counter=METADATA_SERVER_DELETE_TOTAL, + @duration=METADATA_SERVER_DELETE_DURATION, + { + let (result_tx, result_rx) = oneshot::channel(); + + let request = request.into_inner(); + self.request_tx + .send(MetadataStoreRequest::Delete { + key: request.key.into(), + precondition: request + .precondition + .ok_or_else(|| Status::invalid_argument("missing precondition field"))? + .try_into() + .map_err(|err: ConversionError| Status::invalid_argument(err.to_string()))?, + result_tx, + }) + .await + .map_err(|_| Status::unavailable("metadata store is shut down"))?; + + result_rx + .await + .map_err(|_| Status::unavailable("metadata store is shut down"))??; + + Ok(Response::new(())) + } + ) } async fn provision( diff --git a/crates/metadata-server/src/lib.rs b/crates/metadata-server/src/lib.rs index 8e4b9b1568..8f0c306fee 100644 --- a/crates/metadata-server/src/lib.rs +++ b/crates/metadata-server/src/lib.rs @@ -10,6 +10,7 @@ pub mod grpc; pub mod local; +mod metric_definitions; pub mod raft; mod util; @@ -241,6 +242,8 @@ pub async fn create_metadata_server( metadata_writer: Option, server_builder: &mut NetworkServerBuilder, ) -> anyhow::Result { + metric_definitions::describe_metrics(); + match metadata_server_options.kind { MetadataServerKind::Local => local::create_server( metadata_server_options, diff --git a/crates/metadata-server/src/metric_definitions.rs b/crates/metadata-server/src/metric_definitions.rs new file mode 100644 index 0000000000..66b3ba8833 --- /dev/null +++ b/crates/metadata-server/src/metric_definitions.rs @@ -0,0 +1,166 @@ +// Copyright (c) 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. 
+// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use metrics::{describe_counter, describe_gauge, describe_histogram, Unit}; + +// values of label `status` in METRICS +pub const STATUS_COMPLETED: &str = "completed"; +pub const STATUS_FAILED: &str = "failed"; + +pub(crate) const METADATA_SERVER_GET_DURATION: &str = "restate.metadata_server.get.duration"; +pub(crate) const METADATA_SERVER_GET_VERSION_DURATION: &str = + "restate.metadata_server.get_version.duration"; +pub(crate) const METADATA_SERVER_PUT_DURATION: &str = "restate.metadata_server.put.duration"; +pub(crate) const METADATA_SERVER_DELETE_DURATION: &str = "restate.metadata_server.delete.duration"; + +pub(crate) const METADATA_SERVER_GET_TOTAL: &str = "restate.metadata_server.get.total"; +pub(crate) const METADATA_SERVER_GET_VERSION_TOTAL: &str = + "restate.metadata_server.get_version.total"; +pub(crate) const METADATA_SERVER_PUT_TOTAL: &str = "restate.metadata_server.put.total"; +pub(crate) const METADATA_SERVER_DELETE_TOTAL: &str = "restate.metadata_server.delete.total"; + +// Raft specific metrics +pub(crate) const METADATA_SERVER_EMBEDDED_SENT_MESSAGE_TOTAL: &str = + "restate.metadata_server.embedded.sent_messages.total"; +pub(crate) const METADATA_SERVER_EMBEDDED_RECV_MESSAGE_TOTAL: &str = + "restate.metadata_server.embedded.received_messages.total"; +pub(crate) const METADATA_SERVER_EMBEDDED_SENT_MESSAGE_BYTES: &str = + "restate.metadata_server.embedded.sent_messages.bytes"; +pub(crate) const METADATA_SERVER_EMBEDDED_RECV_MESSAGE_BYTES: &str = + "restate.metadata_server.embedded.received_messages.bytes"; + +pub(crate) const METADATA_SERVER_EMBEDDED_LEADER_ID: &str = + "restate.metadata_server.embedded.leader.id"; +pub(crate) const 
METADATA_SERVER_EMBEDDED_SNAPSHOT_SIZE_BYTES: &str = + "restate.metadata_server.embedded.snapshot_size.bytes"; +pub(crate) const METADATA_SERVER_EMBEDDED_TERM: &str = + "restate.metadata_server.embedded.term"; +pub(crate) const METADATA_SERVER_EMBEDDED_COMMITTED_LSN: &str = + "restate.metadata_server.embedded.committed_lsn"; +pub(crate) const METADATA_SERVER_EMBEDDED_APPLIED_LSN: &str = + "restate.metadata_server.embedded.applied_lsn"; +pub(crate) const METADATA_SERVER_EMBEDDED_FIRST_INDEX: &str = + "restate.metadata_server.embedded.first_index"; +pub(crate) const METADATA_SERVER_EMBEDDED_LAST_INDEX: &str = + "restate.metadata_server.embedded.last_index"; + +pub(crate) fn describe_metrics() { + describe_histogram!( + METADATA_SERVER_GET_DURATION, + Unit::Seconds, + "Metadata get request duration in seconds as measured by the metadata handler" + ); + + describe_histogram!( + METADATA_SERVER_GET_VERSION_DURATION, + Unit::Seconds, + "Metadata get_version request duration in seconds as measured by the metadata handler" + ); + + describe_histogram!( + METADATA_SERVER_PUT_DURATION, + Unit::Seconds, + "Metadata put request duration in seconds as measured by the metadata handler" + ); + + describe_histogram!( + METADATA_SERVER_DELETE_DURATION, + Unit::Seconds, + "Metadata delete request duration in seconds as measured by the metadata handler" + ); + + describe_counter!( + METADATA_SERVER_GET_TOTAL, + Unit::Count, + "Number of metadata get requests that finished with 'status', as measured by the metadata handler" + ); + + describe_counter!( + METADATA_SERVER_GET_VERSION_TOTAL, + Unit::Count, + "Number of metadata get_version requests that finished with 'status', as measured by the metadata handler" + ); + + describe_counter!( + METADATA_SERVER_PUT_TOTAL, + Unit::Count, + "Number of metadata put requests that finished with 'status', as measured by the metadata handler" + ); + + describe_counter!( + METADATA_SERVER_DELETE_TOTAL, + Unit::Count, + "Number of metadata delete requests that finished with 'status', as measured by the metadata handler" + ); + +
describe_counter!( + METADATA_SERVER_EMBEDDED_SENT_MESSAGE_TOTAL, + Unit::Count, + "Raft Metadata server sent messages count" + ); + + describe_counter!( + METADATA_SERVER_EMBEDDED_RECV_MESSAGE_TOTAL, + Unit::Count, + "Raft Metadata server received messages count" + ); + + describe_counter!( + METADATA_SERVER_EMBEDDED_SENT_MESSAGE_BYTES, + Unit::Bytes, + "Raft Metadata server sent messages size in bytes" + ); + + describe_counter!( + METADATA_SERVER_EMBEDDED_RECV_MESSAGE_BYTES, + Unit::Bytes, + "Raft Metadata server received messages size in bytes" + ); + + describe_gauge!( + METADATA_SERVER_EMBEDDED_LEADER_ID, + "Raft Metadata server known leader id" + ); + + describe_gauge!( + METADATA_SERVER_EMBEDDED_SNAPSHOT_SIZE_BYTES, + Unit::Bytes, + "Raft Metadata snapshot size" + ); + + describe_gauge!( + METADATA_SERVER_EMBEDDED_TERM, + Unit::Count, + "Raft Metadata raft term number" + ); + + describe_gauge!( + METADATA_SERVER_EMBEDDED_APPLIED_LSN, + Unit::Count, + "Raft Metadata raft applied lsn" + ); + + describe_gauge!( + METADATA_SERVER_EMBEDDED_COMMITTED_LSN, + Unit::Count, + "Raft Metadata raft committed lsn" + ); + + describe_gauge!( + METADATA_SERVER_EMBEDDED_FIRST_INDEX, + Unit::Count, + "Raft Metadata raft first index" + ); + describe_gauge!( + METADATA_SERVER_EMBEDDED_LAST_INDEX, + Unit::Count, + "Raft Metadata raft last index" + ); +} diff --git a/crates/metadata-server/src/raft/network/connection_manager.rs b/crates/metadata-server/src/raft/network/connection_manager.rs index d5e56c0266..2439654541 100644 --- a/crates/metadata-server/src/raft/network/connection_manager.rs +++ b/crates/metadata-server/src/raft/network/connection_manager.rs @@ -8,8 +8,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0.
+use crate::metric_definitions::{ + METADATA_SERVER_EMBEDDED_RECV_MESSAGE_BYTES, METADATA_SERVER_EMBEDDED_RECV_MESSAGE_TOTAL, + METADATA_SERVER_EMBEDDED_SENT_MESSAGE_BYTES, METADATA_SERVER_EMBEDDED_SENT_MESSAGE_TOTAL, +}; use crate::raft::network::{grpc_svc, NetworkMessage}; use futures::StreamExt; +use metrics::counter; use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; use std::collections::HashMap; use std::io::Cursor; @@ -74,7 +79,7 @@ where return Ok(()); } - let connection = Connection::new(outgoing_tx); + let connection = Connection::new(remote_peer, outgoing_tx); guard.insert(remote_peer, connection); let reactor = ConnectionReactor { @@ -123,6 +128,11 @@ where Some(message) => { match message { Ok(message) => { + counter!(METADATA_SERVER_EMBEDDED_RECV_MESSAGE_TOTAL, "peer" => self.remote_peer.to_string()) + .increment(1); + counter!(METADATA_SERVER_EMBEDDED_RECV_MESSAGE_BYTES, "peer" => self.remote_peer.to_string()) + .increment(message.payload.len() as u64); + let mut cursor = Cursor::new(&message.payload); let message = M::deserialize(&mut cursor)?; @@ -183,18 +193,32 @@ impl ConnectionManagerInner { #[derive(Debug, Clone)] pub struct Connection { + remote_peer: u64, tx: mpsc::Sender, } impl Connection { - pub fn new(tx: mpsc::Sender) -> Self { - Connection { tx } + pub fn new(remote_peer: u64, tx: mpsc::Sender) -> Self { + Connection { remote_peer, tx } } pub fn try_send( &self, message: grpc_svc::NetworkMessage, ) -> Result<(), TrySendError> { - self.tx.try_send(message) + let size = message.payload.len(); + + self.tx.try_send(message)?; + + // note that we assume that this message is sent while in fact + // it's just queued. 
The actual message might still not get + // sent + + counter!(METADATA_SERVER_EMBEDDED_SENT_MESSAGE_TOTAL, "peer" => self.remote_peer.to_string()) + .increment(1); + counter!(METADATA_SERVER_EMBEDDED_SENT_MESSAGE_BYTES, "peer" => self.remote_peer.to_string()) + .increment(size as u64); + + Ok(()) } } diff --git a/crates/metadata-server/src/raft/store.rs b/crates/metadata-server/src/raft/store.rs index 171e5998bf..f4880d2e81 100644 --- a/crates/metadata-server/src/raft/store.rs +++ b/crates/metadata-server/src/raft/store.rs @@ -10,6 +10,12 @@ use crate::grpc::pb_conversions::ConversionError; use crate::grpc::MetadataServerSnapshot; +use crate::metric_definitions::{ + METADATA_SERVER_EMBEDDED_APPLIED_LSN, METADATA_SERVER_EMBEDDED_COMMITTED_LSN, + METADATA_SERVER_EMBEDDED_FIRST_INDEX, METADATA_SERVER_EMBEDDED_LAST_INDEX, + METADATA_SERVER_EMBEDDED_LEADER_ID, METADATA_SERVER_EMBEDDED_SNAPSHOT_SIZE_BYTES, + METADATA_SERVER_EMBEDDED_TERM, +}; use crate::raft::kv_memory_storage::KvMemoryStorage; use crate::raft::network::grpc_svc::metadata_server_network_svc_client::MetadataServerNetworkSvcClient; use crate::raft::network::{ConnectionManager, Networking}; @@ -30,6 +36,7 @@ use futures::future::{FusedFuture, OptionFuture}; use futures::never::Never; use futures::FutureExt; use futures::TryFutureExt; +use metrics::gauge; use prost::{DecodeError, EncodeError, Message as ProstMessage}; use protobuf::{Message as ProtobufMessage, ProtobufError}; use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Message}; @@ -160,7 +167,7 @@ impl RaftMetadataServer { let (request_tx, request_rx) = mpsc::channel(2); let (provision_tx, provision_rx) = mpsc::channel(1); let (join_cluster_tx, join_cluster_rx) = mpsc::channel(1); - let (status_tx, _status_rx) = watch::channel(MetadataStoreSummary::default()); + let (status_tx, _) = watch::channel(MetadataStoreSummary::default()); let mut metadata_store_options = Configuration::updateable().map(|configuration| 
&configuration.metadata_server); @@ -1252,6 +1259,38 @@ impl Member { }; } }); + + self.record_summary_metrics(&self.status_tx.borrow()); + } + + fn record_summary_metrics(&self, summary: &MetadataStoreSummary) { + let MetadataStoreSummary::Member { + leader, + raft, + snapshot, + .. + } = summary + else { + return; + }; + + if let Some(id) = leader { + gauge!(METADATA_SERVER_EMBEDDED_LEADER_ID).set(u32::from(*id) as f64); + } else { + gauge!(METADATA_SERVER_EMBEDDED_LEADER_ID).set(INVALID_ID as f64); + } + + if let Some(snapshot) = snapshot { + gauge!(METADATA_SERVER_EMBEDDED_SNAPSHOT_SIZE_BYTES).set(snapshot.size as f64); + } else { + gauge!(METADATA_SERVER_EMBEDDED_SNAPSHOT_SIZE_BYTES).set(0); + } + + gauge!(METADATA_SERVER_EMBEDDED_TERM).set(raft.term as f64); + gauge!(METADATA_SERVER_EMBEDDED_APPLIED_LSN).set(raft.applied as f64); + gauge!(METADATA_SERVER_EMBEDDED_COMMITTED_LSN).set(raft.committed as f64); + gauge!(METADATA_SERVER_EMBEDDED_FIRST_INDEX).set(raft.first_index as f64); + gauge!(METADATA_SERVER_EMBEDDED_LAST_INDEX).set(raft.last_index as f64); } fn raft_summary(&self) -> RaftSummary {