feat: improve sink metrics #18758

Merged
merged 7 commits on Sep 29, 2024
Changes from 5 commits
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

66 changes: 18 additions & 48 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -4134,44 +4134,22 @@ def section_memory_manager(outer_panels):
),
]


def section_connector_node(outer_panels):
def section_sink_metrics(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Connector Node",
"Sink Metrics",
[
panels.timeseries_rowsps(
"Connector Source Throughput(rows)",
"",
[
panels.target(
f"rate({metric('source_rows_received')}[$__rate_interval])",
"source={{source_type}} @ {{source_id}}",
),
],
),
panels.timeseries_rowsps(
"Connector Sink Throughput(rows)",
"",
"Remote Sink (Java) Throughput",
"The rows sent by remote sink to the Java connector process",
[
panels.target(
f"rate({metric('connector_sink_rows_received')}[$__rate_interval])",
"sink={{connector_type}} @ {{sink_id}}",
"{{sink_id}} {{sink_name}} @ actor {{actor_id}}",
),
],
),
],
)
]


def section_sink_metrics(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Sink Metrics",
[
panels.timeseries_latency(
"Commit Duration",
"",
@@ -4195,15 +4173,15 @@ def section_sink_metrics(outer_panels):
[
panels.target(
f"{metric('log_store_latest_write_epoch')}",
"latest write epoch @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
"latest write epoch @ {{sink_id}} {{sink_name}} @ actor {{actor_id}}",
),
panels.target(
f"{metric('log_store_latest_read_epoch')}",
"latest read epoch @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
),
panels.target(
f"{metric('kv_log_store_buffer_unconsumed_min_epoch')}",
"Kv log store uncomsuned min epoch @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
"Kv log store unconsumed min epoch @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
),
],
),
@@ -4212,9 +4190,9 @@
"",
[
panels.target(
f"(max({metric('log_store_latest_write_epoch')}) by (connector, sink_id, actor_id, sink_name)"
+ f"- max({metric('log_store_latest_read_epoch')}) by (connector, sink_id, actor_id, sink_name)) / (2^16) / 1000",
"Consume lag @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
f"(max({metric('log_store_latest_write_epoch')}) by (sink_id, actor_id, sink_name)"
+ f"- max({metric('log_store_latest_read_epoch')}) by (sink_id, actor_id, sink_name)) / (2^16) / 1000",
"{{sink_id}} {{sink_name}} @ actor {{actor_id}}",
),
],
),
@@ -4233,9 +4211,9 @@
"",
[
panels.target(
f"clamp_min((max({metric('log_store_first_write_epoch')}) by (connector, sink_id, actor_id, sink_name)"
+ f"- max({metric('log_store_latest_read_epoch')}) by (connector, sink_id, actor_id, sink_name)) / (2^16) / 1000, 0)",
"Consume persistent log lag @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
f"clamp_min((max({metric('log_store_first_write_epoch')}) by (sink_id, actor_id, sink_name)"
+ f"- max({metric('log_store_latest_read_epoch')}) by (sink_id, actor_id, sink_name)) / (2^16) / 1000, 0)",
"{{sink_id}} {{sink_name}} @ actor {{actor_id}}",
),
],
),
@@ -4265,19 +4243,12 @@ def section_sink_metrics(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('log_store_write_rows')}[$__rate_interval])) by (connector, sink_id, sink_name)",
"sink={{sink_id}} {{sink_name}} ({{connector}})",
f"sum(rate({metric('log_store_write_rows')}[$__rate_interval])) by (sink_id, sink_name)",
"{{sink_id}} {{sink_name}}",
),
],
),
panels.timeseries_rowsps(
"Executor Log Store Write Throughput(rows)",
"",
[
panels.target(
f"sum(rate({metric('log_store_write_rows')}[$__rate_interval])) by ({NODE_LABEL}, connector, sink_id, actor_id, sink_name)",
"{{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}} {{%s}}"
% NODE_LABEL,
panels.target_hidden(
f"sum(rate({metric('log_store_write_rows')}[$__rate_interval])) by ({NODE_LABEL}, sink_id, actor_id, sink_name)",
"{{sink_id}} {{sink_name}} @ actor {{actor_id}} {{%s}}" % NODE_LABEL,
),
],
),
@@ -4805,7 +4776,6 @@ def section_udf(outer_panels):
*section_grpc_hummock_meta_client(panels),
*section_frontend(panels),
*section_memory_manager(panels),
*section_connector_node(panels),
*section_sink_metrics(panels),
*section_kafka_metrics(panels),
*section_network_connection(panels),
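The dashboard expressions above convert an epoch difference into seconds with `/ (2^16) / 1000`. A minimal, self-contained Rust sketch of that arithmetic, under the assumption (inferred from the query, not stated in this diff) that the low 16 bits of a RisingWave epoch hold a sequence counter and the remaining high bits hold a millisecond timestamp:

```rust
/// Mirrors the dashboard expression `(write_epoch - read_epoch) / (2^16) / 1000`.
fn epoch_lag_seconds(latest_write_epoch: u64, latest_read_epoch: u64) -> f64 {
    // Assumed layout: low 16 bits = sequence counter, high bits = milliseconds.
    let lag_ms = latest_write_epoch.saturating_sub(latest_read_epoch) >> 16;
    lag_ms as f64 / 1000.0
}

fn main() {
    // Hypothetical values: the write epoch is 5 000 ms ahead of the read epoch.
    let read = 1_700_000_000_000u64 << 16;
    let write = (1_700_000_000_000u64 + 5_000) << 16;
    assert_eq!(epoch_lag_seconds(write, read), 5.0);
}
```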
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

33 changes: 0 additions & 33 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -822,38 +822,6 @@ def section_batch(outer_panels):
)
]


def section_connector_node(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Connector Node",
[
panels.timeseries_rowsps(
"Connector Source Throughput(rows)",
"",
[
panels.target(
f"rate({metric('connector_source_rows_received')}[$__rate_interval])",
"source={{source_type}} @ {{source_id}}",
),
],
),
panels.timeseries_rowsps(
"Connector Sink Throughput(rows)",
"",
[
panels.target(
f"rate({metric('connector_sink_rows_received')}[$__rate_interval])",
"sink={{connector_type}} @ {{sink_id}}",
),
],
),
],
)
]


templating_list = []
if dynamic_source_enabled:
templating_list.append(
@@ -990,6 +958,5 @@ def section_connector_node(outer_panels):
*section_storage(panels),
*section_streaming(panels),
*section_batch(panels),
*section_connector_node(panels),
],
).auto_panel_ids()
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions src/connector/src/sink/log_store.rs
@@ -347,9 +347,10 @@ pub struct MonitoredLogWriter<W: LogWriter> {
}

pub struct LogWriterMetrics {
pub log_store_first_write_epoch: LabelGuardedIntGauge<4>,
pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>,
pub log_store_write_rows: LabelGuardedIntCounter<4>,
// Labels: [actor_id, sink_id, sink_name]
pub log_store_first_write_epoch: LabelGuardedIntGauge<3>,
pub log_store_latest_write_epoch: LabelGuardedIntGauge<3>,
pub log_store_write_rows: LabelGuardedIntCounter<3>,
}

impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
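With the `connector` label gone, the writer-side metrics are keyed only by `[actor_id, sink_id, sink_name]`, matching the new `LabelGuardedIntGauge<3>`/`LabelGuardedIntCounter<3>` types. A hedged sketch of how a `LogWriterMetrics` could be assembled from the global vecs under that label set — the helper function and variable names are illustrative; only the metric names and the `with_guarded_label_values` call mirror this PR:

```rust
// Illustrative only; assumes GLOBAL_SINK_METRICS and the guarded metric types
// from src/connector/src/sink are in scope.
fn log_writer_metrics(actor_id: u32, sink_id: u32, sink_name: &str) -> LogWriterMetrics {
    let actor_id = actor_id.to_string();
    let sink_id = sink_id.to_string();
    // Label order follows the struct comment: [actor_id, sink_id, sink_name].
    let labels = [actor_id.as_str(), sink_id.as_str(), sink_name];
    LogWriterMetrics {
        log_store_first_write_epoch: GLOBAL_SINK_METRICS
            .log_store_first_write_epoch
            .with_guarded_label_values(&labels),
        log_store_latest_write_epoch: GLOBAL_SINK_METRICS
            .log_store_latest_write_epoch
            .with_guarded_label_values(&labels),
        log_store_write_rows: GLOBAL_SINK_METRICS
            .log_store_write_rows
            .with_guarded_label_values(&labels),
    }
}
```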
43 changes: 20 additions & 23 deletions src/connector/src/sink/mod.rs
@@ -303,12 +303,14 @@ pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
#[derive(Clone)]
pub struct SinkMetrics {
pub sink_commit_duration: LabelGuardedHistogramVec<4>,
pub connector_sink_rows_received: LabelGuardedIntCounterVec<3>,
pub connector_sink_rows_received: LabelGuardedIntCounterVec<4>,

// Log store metrics
pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>,
pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>,
pub log_store_write_rows: LabelGuardedIntCounterVec<4>,
// Log store writer metrics
pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<3>,
pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<3>,
pub log_store_write_rows: LabelGuardedIntCounterVec<3>,

// Log store reader metrics
pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>,
pub log_store_read_rows: LabelGuardedIntCounterVec<4>,
pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>,
@@ -334,31 +336,31 @@ impl SinkMetrics {
let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
"connector_sink_rows_received",
"Number of rows received by sink",
&["connector_type", "sink_id", "sink_name"],
&["actor_id", "connector_type", "sink_id", "sink_name"],
registry
)
.unwrap();

let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
"log_store_first_write_epoch",
"The first write epoch of log store",
&["actor_id", "connector", "sink_id", "sink_name"],
&["actor_id", "sink_id", "sink_name"],
registry
)
.unwrap();

let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
"log_store_latest_write_epoch",
"The latest write epoch of log store",
&["actor_id", "connector", "sink_id", "sink_name"],
&["actor_id", "sink_id", "sink_name"],
registry
)
.unwrap();

let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
"log_store_write_rows",
"The write rate of rows",
&["actor_id", "connector", "sink_id", "sink_name"],
&["actor_id", "sink_id", "sink_name"],
registry
)
.unwrap();
@@ -467,28 +469,23 @@ pub struct SinkWriterParam {
#[derive(Clone)]
pub struct SinkWriterMetrics {
pub sink_commit_duration: LabelGuardedHistogram<4>,
pub connector_sink_rows_received: LabelGuardedIntCounter<3>,
pub connector_sink_rows_received: LabelGuardedIntCounter<4>,
}

impl SinkWriterMetrics {
pub fn new(writer_param: &SinkWriterParam) -> Self {
let labels = [
&writer_param.actor_id.to_string(),
writer_param.connector.as_str(),
&writer_param.sink_id.to_string(),
writer_param.sink_name.as_str(),
];
let sink_commit_duration = GLOBAL_SINK_METRICS
.sink_commit_duration
.with_guarded_label_values(&[
&writer_param.actor_id.to_string(),
writer_param.connector.as_str(),
&writer_param.sink_id.to_string(),
writer_param.sink_name.as_str(),
]);
.with_guarded_label_values(&labels);
let connector_sink_rows_received = GLOBAL_SINK_METRICS
.connector_sink_rows_received
.with_guarded_label_values(&[
// TODO: should have `actor_id` as label
// &writer_param.actor_id.to_string(),
writer_param.connector.as_str(),
&writer_param.sink_id.to_string(),
writer_param.sink_name.as_str(),
]);
.with_guarded_label_values(&labels);
Self {
sink_commit_duration,
connector_sink_rows_received,
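Since `connector_sink_rows_received` now carries the same label values in the same order as `sink_commit_duration` (`actor_id`, connector, `sink_id`, `sink_name`), the refactored `SinkWriterMetrics::new` can build both guarded metrics from one shared `labels` array, as shown above. A hypothetical call site, assuming a `writer_param: &SinkWriterParam` and a per-chunk row count are in scope:

```rust
// Usage sketch; only SinkWriterMetrics::new and inc_by are taken from the
// surrounding code, the variable names are assumptions.
let metrics = SinkWriterMetrics::new(writer_param);
metrics.connector_sink_rows_received.inc_by(rows_in_chunk as u64);
```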
6 changes: 0 additions & 6 deletions src/connector/src/source/cdc/source/reader.rs
@@ -209,19 +209,13 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
impl<T: CdcSourceTypeTrait> CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = ConnectorError)]
async fn into_data_stream(self) {
let source_type = T::source_type();
let mut rx = self.rx;
let source_id = self.source_id.to_string();
let metrics = self.source_ctx.metrics.clone();
let connector_source_rows_received_metrics = metrics
.connector_source_rows_received
.with_guarded_label_values(&[source_type.as_str_name(), &source_id]);

while let Some(result) = rx.recv().await {
match result {
Ok(GetEventStreamResponse { events, .. }) => {
tracing::trace!("receive {} cdc events ", events.len());
connector_source_rows_received_metrics.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
}
11 changes: 0 additions & 11 deletions src/connector/src/source/monitor/metrics.rs
@@ -68,8 +68,6 @@ pub struct SourceMetrics {
pub latest_message_id: LabelGuardedIntGaugeVec<3>,
pub rdkafka_native_metric: Arc<RdKafkaStats>,

pub connector_source_rows_received: LabelGuardedIntCounterVec<2>,

pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec<1>,
}

@@ -112,14 +110,6 @@ impl SourceMetrics {
)
.unwrap();

let connector_source_rows_received = register_guarded_int_counter_vec_with_registry!(
"source_rows_received",
"Number of rows received by source",
&["source_type", "source_id"],
registry
)
.unwrap();

let opts = histogram_opts!(
"source_cdc_event_lag_duration_milliseconds",
"source_cdc_lag_latency",
@@ -134,7 +124,6 @@ impl SourceMetrics {
partition_input_bytes,
latest_message_id,
rdkafka_native_metric,
connector_source_rows_received,
direct_cdc_event_lag_latency,
}
}
1 change: 0 additions & 1 deletion src/stream/src/executor/sink.rs
@@ -221,7 +221,6 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
} else {
let labels = [
&actor_id.to_string(),
"NA", // TODO: remove the connector label for log writer metrics
&sink_id.to_string(),
self.sink_param.sink_name.as_str(),
];
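With the `"NA"` placeholder removed, the executor's label array lines up with the new three-label writer metrics. A short, hedged sketch of the resulting wiring (the `rows_written` variable is hypothetical; the metric name and label order mirror this PR):

```rust
// Sketch only: bind the three-label counter for this sink actor and bump it
// as rows are written to the log store.
let labels = [
    &actor_id.to_string(),
    &sink_id.to_string(),
    self.sink_param.sink_name.as_str(),
];
let log_store_write_rows = GLOBAL_SINK_METRICS
    .log_store_write_rows
    .with_guarded_label_values(&labels);
log_store_write_rows.inc_by(rows_written as u64);
```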