page_service: don't count time spent flushing towards smgr latency metrics #10042

Merged · 10 commits · Dec 7, 2024
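In short: the smgr latency histograms used to be observed in the timer's drop handler, which ran only after the response had been flushed into the TCP connection. This PR ends the latency observation when the response is ready and accounts flush time separately, in dedicated in-progress microsecond counters. A minimal sketch of the resulting two-phase pattern, with made-up names (`OpTimer`/`FlushTimer`; the PR's real types are `SmgrOpTimer`/`SmgrOpFlushInProgress`):

```rust
use std::time::{Duration, Instant};

// Illustrative sketch only, not the PR's actual implementation.
struct OpTimer {
    start: Instant,
}

struct FlushTimer {
    base: Instant,
}

impl OpTimer {
    // Phase 1: observe op latency now, before flushing begins,
    // and hand off to a separate timer for the flush phase.
    fn finish_op_start_flush(self) -> FlushTimer {
        let now = Instant::now();
        observe_latency(now - self.start);
        FlushTimer { base: now }
    }
}

impl FlushTimer {
    // Phase 2: flush time feeds a counter, never the latency histogram.
    fn finish_flush(self) {
        add_flush_micros((Instant::now() - self.base).as_micros() as u64);
    }
}

// Stand-ins for the real histogram/counter observations.
fn observe_latency(_d: Duration) { /* latency_histogram.observe(...) */ }
fn add_flush_micros(_us: u64) { /* flush_micros_counter.inc_by(...) */ }
```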
138 changes: 125 additions & 13 deletions pageserver/src/metrics.rs
@@ -1223,31 +1223,60 @@ pub(crate) mod virtual_file_io_engine {
});
}

pub(crate) struct SmgrOpTimer {
pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
pub(crate) struct SmgrOpTimerInner {
global_latency_histo: Histogram,

// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<Histogram>,

global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,

start: Instant,
throttled: Duration,
op: SmgrQueryType,
}

pub(crate) struct SmgrOpFlushInProgress {
base: Instant,
global_micros: IntCounter,
per_timeline_micros: IntCounter,
}

impl SmgrOpTimer {
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
let Some(throttle) = throttle else {
return;
};
self.throttled += *throttle;
let inner = self.0.as_mut().expect("other public methods consume self");
inner.throttled += *throttle;
}
}

impl Drop for SmgrOpTimer {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
let (flush_start, inner) = self
.smgr_op_end()
.expect("this method consume self, and the only other caller is drop handler");
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
SmgrOpFlushInProgress {
base: flush_start,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
}
}

let elapsed = match elapsed.checked_sub(self.throttled) {
/// Returns `None` if this method has already been called, `Some` otherwise.
fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
let inner = self.0.take()?;

let now = Instant::now();
let elapsed = now - inner.start;

let elapsed = match elapsed.checked_sub(inner.throttled) {
Some(elapsed) => elapsed,
None => {
use utils::rate_limit::RateLimit;
@@ -1258,20 +1287,64 @@ impl Drop for SmgrOpTimer {
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
let rate_limit = &mut guard[inner.op];
rate_limit.call(|| {
warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
});
elapsed // un-throttled time, more info than just saturating to 0
}
};

let elapsed = elapsed.as_secs_f64();

self.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
inner.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(elapsed);
}

Some((now, inner))
}
}

impl Drop for SmgrOpTimer {
fn drop(&mut self) {
self.smgr_op_end();
}
}
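A side note on the new `SmgrOpTimer(Option<SmgrOpTimerInner>)` shape: wrapping the inner state in an `Option` is the usual Rust idiom for a guard that can be consumed explicitly but must still fire exactly once if it is merely dropped. A reduced sketch of the idiom (names are illustrative):

```rust
struct Guard(Option<Inner>);
struct Inner;

impl Inner {
    fn observe(self) { /* record metrics exactly once */ }
}

impl Guard {
    // Explicit path: take() leaves None behind, so Drop becomes a no-op.
    fn finish(mut self) {
        if let Some(inner) = self.0.take() {
            inner.observe();
        }
    }
}

impl Drop for Guard {
    // Fallback path: runs only if finish() was never called.
    fn drop(&mut self) {
        if let Some(inner) = self.0.take() {
            inner.observe();
        }
    }
}
```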

impl SmgrOpFlushInProgress {
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);

// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to the metrics.
// The time of the last call is tracked in `self.base`.
let mut observe_guard = scopeguard::guard(
|| {
let now = Instant::now();
let elapsed = now - self.base;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.base = now;
},
|mut observe| {
observe();
},
);

loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
}
}
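For illustration, a caller would use `measure` roughly like this (`flush_timer` and `slow_write` are hypothetical names; the real call site is in `page_service.rs` below). If the write stalls for 35 seconds, the counters are bumped at roughly 10s, 20s, 30s, and once more when the future completes and the guard drops:

```rust
// Hypothetical call site; `flush_timer` is an SmgrOpFlushInProgress.
let result = flush_timer.measure(slow_write()).await;
```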

@@ -1302,6 +1375,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_getpage_latency: Histogram,
global_batch_size: Histogram,
per_timeline_batch_size: Histogram,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
}

static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
@@ -1464,6 +1539,26 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
.set(value.try_into().unwrap());
}

static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_service_pagestream_flush_in_progress_micros",
"Counter that sums up the microseconds that a pagestream response was being flushed into the TCP connection. \
If the flush is particularly slow, this counter will be updated periodically to make slow flushes \
easily discoverable in monitoring. \
Hence, this is NOT a completion latency histogram.",
&["tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});

static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_page_service_pagestream_flush_in_progress_micros_global",
"Like pageserver_page_service_pagestream_flush_in_progress_seconds, but instance-wide.",
)
.expect("failed to define a metric")
});
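Because these are monotonic in-progress counters in microseconds, a dashboard query along the lines of `rate(pageserver_page_service_pagestream_flush_in_progress_micros[1m]) / 1e6` (a hypothetical PromQL example, not part of this PR) yields the fraction of wall-clock time spent flushing per timeline, and it keeps moving even while a single slow flush is still in progress, which is exactly what the periodic updates in `measure` are for.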

impl SmgrQueryTimePerTimeline {
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
@@ -1504,13 +1599,21 @@ impl SmgrQueryTimePerTimeline {
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();

let global_flush_in_progress_micros =
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();

Self {
global_started,
global_latency,
per_timeline_getpage_latency,
per_timeline_getpage_started,
global_batch_size,
per_timeline_batch_size,
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
}
}
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
@@ -1523,13 +1626,17 @@ impl SmgrQueryTimePerTimeline {
None
};

SmgrOpTimer {
SmgrOpTimer(Some(SmgrOpTimerInner {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
start: started_at,
op,
throttled: Duration::ZERO,
}
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
}))
}

pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
@@ -2777,6 +2884,11 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
let _ = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
}
}

76 changes: 53 additions & 23 deletions pageserver/src/page_service.rs
@@ -1017,10 +1017,8 @@ impl PageServerHandler {
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let mut timers: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(handler_results.len());
for handler_result in handler_results {
let response_msg = match handler_result {
let (response_msg, timer) = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1044,34 +1042,66 @@
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
(
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
}),
None, // TODO: measure errors
)
}
},
Ok((response_msg, timer)) => {
// Extending the lifetime of the timers so observations on drop
// include the flush time.
timers.push(timer);
response_msg
}
Ok((response_msg, timer)) => (response_msg, Some(timer)),
};

//
// marshal & transmit response message
//

pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
}
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = pgb_writer.flush() => {
res?;

// We purposefully don't count flush time into the timer.
//
// The reason is that the current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want it to include latency that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer =
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());

// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
};
// do it while respecting cancellation
let _: () = async move {
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = flush_fut => {
res?;
}
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
drop(timers);
Ok(())
}
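A note on the `futures::future::Either` construction above: it lets the handler instrument the flush only when a timer is present while keeping a single concrete future type, avoiding a `Box<dyn Future>` allocation on this hot path. A self-contained sketch of the same pattern (the helper name `maybe_timed` is made up):

```rust
use std::future::Future;
use std::time::Instant;

use futures::future::Either;

// Made-up helper demonstrating the pattern: both branches unify into
// one concrete type, Either<_, _>, so no boxing is required.
async fn maybe_timed<F, T>(start: Option<Instant>, fut: F) -> T
where
    F: Future<Output = T>,
{
    match start {
        Some(start) => Either::Left(async move {
            let out = fut.await;
            println!("flush took {:?}", start.elapsed());
            out
        }),
        None => Either::Right(fut),
    }
    .await
}
```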

1 change: 1 addition & 0 deletions test_runner/fixtures/metrics.py
@@ -176,6 +176,7 @@ def counter(name: str) -> str:
counter("pageserver_tenant_throttling_wait_usecs_sum"),
counter("pageserver_tenant_throttling_count"),
counter("pageserver_timeline_wal_records_received"),
counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
*histogram("pageserver_page_service_batch_size"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
# "pageserver_directory_entries_count", -- only used if above a certain threshold