Skip to content

Commit

Permalink
Implement Telemetry Metrics for Metadata Server
Browse files Browse the repository at this point in the history
Summary:
This pull request introduces telemetry metrics to the metadata server, enhancing monitoring and performance analysis capabilities.

Fixes #2605
  • Loading branch information
muhamadazmy committed Feb 5, 2025
1 parent f7b1e5a commit 2760587
Show file tree
Hide file tree
Showing 11 changed files with 565 additions and 138 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
/// the metrics' sink.
use metrics::{describe_counter, describe_histogram, Unit};

pub(crate) const BIFROST_REPLICATED_APPEND: &str = "restate.bifrost.replicatedloglet.appends.total";
pub(crate) const BIFROST_REPLICATED_READ_CACHE_HIT: &str =
"restate.bifrost.replicatedloglet.read_record_cache_hit.total";
pub(crate) const BIFROST_REPLICATED_READ_CACHE_FILTERED: &str =
Expand All @@ -31,12 +30,6 @@ pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_BYTES: &str =
pub(crate) const BIFROST_SEQ_STORE_DURATION: &str = "restate.bifrost.sequencer.store_duration";

pub(crate) fn describe_metrics() {
describe_counter!(
BIFROST_REPLICATED_APPEND,
Unit::Count,
"Number of append requests to bifrost's replicated loglet"
);

describe_counter!(
BIFROST_REPLICATED_READ_CACHE_HIT,
Unit::Count,
Expand Down
114 changes: 91 additions & 23 deletions crates/core/src/metadata_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@
pub mod providers;
mod test_util;

#[cfg(any(test, feature = "test-util"))]
use crate::metadata_store::test_util::InMemoryMetadataStore;
use std::future::Future;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use bytestring::ByteString;
use metrics::{counter, histogram};
use tokio::time::Instant;
use tracing::debug;

use restate_types::errors::{
BoxedMaybeRetryableError, GenericError, IntoMaybeRetryable, MaybeRetryableError,
};
Expand All @@ -24,9 +29,15 @@ use restate_types::nodes_config::NodesConfiguration;
use restate_types::retries::RetryPolicy;
use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode, StorageEncodeError};
use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};
use std::future::Future;
use std::sync::Arc;
use tracing::debug;

#[cfg(any(test, feature = "test-util"))]
use crate::metadata_store::test_util::InMemoryMetadataStore;
use crate::metric_definitions::{
METADATA_CLIENT_DELETE_DURATION, METADATA_CLIENT_DELETE_TOTAL, METADATA_CLIENT_GET_DURATION,
METADATA_CLIENT_GET_TOTAL, METADATA_CLIENT_GET_VERSION_DURATION,
METADATA_CLIENT_GET_VERSION_TOTAL, METADATA_CLIENT_PUT_DURATION, METADATA_CLIENT_PUT_TOTAL,
STATUS_COMPLETED, STATUS_FAILED,
};

#[derive(Debug, thiserror::Error)]
pub enum ReadError {
Expand Down Expand Up @@ -300,28 +311,57 @@ impl MetadataStoreClient {
&self,
key: ByteString,
) -> Result<Option<T>, ReadError> {
let value = self.inner.get(key).await?;
let start_time = Instant::now();
let key_str = key.to_string();
let result = {
let value = self.inner.get(key).await?;

if let Some(mut versioned_value) = value {
let value = StorageCodec::decode::<T, _>(&mut versioned_value.value)
.map_err(|err| ReadError::Codec(err.into()))?;

assert_eq!(
versioned_value.version,
value.version(),
"versions must align"
);

Ok(Some(value))
} else {
Ok(None)
}
};

if let Some(mut versioned_value) = value {
let value = StorageCodec::decode::<T, _>(&mut versioned_value.value)
.map_err(|err| ReadError::Codec(err.into()))?;
let status = if result.is_ok() {
STATUS_COMPLETED
} else {
STATUS_FAILED
};

assert_eq!(
versioned_value.version,
value.version(),
"versions must align"
);
histogram!(METADATA_CLIENT_GET_DURATION).record(start_time.elapsed());
counter!(METADATA_CLIENT_GET_TOTAL, "key" => key_str, "status" => status).increment(1);

Ok(Some(value))
} else {
Ok(None)
}
result
}

/// Gets the current version for the given key. If key-value pair is not present, then return
/// [`None`].
pub async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
self.inner.get_version(key).await
let start_time = Instant::now();
let key_str = key.to_string();
let result = self.inner.get_version(key).await;

let status = if result.is_ok() {
STATUS_COMPLETED
} else {
STATUS_FAILED
};

histogram!(METADATA_CLIENT_GET_VERSION_DURATION).record(start_time.elapsed());
counter!(METADATA_CLIENT_GET_VERSION_TOTAL, "key" => key_str, "status" => status)
.increment(1);

result
}

/// Puts the versioned value under the given key following the provided precondition. If the
Expand All @@ -335,10 +375,25 @@ impl MetadataStoreClient {
where
T: Versioned + StorageEncode,
{
let versioned_value =
serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?;
let start_time = Instant::now();
let key_str = key.to_string();
let result = {
let versioned_value =
serialize_value(value).map_err(|err| WriteError::Codec(err.into()))?;

self.inner.put(key, versioned_value, precondition).await
};

self.inner.put(key, versioned_value, precondition).await
let status = if result.is_ok() {
STATUS_COMPLETED
} else {
STATUS_FAILED
};

histogram!(METADATA_CLIENT_PUT_DURATION).record(start_time.elapsed());
counter!(METADATA_CLIENT_PUT_TOTAL, "key" => key_str, "status" => status).increment(1);

result
}

/// Deletes the key-value pair for the given key following the provided precondition. If the
Expand All @@ -348,7 +403,20 @@ impl MetadataStoreClient {
key: ByteString,
precondition: Precondition,
) -> Result<(), WriteError> {
self.inner.delete(key, precondition).await
let start_time = Instant::now();
let key_str = key.to_string();
let result = self.inner.delete(key, precondition).await;

let status = if result.is_ok() {
STATUS_COMPLETED
} else {
STATUS_FAILED
};

histogram!(METADATA_CLIENT_DELETE_DURATION).record(start_time.elapsed());
counter!(METADATA_CLIENT_DELETE_TOTAL,"key" => key_str, "status" => status).increment(1);

result
}

/// Gets the value under the specified key or inserts a new value if it is not present into the
Expand Down
69 changes: 65 additions & 4 deletions crates/core/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,86 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use metrics::{describe_counter, Unit};
use metrics::{describe_counter, describe_histogram, Unit};

// value of label `kind` in TC_SPAWN are defined in [`crate::TaskKind`].
pub const TC_SPAWN: &str = "restate.task_center.spawned.total";
pub const TC_FINISHED: &str = "restate.task_center.finished.total";

// values of label `status` in TC_FINISHED
pub const TC_STATUS_COMPLETED: &str = "completed";
pub const TC_STATUS_FAILED: &str = "failed";
// values of label `status` in METRICS
pub const STATUS_COMPLETED: &str = "completed";
pub const STATUS_FAILED: &str = "failed";

pub(crate) const METADATA_CLIENT_GET_DURATION: &str = "restate.metadata_client.get.duration";
pub(crate) const METADATA_CLIENT_GET_VERSION_DURATION: &str =
"restate.metadata_client.get_version.duration";
pub(crate) const METADATA_CLIENT_PUT_DURATION: &str = "restate.metadata_client.put.duration";
pub(crate) const METADATA_CLIENT_DELETE_DURATION: &str = "restate.metadata_client.delete.duration";

pub(crate) const METADATA_CLIENT_GET_TOTAL: &str = "restate.metadata_client.get.total";
pub(crate) const METADATA_CLIENT_GET_VERSION_TOTAL: &str =
"restate.metadata_client.get_version.total";
pub(crate) const METADATA_CLIENT_PUT_TOTAL: &str = "restate.metadata_client.put.total";
pub(crate) const METADATA_CLIENT_DELETE_TOTAL: &str = "restate.metadata_client.delete.total";

pub fn describe_metrics() {
describe_counter!(
TC_SPAWN,
Unit::Count,
"Total tasks spawned by the task center"
);

describe_counter!(
TC_FINISHED,
Unit::Count,
"Number of tasks that finished with 'status'"
);

describe_histogram!(
METADATA_CLIENT_GET_DURATION,
Unit::Seconds,
"Metadata client get request duration in seconds"
);

describe_histogram!(
METADATA_CLIENT_GET_VERSION_DURATION,
Unit::Seconds,
"Metadata client get_version request duration in seconds"
);

describe_histogram!(
METADATA_CLIENT_PUT_DURATION,
Unit::Seconds,
"Metadata client put request duration in seconds"
);

describe_histogram!(
METADATA_CLIENT_DELETE_DURATION,
Unit::Seconds,
"Metadata client delete request duration in seconds"
);

describe_counter!(
METADATA_CLIENT_GET_TOTAL,
Unit::Count,
"Metadata client get request success count"
);

describe_counter!(
METADATA_CLIENT_GET_VERSION_TOTAL,
Unit::Count,
"Metadata client get_version request success count"
);

describe_counter!(
METADATA_CLIENT_PUT_TOTAL,
Unit::Count,
"Metadata client put request success count"
);

describe_counter!(
METADATA_CLIENT_DELETE_TOTAL,
Unit::Count,
"Metadata client delete request success count"
);
}
10 changes: 4 additions & 6 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ use tracing::{debug, error, info, trace, warn};
use restate_types::identifiers::PartitionId;
use restate_types::GenerationalNodeId;

use crate::metric_definitions::{
self, TC_FINISHED, TC_SPAWN, TC_STATUS_COMPLETED, TC_STATUS_FAILED,
};
use crate::metric_definitions::{self, STATUS_COMPLETED, STATUS_FAILED, TC_FINISHED, TC_SPAWN};
use crate::{Metadata, ShutdownError, ShutdownSourceErr};

const EXIT_CODE_FAILURE: i32 = 1;
Expand Down Expand Up @@ -729,7 +727,7 @@ impl TaskCenterInner {
match result {
Ok(Ok(())) => {
trace!(kind = ?task.kind(), name = ?task.name(), "Task {} exited normally", task_id);
counter!(TC_FINISHED, "kind" => kind_str, "status" => TC_STATUS_COMPLETED)
counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_COMPLETED)
.increment(1);
}
Ok(Err(err)) => {
Expand All @@ -752,11 +750,11 @@ impl TaskCenterInner {
} else {
error!(kind = ?task.kind(), name = ?task.name(), "Task {} failed with: {:?}", task_id, err);
}
counter!(TC_FINISHED, "kind" => kind_str, "status" => TC_STATUS_FAILED)
counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_FAILED)
.increment(1);
}
Err(err) => {
counter!(TC_FINISHED, "kind" => kind_str, "status" => TC_STATUS_FAILED)
counter!(TC_FINISHED, "kind" => kind_str, "status" => STATUS_FAILED)
.increment(1);
if should_shutdown_on_error {
error!(kind = ?task.kind(), name = ?task.name(), "Shutting down: task {} panicked: {:?}", task_id, err);
Expand Down
1 change: 1 addition & 0 deletions crates/metadata-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ derive_more = { workspace = true }
flexbuffers = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true}
prost = { workspace = true }
prost-dto = { workspace = true }
Expand Down
Loading

0 comments on commit 2760587

Please sign in to comment.