feat: improve sink metrics (#18758)
fuyufjh authored Sep 29, 2024
1 parent e6435e6 commit 4dfc4c9
Showing 11 changed files with 46 additions and 129 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

66 changes: 18 additions & 48 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -4174,44 +4174,22 @@ def section_memory_manager(outer_panels):
),
]


def section_connector_node(outer_panels):
def section_sink_metrics(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Connector Node",
"Sink Metrics",
[
panels.timeseries_rowsps(
"Connector Source Throughput(rows)",
"",
[
panels.target(
f"rate({metric('source_rows_received')}[$__rate_interval])",
"source={{source_type}} @ {{source_id}}",
),
],
),
panels.timeseries_rowsps(
"Connector Sink Throughput(rows)",
"",
"Remote Sink (Java) Throughput",
"The rows sent by remote sink to the Java connector process",
[
panels.target(
f"rate({metric('connector_sink_rows_received')}[$__rate_interval])",
"sink={{connector_type}} @ {{sink_id}}",
"{{sink_id}} {{sink_name}} @ actor {{actor_id}}",
),
],
),
],
)
]


def section_sink_metrics(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Sink Metrics",
[
panels.timeseries_latency(
"Commit Duration",
"",
@@ -4235,15 +4213,15 @@ def section_sink_metrics(outer_panels):
[
panels.target(
f"{metric('log_store_latest_write_epoch')}",
"latest write epoch @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
"latest write epoch @ {{sink_id}} {{sink_name}} @ actor {{actor_id}}",
),
panels.target(
f"{metric('log_store_latest_read_epoch')}",
"latest read epoch @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
),
panels.target(
f"{metric('kv_log_store_buffer_unconsumed_min_epoch')}",
"Kv log store uncomsuned min epoch @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
"Kv log store unconsumed min epoch @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
),
],
),
@@ -4252,9 +4230,9 @@ def section_sink_metrics(outer_panels):
"",
[
panels.target(
f"(max({metric('log_store_latest_write_epoch')}) by (connector, sink_id, actor_id, sink_name)"
+ f"- max({metric('log_store_latest_read_epoch')}) by (connector, sink_id, actor_id, sink_name)) / (2^16) / 1000",
"Consume lag @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
f"(max({metric('log_store_latest_write_epoch')}) by (sink_id, actor_id, sink_name)"
+ f"- max({metric('log_store_latest_read_epoch')}) by (sink_id, actor_id, sink_name)) / (2^16) / 1000",
"{{sink_id}} {{sink_name}} @ actor {{actor_id}}",
),
],
),
@@ -4273,9 +4251,9 @@ def section_sink_metrics(outer_panels):
"",
[
panels.target(
f"clamp_min((max({metric('log_store_first_write_epoch')}) by (connector, sink_id, actor_id, sink_name)"
+ f"- max({metric('log_store_latest_read_epoch')}) by (connector, sink_id, actor_id, sink_name)) / (2^16) / 1000, 0)",
"Consume persistent log lag @ {{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}}",
f"clamp_min((max({metric('log_store_first_write_epoch')}) by (sink_id, actor_id, sink_name)"
+ f"- max({metric('log_store_latest_read_epoch')}) by (sink_id, actor_id, sink_name)) / (2^16) / 1000, 0)",
"{{sink_id}} {{sink_name}} @ actor {{actor_id}}",
),
],
),
@@ -4305,19 +4283,12 @@ def section_sink_metrics(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('log_store_write_rows')}[$__rate_interval])) by (connector, sink_id, sink_name)",
"sink={{sink_id}} {{sink_name}} ({{connector}})",
f"sum(rate({metric('log_store_write_rows')}[$__rate_interval])) by (sink_id, sink_name)",
"{{sink_id}} {{sink_name}}",
),
],
),
panels.timeseries_rowsps(
"Executor Log Store Write Throughput(rows)",
"",
[
panels.target(
f"sum(rate({metric('log_store_write_rows')}[$__rate_interval])) by ({NODE_LABEL}, connector, sink_id, actor_id, sink_name)",
"{{sink_id}} {{sink_name}} ({{connector}}) actor {{actor_id}} {{%s}}"
% NODE_LABEL,
panels.target_hidden(
f"sum(rate({metric('log_store_write_rows')}[$__rate_interval])) by ({NODE_LABEL}, sink_id, actor_id, sink_name)",
"{{sink_id}} {{sink_name}} @ actor {{actor_id}} {{%s}}" % NODE_LABEL,
),
],
),
@@ -4845,7 +4816,6 @@ def section_udf(outer_panels):
*section_grpc_hummock_meta_client(panels),
*section_frontend(panels),
*section_memory_manager(panels),
*section_connector_node(panels),
*section_sink_metrics(panels),
*section_kafka_metrics(panels),
*section_network_connection(panels),
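
For readers of the lag panels earlier in this file's diff: the `/ (2^16) / 1000` in the PromQL expressions converts an epoch difference into seconds, on the assumption (implied by the formula itself) that RisingWave epochs keep the physical timestamp in milliseconds above a 16-bit sequence part. A minimal, self-contained Rust sketch of that conversion, not code from this commit:

    // Hedged sketch: epoch difference -> seconds, mirroring the dashboard's
    // `(write_epoch - read_epoch) / (2^16) / 1000` expression. Assumes the
    // lowest 16 bits of an epoch are a sequence number and the remaining
    // bits carry physical time in milliseconds.
    const EPOCH_PHYSICAL_SHIFT_BITS: u32 = 16;

    fn epoch_lag_seconds(latest_write_epoch: u64, latest_read_epoch: u64) -> f64 {
        let delta_ms =
            latest_write_epoch.saturating_sub(latest_read_epoch) >> EPOCH_PHYSICAL_SHIFT_BITS;
        delta_ms as f64 / 1000.0
    }

    fn main() {
        // A write epoch that is 5 seconds (5000 ms) ahead of the read epoch.
        let read = 1_700_000_000_000_u64 << EPOCH_PHYSICAL_SHIFT_BITS;
        let write = read + (5_000_u64 << EPOCH_PHYSICAL_SHIFT_BITS);
        assert_eq!(epoch_lag_seconds(write, read), 5.0);
    }
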
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

33 changes: 0 additions & 33 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -822,38 +822,6 @@ def section_batch(outer_panels):
)
]


def section_connector_node(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Connector Node",
[
panels.timeseries_rowsps(
"Connector Source Throughput(rows)",
"",
[
panels.target(
f"rate({metric('connector_source_rows_received')}[$__rate_interval])",
"source={{source_type}} @ {{source_id}}",
),
],
),
panels.timeseries_rowsps(
"Connector Sink Throughput(rows)",
"",
[
panels.target(
f"rate({metric('connector_sink_rows_received')}[$__rate_interval])",
"sink={{connector_type}} @ {{sink_id}}",
),
],
),
],
)
]


templating_list = []
if dynamic_source_enabled:
templating_list.append(
@@ -990,6 +958,5 @@ def section_connector_node(outer_panels):
*section_storage(panels),
*section_streaming(panels),
*section_batch(panels),
*section_connector_node(panels),
],
).auto_panel_ids()
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions src/connector/src/sink/log_store.rs
@@ -347,9 +347,10 @@ pub struct MonitoredLogWriter<W: LogWriter> {
}

pub struct LogWriterMetrics {
pub log_store_first_write_epoch: LabelGuardedIntGauge<4>,
pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>,
pub log_store_write_rows: LabelGuardedIntCounter<4>,
// Labels: [actor_id, sink_id, sink_name]
pub log_store_first_write_epoch: LabelGuardedIntGauge<3>,
pub log_store_latest_write_epoch: LabelGuardedIntGauge<3>,
pub log_store_write_rows: LabelGuardedIntCounter<3>,
}

impl<W: LogWriter> LogWriter for MonitoredLogWriter<W> {
43 changes: 20 additions & 23 deletions src/connector/src/sink/mod.rs
@@ -303,12 +303,14 @@ pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
#[derive(Clone)]
pub struct SinkMetrics {
pub sink_commit_duration: LabelGuardedHistogramVec<4>,
pub connector_sink_rows_received: LabelGuardedIntCounterVec<3>,
pub connector_sink_rows_received: LabelGuardedIntCounterVec<4>,

// Log store metrics
pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>,
pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>,
pub log_store_write_rows: LabelGuardedIntCounterVec<4>,
// Log store writer metrics
pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<3>,
pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<3>,
pub log_store_write_rows: LabelGuardedIntCounterVec<3>,

// Log store reader metrics
pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>,
pub log_store_read_rows: LabelGuardedIntCounterVec<4>,
pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>,
@@ -334,31 +336,31 @@ impl SinkMetrics {
let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
"connector_sink_rows_received",
"Number of rows received by sink",
&["connector_type", "sink_id", "sink_name"],
&["actor_id", "connector_type", "sink_id", "sink_name"],
registry
)
.unwrap();

let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
"log_store_first_write_epoch",
"The first write epoch of log store",
&["actor_id", "connector", "sink_id", "sink_name"],
&["actor_id", "sink_id", "sink_name"],
registry
)
.unwrap();

let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
"log_store_latest_write_epoch",
"The latest write epoch of log store",
&["actor_id", "connector", "sink_id", "sink_name"],
&["actor_id", "sink_id", "sink_name"],
registry
)
.unwrap();

let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
"log_store_write_rows",
"The write rate of rows",
&["actor_id", "connector", "sink_id", "sink_name"],
&["actor_id", "sink_id", "sink_name"],
registry
)
.unwrap();
@@ -467,28 +469,23 @@ pub struct SinkWriterParam {
#[derive(Clone)]
pub struct SinkWriterMetrics {
pub sink_commit_duration: LabelGuardedHistogram<4>,
pub connector_sink_rows_received: LabelGuardedIntCounter<3>,
pub connector_sink_rows_received: LabelGuardedIntCounter<4>,
}

impl SinkWriterMetrics {
pub fn new(writer_param: &SinkWriterParam) -> Self {
let labels = [
&writer_param.actor_id.to_string(),
writer_param.connector.as_str(),
&writer_param.sink_id.to_string(),
writer_param.sink_name.as_str(),
];
let sink_commit_duration = GLOBAL_SINK_METRICS
.sink_commit_duration
.with_guarded_label_values(&[
&writer_param.actor_id.to_string(),
writer_param.connector.as_str(),
&writer_param.sink_id.to_string(),
writer_param.sink_name.as_str(),
]);
.with_guarded_label_values(&labels);
let connector_sink_rows_received = GLOBAL_SINK_METRICS
.connector_sink_rows_received
.with_guarded_label_values(&[
// TODO: should have `actor_id` as label
// &writer_param.actor_id.to_string(),
writer_param.connector.as_str(),
&writer_param.sink_id.to_string(),
writer_param.sink_name.as_str(),
]);
.with_guarded_label_values(&labels);
Self {
sink_commit_duration,
connector_sink_rows_received,
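
To illustrate the new label scheme for `connector_sink_rows_received` (now keyed by actor as well as connector type, sink id, and sink name), here is a small, self-contained example. It uses the plain `prometheus` crate rather than RisingWave's label-guarded wrappers; the metric and label names come from `SinkMetrics::new` above, everything else is illustrative:

    use prometheus::{IntCounterVec, Opts, Registry};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let registry = Registry::new();

        // Same metric/label names as in SinkMetrics::new above; the guarded
        // wrapper types are replaced by the plain prometheus crate for brevity.
        let connector_sink_rows_received = IntCounterVec::new(
            Opts::new(
                "connector_sink_rows_received",
                "Number of rows received by sink",
            ),
            &["actor_id", "connector_type", "sink_id", "sink_name"],
        )?;
        registry.register(Box::new(connector_sink_rows_received.clone()))?;

        // One time series per (actor, connector, sink id, sink name) combination.
        connector_sink_rows_received
            .with_label_values(&["1", "kafka", "42", "my_sink"])
            .inc_by(128);

        Ok(())
    }
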
6 changes: 0 additions & 6 deletions src/connector/src/source/cdc/source/reader.rs
@@ -209,19 +209,13 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
impl<T: CdcSourceTypeTrait> CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = ConnectorError)]
async fn into_data_stream(self) {
let source_type = T::source_type();
let mut rx = self.rx;
let source_id = self.source_id.to_string();
let metrics = self.source_ctx.metrics.clone();
let connector_source_rows_received_metrics = metrics
.connector_source_rows_received
.with_guarded_label_values(&[source_type.as_str_name(), &source_id]);

while let Some(result) = rx.recv().await {
match result {
Ok(GetEventStreamResponse { events, .. }) => {
tracing::trace!("receive {} cdc events ", events.len());
connector_source_rows_received_metrics.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
}
11 changes: 0 additions & 11 deletions src/connector/src/source/monitor/metrics.rs
@@ -68,8 +68,6 @@ pub struct SourceMetrics {
pub latest_message_id: LabelGuardedIntGaugeVec<3>,
pub rdkafka_native_metric: Arc<RdKafkaStats>,

pub connector_source_rows_received: LabelGuardedIntCounterVec<2>,

pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec<1>,
}

@@ -112,14 +110,6 @@ impl SourceMetrics {
)
.unwrap();

let connector_source_rows_received = register_guarded_int_counter_vec_with_registry!(
"source_rows_received",
"Number of rows received by source",
&["source_type", "source_id"],
registry
)
.unwrap();

let opts = histogram_opts!(
"source_cdc_event_lag_duration_milliseconds",
"source_cdc_lag_latency",
@@ -134,7 +124,6 @@ impl SourceMetrics {
partition_input_bytes,
latest_message_id,
rdkafka_native_metric,
connector_source_rows_received,
direct_cdc_event_lag_latency,
}
}
1 change: 0 additions & 1 deletion src/stream/src/executor/sink.rs
@@ -221,7 +221,6 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
} else {
let labels = [
&actor_id.to_string(),
"NA", // TODO: remove the connector label for log writer metrics
&sink_id.to_string(),
self.sink_param.sink_name.as_str(),
];
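
The removed "NA" entry above was a placeholder connector label; after this change the executor presumably builds the log-writer labels from just the three remaining values documented in log_store.rs. A trivial, self-contained sketch of the resulting order, with stand-in values rather than the executor's actual parameters:

    fn main() {
        // Stand-in values; in the executor these come from actor/sink parameters.
        let actor_id: u32 = 1;
        let sink_id: u32 = 42;
        let sink_name = "my_sink";

        // Label order matches the `// Labels: [actor_id, sink_id, sink_name]`
        // comment added to LogWriterMetrics in log_store.rs.
        let labels = [&actor_id.to_string(), &sink_id.to_string(), sink_name];
        println!("log store writer labels: {labels:?}");
    }
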
