From fcfd1c7d0a47e08ad874b14bbfc7ec32a1efb24f Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 11 Dec 2024 13:23:19 +0100 Subject: [PATCH 1/3] pageserver: don't drop multixact slrus on non zero shards --- pageserver/src/walingest.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index b7712cfac71c..e5b23fed5155 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1049,16 +1049,18 @@ impl WalIngest { // Delete all the segments except the last one. The last segment can still // contain, possibly partially, valid data. - while segment != endsegment { - modification - .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx) - .await?; + if modification.tline.get_shard_identity().is_shard_zero() { + while segment != endsegment { + modification + .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx) + .await?; - /* move to next segment, handling wraparound correctly */ - if segment == maxsegment { - segment = 0; - } else { - segment += 1; + /* move to next segment, handling wraparound correctly */ + if segment == maxsegment { + segment = 0; + } else { + segment += 1; + } } } From fde1046278bbf432666e93c5d88d4366fb86ce9b Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 11 Dec 2024 12:35:02 +0000 Subject: [PATCH 2/3] wal_decoder: fix compact key protobuf encoding (#10074) ## Problem Protobuf doesn't support 128 bit integers, so we encode the keys as two 64 bit integers. The issue is that when we split the 128 bit compact key we use signed 64 bit integers to represent the two halves. This may result in a negative lower half when the relnode is `0x00800000` or larger. When we convert the lower half to an i128 we get a negative `CompactKey`. ## Summary of Changes Use unsigned integers when encoding into Protobuf. ## Deployment * Prod: We disabled the interpreted proto, so no compat concerns. * Staging: Disable the interpreted proto, do one release, and then release the fixed version. We do this because a negative int64 will convert to a large uint64 value and could give a key in the actual pageserver space. In production we would work around this by adding new fields to the proto and deprecating the old ones, but we can make our lives easy here. * Pre-prod: Same as staging --- libs/pageserver_api/src/key.rs | 2 +- libs/wal_decoder/proto/interpreted_wal.proto | 4 +- libs/wal_decoder/src/wire_format.rs | 65 +++++++++++++++++++- 3 files changed, 66 insertions(+), 5 deletions(-) diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 37dff6fe4647..373329c9b464 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -24,7 +24,7 @@ pub struct Key { /// When working with large numbers of Keys in-memory, it is more efficient to handle them as i128 than as /// a struct of fields. -#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] +#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize, Debug)] pub struct CompactKey(i128); /// The storage key size. 
diff --git a/libs/wal_decoder/proto/interpreted_wal.proto b/libs/wal_decoder/proto/interpreted_wal.proto index 0393392c1aa4..d68484d30f7b 100644 --- a/libs/wal_decoder/proto/interpreted_wal.proto +++ b/libs/wal_decoder/proto/interpreted_wal.proto @@ -37,7 +37,7 @@ message ValueMeta { } message CompactKey { - int64 high = 1; - int64 low = 2; + uint64 high = 1; + uint64 low = 2; } diff --git a/libs/wal_decoder/src/wire_format.rs b/libs/wal_decoder/src/wire_format.rs index 5a343054c3f1..944ee5c91974 100644 --- a/libs/wal_decoder/src/wire_format.rs +++ b/libs/wal_decoder/src/wire_format.rs @@ -236,8 +236,8 @@ impl From for proto::ValueMeta { impl From for proto::CompactKey { fn from(value: CompactKey) -> Self { proto::CompactKey { - high: (value.raw() >> 64) as i64, - low: value.raw() as i64, + high: (value.raw() >> 64) as u64, + low: value.raw() as u64, } } } @@ -354,3 +354,64 @@ impl From for CompactKey { (((value.high as i128) << 64) | (value.low as i128)).into() } } + +#[test] +fn test_compact_key_with_large_relnode() { + use pageserver_api::key::Key; + + let inputs = vec![ + Key { + field1: 0, + field2: 0x100, + field3: 0x200, + field4: 0, + field5: 0x10, + field6: 0x5, + }, + Key { + field1: 0, + field2: 0x100, + field3: 0x200, + field4: 0x007FFFFF, + field5: 0x10, + field6: 0x5, + }, + Key { + field1: 0, + field2: 0x100, + field3: 0x200, + field4: 0x00800000, + field5: 0x10, + field6: 0x5, + }, + Key { + field1: 0, + field2: 0x100, + field3: 0x200, + field4: 0x00800001, + field5: 0x10, + field6: 0x5, + }, + Key { + field1: 0, + field2: 0xFFFFFFFF, + field3: 0xFFFFFFFF, + field4: 0xFFFFFFFF, + field5: 0x0, + field6: 0x0, + }, + ]; + + for input in inputs { + assert!(input.is_valid_key_on_write_path()); + let compact = input.to_compact(); + let proto: proto::CompactKey = compact.into(); + let from_proto: CompactKey = proto.into(); + + assert_eq!( + compact, from_proto, + "Round trip failed for key with relnode={:#x}", + input.field4 + ); + } +} From c4ce4ac25ad95c28e7de8b4d2d0bc244caeb52d6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 11 Dec 2024 14:37:08 +0100 Subject: [PATCH 3/3] page_service: don't count time spent in Batcher towards smgr latency metrics (#10075) ## Problem With pipelining enabled, the time a request spends in the batcher stage counts towards the smgr op latency. If pipelining is disabled, that time is not accounted for. In practice, this results in a jump in smgr getpage latencies in various dashboards and degrades the internal SLO. ## Solution In a similar vein to #10042 and with a similar rationale, this PR stops counting the time spent in batcher stage towards smgr op latency. The smgr op latency metric is reduced to the actual execution time. Time spent in batcher stage is tracked in a separate histogram. I expect to remove that histogram after batching rollout is complete, but it will be helpful in the meantime to reason about the rollout. 
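For reference, a minimal sketch of how the timer now attributes wall-clock time. This is a simplification rather than the actual implementation: the timestamp names mirror `SmgrOpTimerState` in this patch and the metric mapping follows the histograms touched below, but the struct and function names here are illustrative only.

```rust
use std::time::{Duration, Instant};

// Simplified view of the three timestamps recorded per request.
struct OpTimings {
    received_at: Instant,          // request entered the batcher
    throttle_started_at: Instant,  // batch was handed to the throttle
    started_execution_at: Instant, // throttle released the request for execution
}

// How the durations are attributed once the op completes and flushing starts.
fn attribute(t: &OpTimings, now: Instant) -> (Duration, Duration, Duration) {
    // -> pageserver_page_service_pagestream_batch_wait_time_seconds
    let batch_wait = t.throttle_started_at - t.received_at;
    // tracked by the throttle's own metric, no longer deducted from smgr latency
    let throttled = t.started_execution_at - t.throttle_started_at;
    // -> pageserver_smgr_query_seconds (execution time only, after this patch)
    let execution = now - t.started_execution_at;
    (batch_wait, throttled, execution)
}

fn main() {
    let start = Instant::now();
    let t = OpTimings {
        received_at: start,
        throttle_started_at: start,
        started_execution_at: start,
    };
    let (batch, throttle, exec) = attribute(&t, Instant::now());
    println!("batch={batch:?} throttle={throttle:?} exec={exec:?}");
}
```

With this split, the execution histogram measures the same thing whether pipelining is enabled or not.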
--- pageserver/src/metrics.rs | 170 +++++++++++++++++++++--------- pageserver/src/page_service.rs | 13 ++- pageserver/src/tenant/throttle.rs | 17 +-- test_runner/fixtures/metrics.py | 1 + 4 files changed, 143 insertions(+), 58 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 96ee1578563b..b4e20cb8b90e 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -16,7 +16,6 @@ use postgres_backend::{is_expected_io_error, QueryError}; use pq_proto::framed::ConnectionError; use strum::{EnumCount, VariantNames}; use strum_macros::{IntoStaticStr, VariantNames}; -use tracing::warn; use utils::id::TimelineId; /// Prometheus histogram buckets (in seconds) for operations in the critical @@ -1225,32 +1224,58 @@ pub(crate) mod virtual_file_io_engine { pub(crate) struct SmgrOpTimer(Option); pub(crate) struct SmgrOpTimerInner { - global_latency_histo: Histogram, + global_execution_latency_histo: Histogram, + per_timeline_execution_latency_histo: Option, - // Optional because not all op types are tracked per-timeline - per_timeline_latency_histo: Option, + global_batch_wait_time: Histogram, + per_timeline_batch_wait_time: Histogram, global_flush_in_progress_micros: IntCounter, per_timeline_flush_in_progress_micros: IntCounter, - start: Instant, - throttled: Duration, - op: SmgrQueryType, + timings: SmgrOpTimerState, +} + +#[derive(Debug)] +enum SmgrOpTimerState { + Received { + received_at: Instant, + }, + ThrottleDoneExecutionStarting { + received_at: Instant, + throttle_started_at: Instant, + started_execution_at: Instant, + }, } pub(crate) struct SmgrOpFlushInProgress { - base: Instant, + flush_started_at: Instant, global_micros: IntCounter, per_timeline_micros: IntCounter, } impl SmgrOpTimer { - pub(crate) fn deduct_throttle(&mut self, throttle: &Option) { - let Some(throttle) = throttle else { - return; - }; + pub(crate) fn observe_throttle_done_execution_starting(&mut self, throttle: &ThrottleResult) { let inner = self.0.as_mut().expect("other public methods consume self"); - inner.throttled += *throttle; + match (&mut inner.timings, throttle) { + (SmgrOpTimerState::Received { received_at }, throttle) => match throttle { + ThrottleResult::NotThrottled { start } => { + inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting { + received_at: *received_at, + throttle_started_at: *start, + started_execution_at: *start, + }; + } + ThrottleResult::Throttled { start, end } => { + inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting { + received_at: *start, + throttle_started_at: *start, + started_execution_at: *end, + }; + } + }, + (x, _) => panic!("called in unexpected state: {x:?}"), + } } pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress { @@ -1263,7 +1288,7 @@ impl SmgrOpTimer { .. 
} = inner; SmgrOpFlushInProgress { - base: flush_start, + flush_started_at: flush_start, global_micros: global_flush_in_progress_micros, per_timeline_micros: per_timeline_flush_in_progress_micros, } @@ -1274,32 +1299,42 @@ impl SmgrOpTimer { let inner = self.0.take()?; let now = Instant::now(); - let elapsed = now - inner.start; - - let elapsed = match elapsed.checked_sub(inner.throttled) { - Some(elapsed) => elapsed, - None => { - use utils::rate_limit::RateLimit; - static LOGGED: Lazy>> = - Lazy::new(|| { - Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| { - RateLimit::new(Duration::from_secs(10)) - }))) - }); - let mut guard = LOGGED.lock().unwrap(); - let rate_limit = &mut guard[inner.op]; - rate_limit.call(|| { - warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time"); - }); - elapsed // un-throttled time, more info than just saturating to 0 + + let batch; + let execution; + let throttle; + match inner.timings { + SmgrOpTimerState::Received { received_at } => { + batch = (now - received_at).as_secs_f64(); + // TODO: use label for dropped requests. + // This is quite rare in practice, only during tenant/pageservers shutdown. + throttle = Duration::ZERO; + execution = Duration::ZERO.as_secs_f64(); } - }; + SmgrOpTimerState::ThrottleDoneExecutionStarting { + received_at, + throttle_started_at, + started_execution_at, + } => { + batch = (throttle_started_at - received_at).as_secs_f64(); + throttle = started_execution_at - throttle_started_at; + execution = (now - started_execution_at).as_secs_f64(); + } + } + + // update time spent in batching + inner.global_batch_wait_time.observe(batch); + inner.per_timeline_batch_wait_time.observe(batch); - let elapsed = elapsed.as_secs_f64(); + // time spent in throttle metric is updated by throttle impl + let _ = throttle; - inner.global_latency_histo.observe(elapsed); - if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo { - per_timeline_getpage_histo.observe(elapsed); + // update metrics for execution latency + inner.global_execution_latency_histo.observe(execution); + if let Some(per_timeline_execution_latency_histo) = + &inner.per_timeline_execution_latency_histo + { + per_timeline_execution_latency_histo.observe(execution); } Some((now, inner)) @@ -1325,12 +1360,12 @@ impl SmgrOpFlushInProgress { // Last call is tracked in `now`. let mut observe_guard = scopeguard::guard( || { - let elapsed = now - self.base; + let elapsed = now - self.flush_started_at; self.global_micros .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); self.per_timeline_micros .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); - self.base = now; + self.flush_started_at = now; }, |mut observe| { observe(); @@ -1377,6 +1412,8 @@ pub(crate) struct SmgrQueryTimePerTimeline { per_timeline_batch_size: Histogram, global_flush_in_progress_micros: IntCounter, per_timeline_flush_in_progress_micros: IntCounter, + global_batch_wait_time: Histogram, + per_timeline_batch_wait_time: Histogram, } static SMGR_QUERY_STARTED_GLOBAL: Lazy = Lazy::new(|| { @@ -1399,12 +1436,15 @@ static SMGR_QUERY_STARTED_PER_TENANT_TIMELINE: Lazy = Lazy::new(| .expect("failed to define a metric") }); +// Alias so all histograms recording per-timeline smgr timings use the same buckets. 
+static SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS: &[f64] = CRITICAL_OP_BUCKETS; + static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy = Lazy::new(|| { register_histogram_vec!( "pageserver_smgr_query_seconds", - "Time spent on smgr query handling, aggegated by query type and tenant/timeline.", + "Time spent _executing_ smgr query handling, excluding batch and throttle delays.", &["smgr_query_type", "tenant_id", "shard_id", "timeline_id"], - CRITICAL_OP_BUCKETS.into(), + SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS.into(), ) .expect("failed to define a metric") }); @@ -1462,7 +1502,7 @@ static SMGR_QUERY_TIME_GLOBAL_BUCKETS: Lazy> = Lazy::new(|| { static SMGR_QUERY_TIME_GLOBAL: Lazy = Lazy::new(|| { register_histogram_vec!( "pageserver_smgr_query_seconds_global", - "Time spent on smgr query handling, aggregated by query type.", + "Like pageserver_smgr_query_seconds, but aggregated to instance level.", &["smgr_query_type"], SMGR_QUERY_TIME_GLOBAL_BUCKETS.clone(), ) @@ -1559,6 +1599,25 @@ static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy = Lazy .expect("failed to define a metric") }); +static PAGE_SERVICE_SMGR_BATCH_WAIT_TIME: Lazy = Lazy::new(|| { + register_histogram_vec!( + "pageserver_page_service_pagestream_batch_wait_time_seconds", + "Time a request spent waiting in its batch until the batch moved to throttle&execution.", + &["tenant_id", "shard_id", "timeline_id"], + SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS.into(), + ) + .expect("failed to define a metric") +}); + +static PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL: Lazy = Lazy::new(|| { + register_histogram!( + "pageserver_page_service_pagestream_batch_wait_time_seconds_global", + "Like pageserver_page_service_pagestream_batch_wait_time_seconds, but aggregated to instance level.", + SMGR_QUERY_TIME_GLOBAL_BUCKETS.to_vec(), + ) + .expect("failed to define a metric") +}); + impl SmgrQueryTimePerTimeline { pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self { let tenant_id = tenant_shard_id.tenant_id.to_string(); @@ -1599,6 +1658,11 @@ impl SmgrQueryTimePerTimeline { .get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id]) .unwrap(); + let global_batch_wait_time = PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL.clone(); + let per_timeline_batch_wait_time = PAGE_SERVICE_SMGR_BATCH_WAIT_TIME + .get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id]) + .unwrap(); + let global_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone(); let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS @@ -1614,9 +1678,11 @@ impl SmgrQueryTimePerTimeline { per_timeline_batch_size, global_flush_in_progress_micros, per_timeline_flush_in_progress_micros, + global_batch_wait_time, + per_timeline_batch_wait_time, } } - pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer { + pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, received_at: Instant) -> SmgrOpTimer { self.global_started[op as usize].inc(); let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) { @@ -1627,15 +1693,15 @@ impl SmgrQueryTimePerTimeline { }; SmgrOpTimer(Some(SmgrOpTimerInner { - global_latency_histo: self.global_latency[op as usize].clone(), - per_timeline_latency_histo, - start: started_at, - op, - throttled: Duration::ZERO, + global_execution_latency_histo: self.global_latency[op as usize].clone(), + per_timeline_execution_latency_histo: per_timeline_latency_histo, + timings: 
SmgrOpTimerState::Received { received_at }, global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(), per_timeline_flush_in_progress_micros: self .per_timeline_flush_in_progress_micros .clone(), + global_batch_wait_time: self.global_batch_wait_time.clone(), + per_timeline_batch_wait_time: self.per_timeline_batch_wait_time.clone(), })) } @@ -2889,6 +2955,11 @@ impl TimelineMetrics { shard_id, timeline_id, ]); + let _ = PAGE_SERVICE_SMGR_BATCH_WAIT_TIME.remove_label_values(&[ + tenant_id, + shard_id, + timeline_id, + ]); } } @@ -2919,6 +2990,7 @@ use crate::context::{PageContentKind, RequestContext}; use crate::task_mgr::TaskKind; use crate::tenant::mgr::TenantSlot; use crate::tenant::tasks::BackgroundLoopKind; +use crate::tenant::throttle::ThrottleResult; use crate::tenant::Timeline; /// Maintain a per timeline gauge in addition to the global gauge. @@ -3773,6 +3845,7 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) { &REMOTE_ONDEMAND_DOWNLOADED_BYTES, &CIRCUIT_BREAKERS_BROKEN, &CIRCUIT_BREAKERS_UNBROKEN, + &PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL, ] .into_iter() .for_each(|c| { @@ -3820,6 +3893,7 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) { &WAL_REDO_BYTES_HISTOGRAM, &WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, &PAGE_SERVICE_BATCH_SIZE_GLOBAL, + &PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL, ] .into_iter() .for_each(|h| { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 97d94bbe7f33..d00ec11a7611 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -575,7 +575,10 @@ enum BatchedFeMessage { } impl BatchedFeMessage { - async fn throttle(&mut self, cancel: &CancellationToken) -> Result<(), QueryError> { + async fn throttle_and_record_start_processing( + &mut self, + cancel: &CancellationToken, + ) -> Result<(), QueryError> { let (shard, tokens, timers) = match self { BatchedFeMessage::Exists { shard, timer, .. } | BatchedFeMessage::Nblocks { shard, timer, .. 
} @@ -603,7 +606,7 @@ impl BatchedFeMessage { } }; for timer in timers { - timer.deduct_throttle(&throttled); + timer.observe_throttle_done_execution_starting(&throttled); } Ok(()) } @@ -1230,7 +1233,7 @@ impl PageServerHandler { } }; - if let Err(cancelled) = msg.throttle(&self.cancel).await { + if let Err(cancelled) = msg.throttle_and_record_start_processing(&self.cancel).await { break cancelled; } @@ -1397,7 +1400,9 @@ impl PageServerHandler { return Err(e); } }; - batch.throttle(&self.cancel).await?; + batch + .throttle_and_record_start_processing(&self.cancel) + .await?; self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) .await?; } diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 54c0e59daaf6..8ab6a0e0600d 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -58,6 +58,11 @@ pub struct Stats { pub sum_throttled_usecs: u64, } +pub enum ThrottleResult { + NotThrottled { start: Instant }, + Throttled { start: Instant, end: Instant }, +} + impl Throttle where M: Metric, @@ -122,15 +127,15 @@ where self.inner.load().rate_limiter.steady_rps() } - pub async fn throttle(&self, key_count: usize) -> Option { + pub async fn throttle(&self, key_count: usize) -> ThrottleResult { let inner = self.inner.load_full(); // clones the `Inner` Arc + let start = std::time::Instant::now(); + if !inner.enabled { - return None; + return ThrottleResult::NotThrottled { start }; } - let start = std::time::Instant::now(); - self.metric.accounting_start(); self.count_accounted_start.fetch_add(1, Ordering::Relaxed); let did_throttle = inner.rate_limiter.acquire(key_count).await; @@ -145,9 +150,9 @@ where .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed); let observation = Observation { wait_time }; self.metric.observe_throttling(&observation); - Some(wait_time) + ThrottleResult::Throttled { start, end: now } } else { - None + ThrottleResult::NotThrottled { start } } } } diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index a591e088eff7..c5295360c339 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -178,6 +178,7 @@ def counter(name: str) -> str: counter("pageserver_timeline_wal_records_received"), counter("pageserver_page_service_pagestream_flush_in_progress_micros"), *histogram("pageserver_page_service_batch_size"), + *histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"), *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS, # "pageserver_directory_entries_count", -- only used if above a certain threshold # "pageserver_broken_tenants_count" -- used only for broken