Skip to content

Commit

Permalink
Implement Telemetry Metrics for Metadata Server
Browse files Browse the repository at this point in the history
Summary:
This pull request introduces telemetry metrics to the metadata server, enhancing monitoring and performance analysis capabilities.

Fixes restatedev#2605
  • Loading branch information
muhamadazmy committed Feb 4, 2025
1 parent 86cd732 commit bf1ecd6
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
/// the metrics' sink.
use metrics::{describe_counter, describe_histogram, Unit};

pub(crate) const BIFROST_REPLICATED_APPEND: &str = "restate.bifrost.replicatedloglet.appends.total";
pub(crate) const BIFROST_REPLICATED_READ_CACHE_HIT: &str =
"restate.bifrost.replicatedloglet.read_record_cache_hit.total";
pub(crate) const BIFROST_REPLICATED_READ_CACHE_FILTERED: &str =
Expand All @@ -31,12 +30,6 @@ pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_BYTES: &str =
pub(crate) const BIFROST_SEQ_STORE_DURATION: &str = "restate.bifrost.sequencer.store_duration";

pub(crate) fn describe_metrics() {
describe_counter!(
BIFROST_REPLICATED_APPEND,
Unit::Count,
"Number of append requests to bifrost's replicated loglet"
);

describe_counter!(
BIFROST_REPLICATED_READ_CACHE_HIT,
Unit::Count,
Expand Down
73 changes: 57 additions & 16 deletions crates/core/src/metadata_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@ mod test_util;

#[cfg(any(test, feature = "test-util"))]
use crate::metadata_store::test_util::InMemoryMetadataStore;
use crate::metric_definitions::{
METADATA_CLIENT_DELETE_DURATION, METADATA_CLIENT_DELETE_TOTAL, METADATA_CLIENT_GET_DURATION,
METADATA_CLIENT_GET_TOTAL, METADATA_CLIENT_GET_VERSION_DURATION,
METADATA_CLIENT_GET_VERSION_TOTAL, METADATA_CLIENT_PUT_DURATION, METADATA_CLIENT_PUT_TOTAL,
};
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use bytestring::ByteString;
use metrics::{counter, histogram};
use restate_types::errors::{
BoxedMaybeRetryableError, GenericError, IntoMaybeRetryable, MaybeRetryableError,
};
Expand All @@ -26,6 +32,7 @@ use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode, Storage
use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};
use std::future::Future;
use std::sync::Arc;
use std::time::Instant;
use tracing::debug;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -299,28 +306,43 @@ impl MetadataStoreClient {
&self,
key: ByteString,
) -> Result<Option<T>, ReadError> {
let value = self.inner.get(key).await?;
let start_time = Instant::now();
counter!(METADATA_CLIENT_GET_TOTAL, "key" => key.to_string()).increment(1);

if let Some(mut versioned_value) = value {
let value = StorageCodec::decode::<T, _>(&mut versioned_value.value)
.map_err(|err| ReadError::Codec(err.into()))?;
let value = self.inner.get(key.clone()).await?;

assert_eq!(
versioned_value.version,
value.version(),
"versions must align"
);
let result = value
.map(|mut versioned_value| {
let value = StorageCodec::decode::<T, _>(&mut versioned_value.value)
.map_err(|err| ReadError::Codec(err.into()))?;

Ok(Some(value))
} else {
Ok(None)
}
assert_eq!(
versioned_value.version,
value.version(),
"versions must align"
);

Ok(value)
})
.transpose();

histogram!(METADATA_CLIENT_GET_DURATION, "key" => key.to_string())
.record(start_time.elapsed());

result
}

/// Gets the current version for the given key. If key-value pair is not present, then return
/// [`None`].
pub async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
self.inner.get_version(key).await
let start_time = Instant::now();
counter!(METADATA_CLIENT_GET_VERSION_TOTAL, "key" => key.to_string()).increment(1);
let result = self.inner.get_version(key.clone()).await;

histogram!(METADATA_CLIENT_GET_VERSION_DURATION, "key" => key.to_string())
.record(start_time.elapsed());

result
}

/// Puts the versioned value under the given key following the provided precondition. If the
Expand All @@ -334,10 +356,21 @@ impl MetadataStoreClient {
where
T: Versioned + StorageEncode,
{
let start_time = Instant::now();
counter!(METADATA_CLIENT_PUT_TOTAL, "key" => key.to_string()).increment(1);

let versioned_value =
serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?;

self.inner.put(key, versioned_value, precondition).await
let result = self
.inner
.put(key.clone(), versioned_value, precondition)
.await;

histogram!(METADATA_CLIENT_PUT_DURATION, "key" => key.to_string())
.record(start_time.elapsed());

result
}

/// Deletes the key-value pair for the given key following the provided precondition. If the
Expand All @@ -347,7 +380,15 @@ impl MetadataStoreClient {
key: ByteString,
precondition: Precondition,
) -> Result<(), WriteError> {
self.inner.delete(key, precondition).await
let start_time = Instant::now();
counter!(METADATA_CLIENT_DELETE_TOTAL, "key" => key.to_string()).increment(1);

let result = self.inner.delete(key.clone(), precondition).await;

histogram!(METADATA_CLIENT_DELETE_DURATION, "key" => key.to_string())
.record(start_time.elapsed());

result
}

/// Gets the value under the specified key or inserts a new value if it is not present into the
Expand Down
63 changes: 62 additions & 1 deletion crates/core/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use metrics::{describe_counter, Unit};
use metrics::{describe_counter, describe_histogram, Unit};

// value of label `kind` in TC_SPAWN are defined in [`crate::TaskKind`].
pub const TC_SPAWN: &str = "restate.task_center.spawned.total";
Expand All @@ -18,15 +18,76 @@ pub const TC_FINISHED: &str = "restate.task_center.finished.total";
pub const TC_STATUS_COMPLETED: &str = "completed";
pub const TC_STATUS_FAILED: &str = "failed";

pub(crate) const METADATA_CLIENT_GET_DURATION: &str = "restate.metadata_client.get.duration";
pub(crate) const METADATA_CLIENT_GET_VERSION_DURATION: &str =
"restate.metadata_client.get_version.duration";
pub(crate) const METADATA_CLIENT_PUT_DURATION: &str = "restate.metadata_client.put.duration";
pub(crate) const METADATA_CLIENT_DELETE_DURATION: &str = "restate.metadata_client.delete.duration";

pub(crate) const METADATA_CLIENT_GET_TOTAL: &str = "restate.metadata_client.get.total";
pub(crate) const METADATA_CLIENT_GET_VERSION_TOTAL: &str =
"restate.metadata_client.get_version.total";
pub(crate) const METADATA_CLIENT_PUT_TOTAL: &str = "restate.metadata_client.put.total";
pub(crate) const METADATA_CLIENT_DELETE_TOTAL: &str = "restate.metadata_client.delete.total";

pub fn describe_metrics() {
describe_counter!(
TC_SPAWN,
Unit::Count,
"Total tasks spawned by the task center"
);

describe_counter!(
TC_FINISHED,
Unit::Count,
"Number of tasks that finished with 'status'"
);

describe_histogram!(
METADATA_CLIENT_GET_DURATION,
Unit::Seconds,
"Metadata client get request duration in seconds"
);

describe_histogram!(
METADATA_CLIENT_GET_VERSION_DURATION,
Unit::Seconds,
"Metadata client get_version request duration in seconds"
);

describe_histogram!(
METADATA_CLIENT_PUT_DURATION,
Unit::Seconds,
"Metadata client put request duration in seconds"
);

describe_histogram!(
METADATA_CLIENT_DELETE_DURATION,
Unit::Seconds,
"Metadata client delete request duration in seconds"
);

describe_counter!(
METADATA_CLIENT_GET_TOTAL,
Unit::Count,
"Metadata client get request count"
);

describe_counter!(
METADATA_CLIENT_GET_VERSION_TOTAL,
Unit::Count,
"Metadata client get_version request count"
);

describe_counter!(
METADATA_CLIENT_PUT_TOTAL,
Unit::Count,
"Metadata client put request count"
);

describe_counter!(
METADATA_CLIENT_DELETE_TOTAL,
Unit::Count,
"Metadata client delete request count"
);
}
1 change: 1 addition & 0 deletions crates/metadata-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ derive_more = { workspace = true }
flexbuffers = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true}
prost = { workspace = true }
prost-dto = { workspace = true }
Expand Down
22 changes: 22 additions & 0 deletions crates/metadata-server/src/grpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,24 @@ use crate::grpc::{
DeleteRequest, GetRequest, GetResponse, GetVersionResponse,
ProvisionRequest as ProtoProvisionRequest, ProvisionResponse, PutRequest, StatusResponse,
};
use crate::metric_definitions::{
METADATA_SERVER_DELETE_DURATION, METADATA_SERVER_DELETE_TOTAL, METADATA_SERVER_GET_DURATION,
METADATA_SERVER_GET_TOTAL, METADATA_SERVER_GET_VERSION_DURATION,
METADATA_SERVER_GET_VERSION_TOTAL, METADATA_SERVER_PUT_DURATION, METADATA_SERVER_PUT_TOTAL,
};
use crate::{
prepare_initial_nodes_configuration, MetadataStoreRequest, MetadataStoreSummary,
ProvisionError, ProvisionRequest, ProvisionSender, RequestError, RequestSender, StatusWatch,
};
use async_trait::async_trait;
use metrics::{counter, histogram};
use restate_core::metadata_store::{serialize_value, Precondition};
use restate_types::config::Configuration;
use restate_types::metadata_store::keys::NODES_CONFIG_KEY;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::storage::StorageCodec;
use std::ops::Deref;
use std::time::Instant;
use tokio::sync::{oneshot, watch};
use tonic::{Request, Response, Status};

Expand Down Expand Up @@ -53,6 +60,9 @@ impl MetadataStoreHandler {
#[async_trait]
impl MetadataServerSvc for MetadataStoreHandler {
async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
counter!(METADATA_SERVER_GET_TOTAL).increment(1);

let start_time = Instant::now();
let (result_tx, result_rx) = oneshot::channel();

let request = request.into_inner();
Expand All @@ -68,6 +78,8 @@ impl MetadataServerSvc for MetadataStoreHandler {
.await
.map_err(|_| Status::unavailable("metadata store is shut down"))??;

histogram!(METADATA_SERVER_GET_DURATION).record(start_time.elapsed());

Ok(Response::new(GetResponse {
value: result.map(Into::into),
}))
Expand All @@ -77,6 +89,8 @@ impl MetadataServerSvc for MetadataStoreHandler {
&self,
request: Request<GetRequest>,
) -> Result<Response<GetVersionResponse>, Status> {
counter!(METADATA_SERVER_GET_VERSION_TOTAL).increment(1);
let start_time = Instant::now();
let (result_tx, result_rx) = oneshot::channel();

let request = request.into_inner();
Expand All @@ -92,12 +106,16 @@ impl MetadataServerSvc for MetadataStoreHandler {
.await
.map_err(|_| Status::unavailable("metadata store is shut down"))??;

histogram!(METADATA_SERVER_GET_VERSION_DURATION).record(start_time.elapsed());

Ok(Response::new(GetVersionResponse {
version: result.map(Into::into),
}))
}

async fn put(&self, request: Request<PutRequest>) -> Result<Response<()>, Status> {
counter!(METADATA_SERVER_PUT_TOTAL).increment(1);
let start_time = Instant::now();
let (result_tx, result_rx) = oneshot::channel();

let request = request.into_inner();
Expand All @@ -123,10 +141,13 @@ impl MetadataServerSvc for MetadataStoreHandler {
.await
.map_err(|_| Status::unavailable("metadata store is shut down"))??;

histogram!(METADATA_SERVER_PUT_DURATION).record(start_time.elapsed());
Ok(Response::new(()))
}

async fn delete(&self, request: Request<DeleteRequest>) -> Result<Response<()>, Status> {
counter!(METADATA_SERVER_DELETE_TOTAL).increment(1);
let start_time = Instant::now();
let (result_tx, result_rx) = oneshot::channel();

let request = request.into_inner();
Expand All @@ -147,6 +168,7 @@ impl MetadataServerSvc for MetadataStoreHandler {
.await
.map_err(|_| Status::unavailable("metadata store is shut down"))??;

histogram!(METADATA_SERVER_DELETE_DURATION).record(start_time.elapsed());
Ok(Response::new(()))
}

Expand Down
3 changes: 3 additions & 0 deletions crates/metadata-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

pub mod grpc;
pub mod local;
mod metric_definitions;
pub mod raft;
mod util;

Expand Down Expand Up @@ -241,6 +242,8 @@ pub async fn create_metadata_server(
metadata_writer: Option<MetadataWriter>,
server_builder: &mut NetworkServerBuilder,
) -> anyhow::Result<BoxedMetadataStoreService> {
metric_definitions::describe_metrics();

match metadata_server_options.kind {
MetadataServerKind::Local => local::create_server(
metadata_server_options,
Expand Down
Loading

0 comments on commit bf1ecd6

Please sign in to comment.