From 593e35027a088cadeb74b8c6e6f08877495986b3 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 20 Nov 2024 14:57:59 +0000 Subject: [PATCH 01/12] tests: use fewer pageservers in test_sharding_split_smoke (#9804) ## Problem This test uses a gratuitous number of pageservers (16). This works fine when there are plenty of system resources, but causes issues on test runners that have limited resources and run many tests concurrently. Related: https://github.com/neondatabase/neon/issues/9802 ## Summary of changes - Split from 2 shards to 4, instead of 4 to 8 - Don't give every shard a separate pageserver, let two locations share each pageserver. Net result is 4 pageservers instead of 16 --- test_runner/regress/test_sharding.py | 39 ++++++++++------------------ 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 0a4a53356d94..84737fc81e2b 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -515,11 +515,12 @@ def test_sharding_split_smoke( """ - # We will start with 4 shards and split into 8, then migrate all those - # 8 shards onto separate pageservers - shard_count = 4 - split_shard_count = 8 - neon_env_builder.num_pageservers = split_shard_count * 2 + # Shard count we start with + shard_count = 2 + # Shard count we split into + split_shard_count = 4 + # We will have 2 shards per pageserver once done (including secondaries) + neon_env_builder.num_pageservers = split_shard_count # 1MiB stripes: enable getting some meaningful data distribution without # writing large quantities of data in this test. The stripe size is given @@ -591,7 +592,7 @@ def test_sharding_split_smoke( workload.validate() - assert len(pre_split_pageserver_ids) == 4 + assert len(pre_split_pageserver_ids) == shard_count def shards_on_disk(shard_ids): for pageserver in env.pageservers: @@ -654,9 +655,9 @@ def shards_on_disk(shard_ids): # - shard_count reconciles for the original setup of the tenant # - shard_count reconciles for detaching the original secondary locations during split # - split_shard_count reconciles during shard splitting, for setting up secondaries. - # - shard_count of the child shards will need to fail over to their secondaries - # - shard_count of the child shard secondary locations will get moved to emptier nodes - expect_reconciles = shard_count * 2 + split_shard_count + shard_count * 2 + # - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move) + expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2 + reconcile_ok = env.storage_controller.get_metric_value( "storage_controller_reconcile_complete_total", filter={"status": "ok"} ) @@ -720,22 +721,10 @@ def check_effective_tenant_config(): # dominated by shard count. log.info(f"total: {total}") assert total == { - 1: 1, - 2: 1, - 3: 1, - 4: 1, - 5: 1, - 6: 1, - 7: 1, - 8: 1, - 9: 1, - 10: 1, - 11: 1, - 12: 1, - 13: 1, - 14: 1, - 15: 1, - 16: 1, + 1: 2, + 2: 2, + 3: 2, + 4: 2, } # The controller is not required to lay out the attached locations in any particular way, but From 67f5f83edcf50e14fb269cd8919bbda3601fdaf0 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 20 Nov 2024 15:56:14 +0000 Subject: [PATCH 02/12] pageserver: avoid reading SLRU blocks for GC on shards >0 (#9423) ## Problem SLRU blocks, which can add up to several gigabytes, are currently ingested by all shards, multiplying their capacity cost by the shard count and slowing down ingest. We do this because all shards need the SLRU pages to do timestamp->LSN lookup for GC. Related: https://github.com/neondatabase/neon/issues/7512 ## Summary of changes - On non-zero shards, learn the GC offset from shard 0's index instead of calculating it. - Add a test `test_sharding_gc` that exercises this - Do GC in test_pg_regress as a general smoke test that GC functions run (e.g. this would fail if we were using SLRUs we didn't have) In this PR we are still ingesting SLRUs everywhere, but not using them any more. Part 2 PR (https://github.com/neondatabase/neon/pull/9786) makes the change to not store them at all. ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --- .../src/tenant/remote_timeline_client.rs | 23 ++++ pageserver/src/tenant/timeline.rs | 116 +++++++++++++----- test_runner/fixtures/remote_storage.py | 16 ++- test_runner/regress/test_pg_regress.py | 4 +- test_runner/regress/test_sharding.py | 110 ++++++++++++++++- 5 files changed, 228 insertions(+), 41 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 94f42c782782..b910a405471b 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -197,6 +197,7 @@ use utils::backoff::{ self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; use utils::pausable_failpoint; +use utils::shard::ShardNumber; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU32, Ordering}; @@ -2231,6 +2232,28 @@ impl RemoteTimelineClient { UploadQueue::Initialized(x) => x.no_pending_work(), } } + + /// 'foreign' in the sense that it does not belong to this tenant shard. This method + /// is used during GC for other shards to get the index of shard zero. + pub(crate) async fn download_foreign_index( + &self, + shard_number: ShardNumber, + cancel: &CancellationToken, + ) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> { + let foreign_shard_id = TenantShardId { + shard_number, + shard_count: self.tenant_shard_id.shard_count, + tenant_id: self.tenant_shard_id.tenant_id, + }; + download_index_part( + &self.storage_impl, + &foreign_shard_id, + &self.timeline_id, + Generation::MAX, + cancel, + ) + .await + } } pub(crate) struct UploadQueueAccessor<'a> { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0eb3de21e932..a4289a222f09 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -38,6 +38,7 @@ use pageserver_api::{ shard::{ShardIdentity, ShardNumber, TenantShardId}, }; use rand::Rng; +use remote_storage::DownloadError; use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::{ @@ -4821,6 +4822,86 @@ impl Timeline { Ok(()) } + async fn find_gc_time_cutoff( + &self, + pitr: Duration, + cancel: &CancellationToken, + ctx: &RequestContext, + ) -> Result, PageReconstructError> { + debug_assert_current_span_has_tenant_and_timeline_id(); + if self.shard_identity.is_shard_zero() { + // Shard Zero has SLRU data and can calculate the PITR time -> LSN mapping itself + let now = SystemTime::now(); + let time_range = if pitr == Duration::ZERO { + humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid") + } else { + pitr + }; + + // If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case) + let time_cutoff = now.checked_sub(time_range).unwrap_or(now); + let timestamp = to_pg_timestamp(time_cutoff); + + let time_cutoff = match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? { + LsnForTimestamp::Present(lsn) => Some(lsn), + LsnForTimestamp::Future(lsn) => { + // The timestamp is in the future. That sounds impossible, + // but what it really means is that there hasn't been + // any commits since the cutoff timestamp. + // + // In this case we should use the LSN of the most recent commit, + // which is implicitly the last LSN in the log. + debug!("future({})", lsn); + Some(self.get_last_record_lsn()) + } + LsnForTimestamp::Past(lsn) => { + debug!("past({})", lsn); + None + } + LsnForTimestamp::NoData(lsn) => { + debug!("nodata({})", lsn); + None + } + }; + Ok(time_cutoff) + } else { + // Shards other than shard zero cannot do timestamp->lsn lookups, and must instead learn their GC cutoff + // from shard zero's index. The index doesn't explicitly tell us the time cutoff, but we may assume that + // the point up to which shard zero's last_gc_cutoff has advanced will either be the time cutoff, or a + // space cutoff that we would also have respected ourselves. + match self + .remote_client + .download_foreign_index(ShardNumber(0), cancel) + .await + { + Ok((index_part, index_generation, _index_mtime)) => { + tracing::info!("GC loaded shard zero metadata (gen {index_generation:?}): latest_gc_cutoff_lsn: {}", + index_part.metadata.latest_gc_cutoff_lsn()); + Ok(Some(index_part.metadata.latest_gc_cutoff_lsn())) + } + Err(DownloadError::NotFound) => { + // This is unexpected, because during timeline creations shard zero persists to remote + // storage before other shards are called, and during timeline deletion non-zeroth shards are + // deleted before the zeroth one. However, it should be harmless: if we somehow end up in this + // state, then shard zero should _eventually_ write an index when it GCs. + tracing::warn!("GC couldn't find shard zero's index for timeline"); + Ok(None) + } + Err(e) => { + // TODO: this function should return a different error type than page reconstruct error + Err(PageReconstructError::Other(anyhow::anyhow!(e))) + } + } + + // TODO: after reading shard zero's GC cutoff, we should validate its generation with the storage + // controller. Otherwise, it is possible that we see the GC cutoff go backwards while shard zero + // is going through a migration if we read the old location's index and it has GC'd ahead of the + // new location. This is legal in principle, but problematic in practice because it might result + // in a timeline creation succeeding on shard zero ('s new location) but then failing on other shards + // because they have GC'd past the branch point. + } + } + /// Find the Lsns above which layer files need to be retained on /// garbage collection. /// @@ -4863,40 +4944,7 @@ impl Timeline { // - if PITR interval is set, then this is our cutoff. // - if PITR interval is not set, then we do a lookup // based on DEFAULT_PITR_INTERVAL, so that size-based retention does not result in keeping history around permanently on idle databases. - let time_cutoff = { - let now = SystemTime::now(); - let time_range = if pitr == Duration::ZERO { - humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid") - } else { - pitr - }; - - // If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case) - let time_cutoff = now.checked_sub(time_range).unwrap_or(now); - let timestamp = to_pg_timestamp(time_cutoff); - - match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? { - LsnForTimestamp::Present(lsn) => Some(lsn), - LsnForTimestamp::Future(lsn) => { - // The timestamp is in the future. That sounds impossible, - // but what it really means is that there hasn't been - // any commits since the cutoff timestamp. - // - // In this case we should use the LSN of the most recent commit, - // which is implicitly the last LSN in the log. - debug!("future({})", lsn); - Some(self.get_last_record_lsn()) - } - LsnForTimestamp::Past(lsn) => { - debug!("past({})", lsn); - None - } - LsnForTimestamp::NoData(lsn) => { - debug!("nodata({})", lsn); - None - } - } - }; + let time_cutoff = self.find_gc_time_cutoff(pitr, cancel, ctx).await?; Ok(match (pitr, time_cutoff) { (Duration::ZERO, Some(time_cutoff)) => { diff --git a/test_runner/fixtures/remote_storage.py b/test_runner/fixtures/remote_storage.py index 702495366169..c630ea98b447 100644 --- a/test_runner/fixtures/remote_storage.py +++ b/test_runner/fixtures/remote_storage.py @@ -77,14 +77,16 @@ def kill(self): class LocalFsStorage: root: Path - def tenant_path(self, tenant_id: TenantId) -> Path: + def tenant_path(self, tenant_id: Union[TenantId, TenantShardId]) -> Path: return self.root / "tenants" / str(tenant_id) - def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: + def timeline_path( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ) -> Path: return self.tenant_path(tenant_id) / "timelines" / str(timeline_id) def timeline_latest_generation( - self, tenant_id: TenantId, timeline_id: TimelineId + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId ) -> Optional[int]: timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id)) index_parts = [f for f in timeline_files if f.startswith("index_part")] @@ -102,7 +104,9 @@ def parse_gen(filename: str) -> Optional[int]: raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}") return generations[-1] - def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: + def index_path( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ) -> Path: latest_gen = self.timeline_latest_generation(tenant_id, timeline_id) if latest_gen is None: filename = TIMELINE_INDEX_PART_FILE_NAME @@ -126,7 +130,9 @@ def remote_layer_path( filename = f"{local_name}-{generation:08x}" return self.timeline_path(tenant_id, timeline_id) / filename - def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any: + def index_content( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ) -> Any: with self.index_path(tenant_id, timeline_id).open("r") as f: return json.load(f) diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py index f4698191eb9c..6a5e388c534b 100644 --- a/test_runner/regress/test_pg_regress.py +++ b/test_runner/regress/test_pg_regress.py @@ -110,13 +110,15 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files) - # Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create. + # Ensure that compaction/GC works, on a timeline containing all the diversity that postgres regression tests create. # There should have been compactions mid-test as well, this final check is in addition those. for shard, pageserver in tenant_get_shards(env, env.initial_tenant): pageserver.http_client().timeline_checkpoint( shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True ) + pageserver.http_client().timeline_gc(shard, env.initial_timeline, None) + # Run the main PostgreSQL regression tests, in src/test/regress. # diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 84737fc81e2b..3194fe6ec4c0 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -19,7 +19,7 @@ wait_for_last_flush_lsn, ) from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty -from fixtures.remote_storage import s3_storage +from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, s3_storage from fixtures.utils import skip_in_debug_build, wait_until from fixtures.workload import Workload from pytest_httpserver import HTTPServer @@ -1674,3 +1674,111 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder): ) assert len(top["shards"]) == n_tenants - 4 assert set(i["id"] for i in top["shards"]) == set(str(i[0]) for i in tenants[4:]) + + +def test_sharding_gc( + neon_env_builder: NeonEnvBuilder, +): + """ + Exercise GC in a sharded tenant: because only shard 0 holds SLRU content, it acts as + the "leader" for GC, and other shards read its index to learn what LSN they should + GC up to. + """ + + shard_count = 4 + neon_env_builder.num_pageservers = shard_count + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + + TENANT_CONF = { + # small checkpointing and compaction targets to ensure we generate many upload operations + "checkpoint_distance": 128 * 1024, + "compaction_threshold": 1, + "compaction_target_size": 128 * 1024, + # A short PITR horizon, so that we won't have to sleep too long in the test to wait for it to + # happen. + "pitr_interval": "1s", + # disable background compaction and GC. We invoke it manually when we want it to happen. + "gc_period": "0s", + "compaction_period": "0s", + # Disable automatic creation of image layers, as we will create them explicitly when we want them + "image_creation_threshold": 9999, + "image_layer_creation_check_threshold": 0, + "lsn_lease_length": "0s", + } + env = neon_env_builder.init_start( + initial_tenant_shard_count=shard_count, initial_tenant_conf=TENANT_CONF + ) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + # Create a branch and write some data + workload = Workload(env, tenant_id, timeline_id) + initial_lsn = Lsn(workload.endpoint().safe_psql("SELECT pg_current_wal_lsn()")[0][0]) + log.info(f"Started at LSN: {initial_lsn}") + + workload.init() + + # Write enough data to generate multiple layers + for _i in range(10): + last_lsn = workload.write_rows(32) + + assert last_lsn > initial_lsn + + log.info(f"Wrote up to last LSN: {last_lsn}") + + # Do full image layer generation. When we subsequently wait for PITR, all historic deltas + # should be GC-able + for shard_number in range(shard_count): + shard = TenantShardId(tenant_id, shard_number, shard_count) + env.get_tenant_pageserver(shard).http_client().timeline_compact( + shard, timeline_id, force_image_layer_creation=True + ) + + workload.churn_rows(32) + + time.sleep(5) + + # Invoke GC on a non-zero shard and verify its GC cutoff LSN does not advance + shard_one = TenantShardId(tenant_id, 1, shard_count) + env.get_tenant_pageserver(shard_one).http_client().timeline_gc( + shard_one, timeline_id, gc_horizon=None + ) + + # Check shard 1's index - GC cutoff LSN should not have advanced + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + shard_1_index = env.pageserver_remote_storage.index_content( + tenant_id=shard_one, timeline_id=timeline_id + ) + shard_1_gc_cutoff_lsn = Lsn(shard_1_index["metadata_bytes"]["latest_gc_cutoff_lsn"]) + log.info(f"Shard 1 cutoff LSN: {shard_1_gc_cutoff_lsn}") + assert shard_1_gc_cutoff_lsn <= last_lsn + + shard_zero = TenantShardId(tenant_id, 0, shard_count) + env.get_tenant_pageserver(shard_zero).http_client().timeline_gc( + shard_zero, timeline_id, gc_horizon=None + ) + + # TODO: observe that GC LSN of shard 0 has moved forward in remote storage + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + shard_0_index = env.pageserver_remote_storage.index_content( + tenant_id=shard_zero, timeline_id=timeline_id + ) + shard_0_gc_cutoff_lsn = Lsn(shard_0_index["metadata_bytes"]["latest_gc_cutoff_lsn"]) + log.info(f"Shard 0 cutoff LSN: {shard_0_gc_cutoff_lsn}") + assert shard_0_gc_cutoff_lsn >= last_lsn + + # Invoke GC on all other shards and verify their GC cutoff LSNs + for shard_number in range(1, shard_count): + shard = TenantShardId(tenant_id, shard_number, shard_count) + env.get_tenant_pageserver(shard).http_client().timeline_gc( + shard, timeline_id, gc_horizon=None + ) + + # Verify GC cutoff LSN advanced to match shard 0 + shard_index = env.pageserver_remote_storage.index_content( + tenant_id=shard, timeline_id=timeline_id + ) + shard_gc_cutoff_lsn = Lsn(shard_index["metadata_bytes"]["latest_gc_cutoff_lsn"]) + log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}") + assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn From 5ff2f1ee7d2b4b2b6dfb017dfecd5ae3f59cb404 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 20 Nov 2024 17:31:55 +0000 Subject: [PATCH 03/12] pageserver: enable compaction to proceed while live-migrating (#5397) ## Problem Long ago, in #5299 the tenant states for migration are added, but respected only in a coarse-grained way: when hinted not to do deletions, tenants will just avoid doing all GC or compaction. Skipping compaction is not necessary for AttachedMulti, as we will soon become the primary attached location, and it is not a waste of resources to proceed with compaction. Instead, per the RFC https://github.com/neondatabase/neon/pull/5029/files), deletions should be queued up in this state, and executed later when we switch to AttachedSingle. Avoiding compaction in AttachedMulti can have an operational impact if a tenant is under significant write load, as a long-running migration can result in a large accumulation of delta layers with commensurate impact on read latency. Closes: https://github.com/neondatabase/neon/issues/5396 ## Summary of changes - Add a 'config' part to RemoteTimelineClient so that it can be aware of the mode of the tenant it belongs to, and wire this through for construction + updates - Add a special buffer for delayed deletions, and when in AttachedMulti route deletions here instead of into the main remote client queue. This is drained when transitioning to AttachedSingle. If the tenant is detached or our process dies before then, then these objects are leaked. - As a quality of life improvement, also use the remote timeline client's knowledge of the tenant state to avoid submitting remote consistent LSN updates for validation when in AttachedStale (as we know these will fail) ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --- pageserver/src/tenant.rs | 25 +++- .../src/tenant/remote_timeline_client.rs | 122 ++++++++++++++++-- pageserver/src/tenant/timeline.rs | 9 +- pageserver/src/tenant/timeline/delete.rs | 2 +- pageserver/src/tenant/upload_queue.rs | 7 +- .../regress/test_pageserver_secondary.py | 24 ++++ 6 files changed, 167 insertions(+), 22 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 8e9e3890ba5c..2e5f69e3c942 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -189,6 +189,7 @@ pub struct TenantSharedResources { /// A [`Tenant`] is really an _attached_ tenant. The configuration /// for an attached tenant is a subset of the [`LocationConf`], represented /// in this struct. +#[derive(Clone)] pub(super) struct AttachedTenantConf { tenant_conf: TenantConfOpt, location: AttachedLocationConfig, @@ -1807,6 +1808,7 @@ impl Tenant { self.tenant_shard_id, timeline_id, self.generation, + &self.tenant_conf.load().location, ) } @@ -2527,6 +2529,10 @@ impl Tenant { { let conf = self.tenant_conf.load(); + // If we may not delete layers, then simply skip GC. Even though a tenant + // in AttachedMulti state could do GC and just enqueue the blocked deletions, + // the only advantage to doing it is to perhaps shrink the LayerMap metadata + // a bit sooner than we would achieve by waiting for AttachedSingle status. if !conf.location.may_delete_layers_hint() { info!("Skipping GC in location state {:?}", conf.location); return Ok(GcResult::default()); @@ -2568,7 +2574,14 @@ impl Tenant { { let conf = self.tenant_conf.load(); - if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() { + + // Note that compaction usually requires deletions, but we don't respect + // may_delete_layers_hint here: that is because tenants in AttachedMulti + // should proceed with compaction even if they can't do deletion, to avoid + // accumulating dangerously deep stacks of L0 layers. Deletions will be + // enqueued inside RemoteTimelineClient, and executed layer if/when we transition + // to AttachedSingle state. + if !conf.location.may_upload_layers_hint() { info!("Skipping compaction in location state {:?}", conf.location); return Ok(false); } @@ -3446,6 +3459,7 @@ impl Tenant { // this race is not possible if both request types come from the storage // controller (as they should!) because an exclusive op lock is required // on the storage controller side. + self.tenant_conf.rcu(|inner| { Arc::new(AttachedTenantConf { tenant_conf: new_tenant_conf.clone(), @@ -3455,20 +3469,22 @@ impl Tenant { }) }); + let updated = self.tenant_conf.load().clone(); + self.tenant_conf_updated(&new_tenant_conf); // Don't hold self.timelines.lock() during the notifies. // There's no risk of deadlock right now, but there could be if we consolidate // mutexes in struct Timeline in the future. let timelines = self.list_timelines(); for timeline in timelines { - timeline.tenant_conf_updated(&new_tenant_conf); + timeline.tenant_conf_updated(&updated); } } pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) { let new_tenant_conf = new_conf.tenant_conf.clone(); - self.tenant_conf.store(Arc::new(new_conf)); + self.tenant_conf.store(Arc::new(new_conf.clone())); self.tenant_conf_updated(&new_tenant_conf); // Don't hold self.timelines.lock() during the notifies. @@ -3476,7 +3492,7 @@ impl Tenant { // mutexes in struct Timeline in the future. let timelines = self.list_timelines(); for timeline in timelines { - timeline.tenant_conf_updated(&new_tenant_conf); + timeline.tenant_conf_updated(&new_conf); } } @@ -4544,6 +4560,7 @@ impl Tenant { self.tenant_shard_id, timeline_id, self.generation, + &self.tenant_conf.load().location, ) } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index b910a405471b..377bc23542b1 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -241,6 +241,7 @@ use utils::id::{TenantId, TimelineId}; use self::index::IndexPart; +use super::config::AttachedLocationConfig; use super::metadata::MetadataUpdate; use super::storage_layer::{Layer, LayerName, ResidentLayer}; use super::upload_queue::{NotInitialized, SetDeletedFlagProgress}; @@ -302,6 +303,36 @@ pub enum WaitCompletionError { #[derive(Debug, thiserror::Error)] #[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")] pub struct UploadQueueNotReadyError; +/// Behavioral modes that enable seamless live migration. +/// +/// See docs/rfcs/028-pageserver-migration.md to understand how these fit in. +struct RemoteTimelineClientConfig { + /// If this is false, then update to remote_consistent_lsn are dropped rather + /// than being submitted to DeletionQueue for validation. This behavior is + /// used when a tenant attachment is known to have a stale generation number, + /// such that validation attempts will always fail. This is not necessary + /// for correctness, but avoids spamming error statistics with failed validations + /// when doing migrations of tenants. + process_remote_consistent_lsn_updates: bool, + + /// If this is true, then object deletions are held in a buffer in RemoteTimelineClient + /// rather than being submitted to the DeletionQueue. This behavior is used when a tenant + /// is known to be multi-attached, in order to avoid disrupting other attached tenants + /// whose generations' metadata refers to the deleted objects. + block_deletions: bool, +} + +/// RemoteTimelineClientConfig's state is entirely driven by LocationConf, but we do +/// not carry the entire LocationConf structure: it's much more than we need. The From +/// impl extracts the subset of the LocationConf that is interesting to RemoteTimelineClient. +impl From<&AttachedLocationConfig> for RemoteTimelineClientConfig { + fn from(lc: &AttachedLocationConfig) -> Self { + Self { + block_deletions: !lc.may_delete_layers_hint(), + process_remote_consistent_lsn_updates: lc.may_upload_layers_hint(), + } + } +} /// A client for accessing a timeline's data in remote storage. /// @@ -322,7 +353,7 @@ pub struct UploadQueueNotReadyError; /// in the index part file, whenever timeline metadata is uploaded. /// /// Downloads are not queued, they are performed immediately. -pub struct RemoteTimelineClient { +pub(crate) struct RemoteTimelineClient { conf: &'static PageServerConf, runtime: tokio::runtime::Handle, @@ -339,6 +370,9 @@ pub struct RemoteTimelineClient { deletion_queue_client: DeletionQueueClient, + /// Subset of tenant configuration used to control upload behaviors during migrations + config: std::sync::RwLock, + cancel: CancellationToken, } @@ -349,13 +383,14 @@ impl RemoteTimelineClient { /// Note: the caller must initialize the upload queue before any uploads can be scheduled, /// by calling init_upload_queue. /// - pub fn new( + pub(crate) fn new( remote_storage: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, conf: &'static PageServerConf, tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: Generation, + location_conf: &AttachedLocationConfig, ) -> RemoteTimelineClient { RemoteTimelineClient { conf, @@ -375,6 +410,7 @@ impl RemoteTimelineClient { &tenant_shard_id, &timeline_id, )), + config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)), cancel: CancellationToken::new(), } } @@ -430,6 +466,43 @@ impl RemoteTimelineClient { Ok(()) } + /// Notify this client of a change to its parent tenant's config, as this may cause us to + /// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle) + pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) { + let new_conf = RemoteTimelineClientConfig::from(location_conf); + let unblocked = !new_conf.block_deletions; + + // Update config before draining deletions, so that we don't race with more being + // inserted. This can result in deletions happening our of order, but that does not + // violate any invariants: deletions only need to be ordered relative to upload of the index + // that dereferences the deleted objects, and we are not changing that order. + *self.config.write().unwrap() = new_conf; + + if unblocked { + // If we may now delete layers, drain any that were blocked in our old + // configuration state + let mut queue_locked = self.upload_queue.lock().unwrap(); + + if let Ok(queue) = queue_locked.initialized_mut() { + let blocked_deletions = std::mem::take(&mut queue.blocked_deletions); + for d in blocked_deletions { + if let Err(e) = self.deletion_queue_client.push_layers_sync( + self.tenant_shard_id, + self.timeline_id, + self.generation, + d.layers, + ) { + // This could happen if the pageserver is shut down while a tenant + // is transitioning from a deletion-blocked state: we will leak some + // S3 objects in this case. + warn!("Failed to drain blocked deletions: {}", e); + break; + } + } + } + } + } + /// Returns `None` if nothing is yet uplodaded, `Some(disk_consistent_lsn)` otherwise. pub fn remote_consistent_lsn_projected(&self) -> Option { match &mut *self.upload_queue.lock().unwrap() { @@ -1913,16 +1986,24 @@ impl RemoteTimelineClient { res } UploadOp::Delete(delete) => { - pausable_failpoint!("before-delete-layer-pausable"); - self.deletion_queue_client - .push_layers( - self.tenant_shard_id, - self.timeline_id, - self.generation, - delete.layers.clone(), - ) - .await - .map_err(|e| anyhow::anyhow!(e)) + if self.config.read().unwrap().block_deletions { + let mut queue_locked = self.upload_queue.lock().unwrap(); + if let Ok(queue) = queue_locked.initialized_mut() { + queue.blocked_deletions.push(delete.clone()); + } + Ok(()) + } else { + pausable_failpoint!("before-delete-layer-pausable"); + self.deletion_queue_client + .push_layers( + self.tenant_shard_id, + self.timeline_id, + self.generation, + delete.layers.clone(), + ) + .await + .map_err(|e| anyhow::anyhow!(e)) + } } unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => { // unreachable. Barrier operations are handled synchronously in @@ -2029,8 +2110,16 @@ impl RemoteTimelineClient { // Legacy mode: skip validating generation upload_queue.visible_remote_consistent_lsn.store(lsn); None - } else { + } else if self + .config + .read() + .unwrap() + .process_remote_consistent_lsn_updates + { Some((lsn, upload_queue.visible_remote_consistent_lsn.clone())) + } else { + // Our config disables remote_consistent_lsn updates: drop it. + None } } UploadOp::Delete(_) => { @@ -2167,6 +2256,7 @@ impl RemoteTimelineClient { queued_operations: VecDeque::default(), #[cfg(feature = "testing")] dangling_files: HashMap::default(), + blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), }; @@ -2402,6 +2492,7 @@ mod tests { use crate::{ context::RequestContext, tenant::{ + config::AttachmentMode, harness::{TenantHarness, TIMELINE_ID}, storage_layer::layer::local_layer_path, Tenant, Timeline, @@ -2487,6 +2578,10 @@ mod tests { /// Construct a RemoteTimelineClient in an arbitrary generation fn build_client(&self, generation: Generation) -> Arc { + let location_conf = AttachedLocationConfig { + generation, + attach_mode: AttachmentMode::Single, + }; Arc::new(RemoteTimelineClient { conf: self.harness.conf, runtime: tokio::runtime::Handle::current(), @@ -2500,6 +2595,7 @@ mod tests { &self.harness.tenant_shard_id, &TIMELINE_ID, )), + config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)), cancel: CancellationToken::new(), }) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a4289a222f09..95864af4d0af 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -273,7 +273,7 @@ pub struct Timeline { /// Remote storage client. /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details. - pub remote_client: Arc, + pub(crate) remote_client: Arc, // What page versions do we hold in the repository? If we get a // request > last_record_lsn, we need to wait until we receive all @@ -2172,14 +2172,14 @@ impl Timeline { ) } - pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) { + pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) { // NB: Most tenant conf options are read by background loops, so, // changes will automatically be picked up. // The threshold is embedded in the metric. So, we need to update it. { let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold( - new_conf, + &new_conf.tenant_conf, &self.conf.default_tenant_conf, ); @@ -2187,6 +2187,9 @@ impl Timeline { let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug()); let timeline_id_str = self.timeline_id.to_string(); + + self.remote_client.update_config(&new_conf.location); + self.metrics .evictions_with_low_residence_duration .write() diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 13a8dfa51a2e..67fc710c44ee 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -283,7 +283,7 @@ impl DeleteTimelineFlow { /// Shortcut to create Timeline in stopping state and spawn deletion task. #[instrument(skip_all, fields(%timeline_id))] - pub async fn resume_deletion( + pub(crate) async fn resume_deletion( tenant: Arc, timeline_id: TimelineId, local_metadata: &TimelineMetadata, diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 592f41cb2153..f14bf2f8c381 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -88,6 +88,9 @@ pub(crate) struct UploadQueueInitialized { #[cfg(feature = "testing")] pub(crate) dangling_files: HashMap, + /// Deletions that are blocked by the tenant configuration + pub(crate) blocked_deletions: Vec, + /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`. pub(crate) shutting_down: bool, @@ -180,6 +183,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), }; @@ -220,6 +224,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), }; @@ -270,7 +275,7 @@ pub(crate) struct UploadTask { /// A deletion of some layers within the lifetime of a timeline. This is not used /// for timeline deletion, which skips this queue and goes directly to DeletionQueue. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct Delete { pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>, } diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index d4aef967353e..12134048e667 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -365,6 +365,19 @@ def caught_up(): workload.validate(pageserver_a.id) workload.validate(pageserver_b.id) + # Force compaction on destination pageserver + pageserver_b.http_client().timeline_compact(tenant_id, timeline_id, force_l0_compaction=True) + + # Destination pageserver is in AttachedMulti, it should have generated deletions but + # not enqueued them yet. + # Check deletion metrics via prometheus - should be 0 since we're in AttachedMulti + assert ( + pageserver_b.http_client().get_metric_value( + "pageserver_deletion_queue_submitted_total", + ) + == 0 + ) + # Revert the origin to secondary log.info("Setting origin to Secondary") pageserver_a.tenant_location_configure( @@ -389,6 +402,17 @@ def caught_up(): }, ) + # Transition to AttachedSingle should have drained deletions generated by doing a compaction + # while in AttachedMulti. + def blocked_deletions_drained(): + submitted = pageserver_b.http_client().get_metric_value( + "pageserver_deletion_queue_submitted_total" + ) + assert submitted is not None + assert submitted > 0 + + wait_until(10, 0.1, blocked_deletions_drained) + workload.churn_rows(64, pageserver_b.id) workload.validate(pageserver_b.id) del workload From f36f0068b83bd536d33c49b238d964dcd96c9479 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Wed, 20 Nov 2024 17:50:39 +0000 Subject: [PATCH 04/12] chore(proxy): demote more logs during successful connection attempts (#9828) Follow up to #9803 See https://github.com/neondatabase/cloud/issues/14378 In collaboration with @cloneable and @awarus, we sifted through logs and simply demoted some logs to debug. This is not at all finished and there are more logs to review, but we ran out of time in the session we organised. In any slightly more nuanced cases, we didn't touch the log, instead leaving a TODO comment. I've also slightly refactored the sql-over-http body read/length reject code. I can split that into a separate PR. It just felt natural after I switched to `read_body_with_limit` as we discussed during the meet. --- libs/pq_proto/src/lib.rs | 1 + proxy/src/bin/local_proxy.rs | 2 +- proxy/src/bin/proxy.rs | 2 +- proxy/src/config.rs | 2 +- proxy/src/control_plane/client/neon.rs | 1 + proxy/src/http/mod.rs | 10 +-- proxy/src/proxy/connect_compute.rs | 15 +++-- proxy/src/proxy/copy_bidirectional.rs | 2 + proxy/src/proxy/handshake.rs | 10 ++- proxy/src/proxy/mod.rs | 2 +- proxy/src/proxy/passthrough.rs | 4 +- proxy/src/proxy/wake_compute.rs | 7 +- proxy/src/rate_limiter/limit_algorithm.rs | 6 +- .../src/rate_limiter/limit_algorithm/aimd.rs | 24 ++++--- proxy/src/redis/cancellation_publisher.rs | 3 +- proxy/src/serverless/backend.rs | 8 +-- proxy/src/serverless/conn_pool.rs | 2 +- proxy/src/serverless/conn_pool_lib.rs | 4 +- proxy/src/serverless/http_conn_pool.rs | 2 +- proxy/src/serverless/local_conn_pool.rs | 4 +- proxy/src/serverless/sql_over_http.rs | 66 +++++++++++-------- 21 files changed, 104 insertions(+), 73 deletions(-) diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 9ffaaba584c2..b9e5387d86dc 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -184,6 +184,7 @@ pub struct CancelKeyData { impl fmt::Display for CancelKeyData { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: this is producing strange results, with 0xffffffff........ always in the logs. let hi = (self.backend_pid as u64) << 32; let lo = self.cancel_key as u64; let id = hi | lo; diff --git a/proxy/src/bin/local_proxy.rs b/proxy/src/bin/local_proxy.rs index c4ec1300f27f..968682cf0f75 100644 --- a/proxy/src/bin/local_proxy.rs +++ b/proxy/src/bin/local_proxy.rs @@ -111,7 +111,7 @@ struct SqlOverHttpArgs { sql_over_http_cancel_set_shards: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB - sql_over_http_max_request_size_bytes: u64, + sql_over_http_max_request_size_bytes: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB sql_over_http_max_response_size_bytes: usize, diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 232721338d73..45fbe4a39827 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -276,7 +276,7 @@ struct SqlOverHttpArgs { sql_over_http_cancel_set_shards: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB - sql_over_http_max_request_size_bytes: u64, + sql_over_http_max_request_size_bytes: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB sql_over_http_max_response_size_bytes: usize, diff --git a/proxy/src/config.rs b/proxy/src/config.rs index b048c9d38979..8bc8e3f96f59 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -64,7 +64,7 @@ pub struct HttpConfig { pub pool_options: GlobalConnPoolOptions, pub cancel_set: CancelSet, pub client_conn_threshold: u64, - pub max_request_size_bytes: u64, + pub max_request_size_bytes: usize, pub max_response_size_bytes: usize, } diff --git a/proxy/src/control_plane/client/neon.rs b/proxy/src/control_plane/client/neon.rs index 53f92349267f..757ea6720a64 100644 --- a/proxy/src/control_plane/client/neon.rs +++ b/proxy/src/control_plane/client/neon.rs @@ -380,6 +380,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { // after getting back a permit - it's possible the cache was filled // double check if permit.should_check_cache() { + // TODO: if there is something in the cache, mark the permit as success. check_cache!(); } diff --git a/proxy/src/http/mod.rs b/proxy/src/http/mod.rs index b1642cedb301..ed88c7725687 100644 --- a/proxy/src/http/mod.rs +++ b/proxy/src/http/mod.rs @@ -122,18 +122,18 @@ impl Endpoint { } #[derive(Error, Debug)] -pub(crate) enum ReadBodyError { +pub(crate) enum ReadBodyError { #[error("Content length exceeds limit of {limit} bytes")] BodyTooLarge { limit: usize }, #[error(transparent)] - Read(#[from] reqwest::Error), + Read(#[from] E), } -pub(crate) async fn read_body_with_limit( - mut b: impl Body + Unpin, +pub(crate) async fn read_body_with_limit( + mut b: impl Body + Unpin, limit: usize, -) -> Result, ReadBodyError> { +) -> Result, ReadBodyError> { // We could use `b.limited().collect().await.to_bytes()` here // but this ends up being slightly more efficient as far as I can tell. diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index b30aec09c1ef..2e759b0894a2 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -117,7 +117,6 @@ where node_info.set_keys(user_info.get_keys()); node_info.allow_self_signed_compute = allow_self_signed_compute; mechanism.update_connect_config(&mut node_info.config); - let retry_type = RetryType::ConnectToCompute; // try once let err = match mechanism @@ -129,7 +128,7 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Success, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); @@ -147,7 +146,7 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Failed, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); @@ -156,8 +155,9 @@ where node_info } else { // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node - info!("compute node's state has likely changed; requesting a wake-up"); + debug!("compute node's state has likely changed; requesting a wake-up"); let old_node_info = invalidate_cache(node_info); + // TODO: increment num_retries? let mut node_info = wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?; node_info.reuse_settings(old_node_info); @@ -169,7 +169,7 @@ where // now that we have a new node, try connect to it repeatedly. // this can error for a few reasons, for instance: // * DNS connection settings haven't quite propagated yet - info!("wake_compute success. attempting to connect"); + debug!("wake_compute success. attempting to connect"); num_retries = 1; loop { match mechanism @@ -181,10 +181,11 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Success, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); + // TODO: is this necessary? We have a metric. info!(?num_retries, "connected to compute node after"); return Ok(res); } @@ -194,7 +195,7 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Failed, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); diff --git a/proxy/src/proxy/copy_bidirectional.rs b/proxy/src/proxy/copy_bidirectional.rs index 91a3ceff75a7..4e4af8863484 100644 --- a/proxy/src/proxy/copy_bidirectional.rs +++ b/proxy/src/proxy/copy_bidirectional.rs @@ -87,6 +87,8 @@ where transfer_one_direction(cx, &mut compute_to_client, compute, client) .map_err(ErrorSource::from_compute)?; + // TODO: 1 info log, with a enum label for close direction. + // Early termination checks from compute to client. if let TransferState::Done(_) = compute_to_client { if let TransferState::Running(buf) = &client_to_compute { diff --git a/proxy/src/proxy/handshake.rs b/proxy/src/proxy/handshake.rs index 3ada3a99952f..e27c211932ea 100644 --- a/proxy/src/proxy/handshake.rs +++ b/proxy/src/proxy/handshake.rs @@ -5,7 +5,7 @@ use pq_proto::{ }; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::auth::endpoint_sni; use crate::config::{TlsConfig, PG_ALPN_PROTOCOL}; @@ -199,6 +199,8 @@ pub(crate) async fn handshake( .await?; } + // This log highlights the start of the connection. + // This contains useful information for debugging, not logged elsewhere, like role name and endpoint id. info!( ?version, ?params, @@ -211,7 +213,7 @@ pub(crate) async fn handshake( FeStartupPacket::StartupMessage { params, version } if version.major() == 3 && version > PG_PROTOCOL_LATEST => { - warn!(?version, "unsupported minor version"); + debug!(?version, "unsupported minor version"); // no protocol extensions are supported. // @@ -233,14 +235,16 @@ pub(crate) async fn handshake( info!( ?version, + ?params, session_type = "normal", "successful handshake; unsupported minor version requested" ); break Ok(HandshakeData::Startup(stream, params)); } - FeStartupPacket::StartupMessage { version, .. } => { + FeStartupPacket::StartupMessage { version, params } => { warn!( ?version, + ?params, session_type = "normal", "unsuccessful handshake; unsupported version" ); diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 4be4006d156e..9415b54a4a44 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -254,7 +254,7 @@ pub(crate) async fn handle_client( endpoint_rate_limiter: Arc, conn_gauge: NumClientConnectionsGuard<'static>, ) -> Result>, ClientRequestError> { - info!( + debug!( protocol = %ctx.protocol(), "handling interactive connection from client" ); diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index e3b473098276..5e07c8eeaebd 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -1,5 +1,5 @@ use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::info; +use tracing::debug; use utils::measured_stream::MeasuredStream; use super::copy_bidirectional::ErrorSource; @@ -45,7 +45,7 @@ pub(crate) async fn proxy_pass( ); // Starting from here we only proxy the client's traffic. - info!("performing the proxy pass..."); + debug!("performing the proxy pass..."); let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute( &mut client, &mut compute, diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs index d09e0b1f41b0..8a672d48dc01 100644 --- a/proxy/src/proxy/wake_compute.rs +++ b/proxy/src/proxy/wake_compute.rs @@ -17,7 +17,6 @@ pub(crate) async fn wake_compute( api: &B, config: RetryConfig, ) -> Result { - let retry_type = RetryType::WakeCompute; loop { match api.wake_compute(ctx).await { Err(e) if !should_retry(&e, *num_retries, config) => { @@ -26,7 +25,7 @@ pub(crate) async fn wake_compute( Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Failed, - retry_type, + retry_type: RetryType::WakeCompute, }, (*num_retries).into(), ); @@ -40,10 +39,12 @@ pub(crate) async fn wake_compute( Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Success, - retry_type, + retry_type: RetryType::WakeCompute, }, (*num_retries).into(), ); + // TODO: is this necessary? We have a metric. + // TODO: this log line is misleading as "wake_compute" might return cached (and stale) info. info!(?num_retries, "compute node woken up after"); return Ok(n); } diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index 16c398f303f4..b74a9ab17e2f 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -195,7 +195,11 @@ impl DynamicLimiter { /// /// Set the outcome to `None` to ignore the job. fn release_inner(&self, start: Instant, outcome: Option) { - tracing::info!("outcome is {:?}", outcome); + if outcome.is_none() { + tracing::warn!("outcome is {:?}", outcome); + } else { + tracing::debug!("outcome is {:?}", outcome); + } if self.config.initial_limit == 0 { return; } diff --git a/proxy/src/rate_limiter/limit_algorithm/aimd.rs b/proxy/src/rate_limiter/limit_algorithm/aimd.rs index 5332a5184f99..3000cc4c2af2 100644 --- a/proxy/src/rate_limiter/limit_algorithm/aimd.rs +++ b/proxy/src/rate_limiter/limit_algorithm/aimd.rs @@ -31,26 +31,32 @@ impl LimitAlgorithm for Aimd { if utilisation > self.utilisation { let limit = old_limit + self.inc; - let increased_limit = limit.clamp(self.min, self.max); - if increased_limit > old_limit { - tracing::info!(increased_limit, "limit increased"); + let new_limit = limit.clamp(self.min, self.max); + if new_limit > old_limit { + tracing::info!(old_limit, new_limit, "limit increased"); + } else { + tracing::debug!(old_limit, new_limit, "limit clamped at max"); } - increased_limit + new_limit } else { old_limit } } Outcome::Overload => { - let limit = old_limit as f32 * self.dec; + let new_limit = old_limit as f32 * self.dec; // Floor instead of round, so the limit reduces even with small numbers. // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1 - let limit = limit.floor() as usize; + let new_limit = new_limit.floor() as usize; - let limit = limit.clamp(self.min, self.max); - tracing::info!(limit, "limit decreased"); - limit + let new_limit = new_limit.clamp(self.min, self.max); + if new_limit < old_limit { + tracing::info!(old_limit, new_limit, "limit decreased"); + } else { + tracing::debug!(old_limit, new_limit, "limit clamped at min"); + } + new_limit } } } diff --git a/proxy/src/redis/cancellation_publisher.rs b/proxy/src/redis/cancellation_publisher.rs index 000024697159..7392b0d316fb 100644 --- a/proxy/src/redis/cancellation_publisher.rs +++ b/proxy/src/redis/cancellation_publisher.rs @@ -121,6 +121,7 @@ impl RedisPublisherClient { cancel_key_data: CancelKeyData, session_id: Uuid, ) -> anyhow::Result<()> { + // TODO: review redundant error duplication logs. if !self.limiter.check() { tracing::info!("Rate limit exceeded. Skipping cancellation message"); return Err(anyhow::anyhow!("Rate limit exceeded")); @@ -146,7 +147,7 @@ impl CancellationPublisherMut for RedisPublisherClient { tracing::info!("publishing cancellation key to Redis"); match self.try_publish_internal(cancel_key_data, session_id).await { Ok(()) => { - tracing::info!("cancellation key successfuly published to Redis"); + tracing::debug!("cancellation key successfuly published to Redis"); Ok(()) } Err(e) => { diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index d9dcf6fbb791..7df978f84cbc 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -167,10 +167,10 @@ impl PoolingBackend { force_new: bool, ) -> Result, HttpConnError> { let maybe_client = if force_new { - info!("pool: pool is disabled"); + debug!("pool: pool is disabled"); None } else { - info!("pool: looking for an existing connection"); + debug!("pool: looking for an existing connection"); self.pool.get(ctx, &conn_info)? }; @@ -204,14 +204,14 @@ impl PoolingBackend { ctx: &RequestContext, conn_info: ConnInfo, ) -> Result, HttpConnError> { - info!("pool: looking for an existing connection"); + debug!("pool: looking for an existing connection"); if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) { return Ok(client); } let conn_id = uuid::Uuid::new_v4(); tracing::Span::current().record("conn_id", display(conn_id)); - info!(%conn_id, "pool: opening a new connection '{conn_info}'"); + debug!(%conn_id, "pool: opening a new connection '{conn_info}'"); let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials { info: ComputeUserInfo { user: conn_info.user_info.user.clone(), diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 07ba1ae9af4d..f716326a6855 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -243,7 +243,7 @@ mod tests { }, cancel_set: CancelSet::new(0), client_conn_threshold: u64::MAX, - max_request_size_bytes: u64::MAX, + max_request_size_bytes: usize::MAX, max_response_size_bytes: usize::MAX, })); let pool = GlobalConnPool::new(config); diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index fe3c422c3b9e..c5db025870a1 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -232,7 +232,7 @@ impl EndpointConnPool { // do logging outside of the mutex if returned { - info!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); + debug!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); } else { info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}"); } @@ -409,7 +409,7 @@ impl GlobalConnPool { "pid", tracing::field::display(client.inner.get_process_id()), ); - info!( + debug!( cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "pool: reusing connection '{conn_info}'" ); diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index bc86c4b1cdc1..e9455420c0f9 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -227,7 +227,7 @@ impl GlobalConnPool { }; tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id)); - info!( + debug!( cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "pool: reusing connection '{conn_info}'" ); diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index cadcbd7530e9..310af08221ac 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -29,7 +29,7 @@ use tokio_postgres::tls::NoTlsStream; use tokio_postgres::types::ToSql; use tokio_postgres::{AsyncMessage, Socket}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{debug, error, info, info_span, warn, Instrument}; use super::backend::HttpConnError; use super::conn_pool_lib::{ @@ -110,7 +110,7 @@ impl LocalConnPool { "pid", tracing::field::display(client.inner.get_process_id()), ); - info!( + debug!( cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "local_pool: reusing connection '{conn_info}'" ); diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 36d859590221..ab75086884a7 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -8,7 +8,7 @@ use http::header::AUTHORIZATION; use http::Method; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full}; -use hyper::body::{Body, Incoming}; +use hyper::body::Incoming; use hyper::http::{HeaderName, HeaderValue}; use hyper::{header, HeaderMap, Request, Response, StatusCode}; use pq_proto::StartupMessageParamsBuilder; @@ -18,7 +18,7 @@ use tokio::time; use tokio_postgres::error::{DbError, ErrorPosition, SqlState}; use tokio_postgres::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction}; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{debug, error, info}; use typed_json::json; use url::Url; use urlencoding; @@ -36,6 +36,7 @@ use crate::auth::{endpoint_sni, ComputeUserInfoParseError}; use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig}; use crate::context::RequestContext; use crate::error::{ErrorKind, ReportableError, UserFacingError}; +use crate::http::{read_body_with_limit, ReadBodyError}; use crate::metrics::{HttpDirection, Metrics}; use crate::proxy::{run_until_cancelled, NeonOptions}; use crate::serverless::backend::HttpConnError; @@ -357,8 +358,6 @@ pub(crate) enum SqlOverHttpError { ConnectCompute(#[from] HttpConnError), #[error("{0}")] ConnInfo(#[from] ConnInfoError), - #[error("request is too large (max is {0} bytes)")] - RequestTooLarge(u64), #[error("response is too large (max is {0} bytes)")] ResponseTooLarge(usize), #[error("invalid isolation level")] @@ -377,7 +376,6 @@ impl ReportableError for SqlOverHttpError { SqlOverHttpError::ReadPayload(e) => e.get_error_kind(), SqlOverHttpError::ConnectCompute(e) => e.get_error_kind(), SqlOverHttpError::ConnInfo(e) => e.get_error_kind(), - SqlOverHttpError::RequestTooLarge(_) => ErrorKind::User, SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User, SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User, SqlOverHttpError::Postgres(p) => p.get_error_kind(), @@ -393,7 +391,6 @@ impl UserFacingError for SqlOverHttpError { SqlOverHttpError::ReadPayload(p) => p.to_string(), SqlOverHttpError::ConnectCompute(c) => c.to_string_client(), SqlOverHttpError::ConnInfo(c) => c.to_string_client(), - SqlOverHttpError::RequestTooLarge(_) => self.to_string(), SqlOverHttpError::ResponseTooLarge(_) => self.to_string(), SqlOverHttpError::InvalidIsolationLevel => self.to_string(), SqlOverHttpError::Postgres(p) => p.to_string(), @@ -406,13 +403,12 @@ impl UserFacingError for SqlOverHttpError { impl HttpCodeError for SqlOverHttpError { fn get_http_status_code(&self) -> StatusCode { match self { - SqlOverHttpError::ReadPayload(_) => StatusCode::BAD_REQUEST, + SqlOverHttpError::ReadPayload(e) => e.get_http_status_code(), SqlOverHttpError::ConnectCompute(h) => match h.get_error_kind() { ErrorKind::User => StatusCode::BAD_REQUEST, _ => StatusCode::INTERNAL_SERVER_ERROR, }, SqlOverHttpError::ConnInfo(_) => StatusCode::BAD_REQUEST, - SqlOverHttpError::RequestTooLarge(_) => StatusCode::PAYLOAD_TOO_LARGE, SqlOverHttpError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE, SqlOverHttpError::InvalidIsolationLevel => StatusCode::BAD_REQUEST, SqlOverHttpError::Postgres(_) => StatusCode::BAD_REQUEST, @@ -426,19 +422,41 @@ impl HttpCodeError for SqlOverHttpError { pub(crate) enum ReadPayloadError { #[error("could not read the HTTP request body: {0}")] Read(#[from] hyper::Error), + #[error("request is too large (max is {limit} bytes)")] + BodyTooLarge { limit: usize }, #[error("could not parse the HTTP request body: {0}")] Parse(#[from] serde_json::Error), } +impl From> for ReadPayloadError { + fn from(value: ReadBodyError) -> Self { + match value { + ReadBodyError::BodyTooLarge { limit } => Self::BodyTooLarge { limit }, + ReadBodyError::Read(e) => Self::Read(e), + } + } +} + impl ReportableError for ReadPayloadError { fn get_error_kind(&self) -> ErrorKind { match self { ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect, + ReadPayloadError::BodyTooLarge { .. } => ErrorKind::User, ReadPayloadError::Parse(_) => ErrorKind::User, } } } +impl HttpCodeError for ReadPayloadError { + fn get_http_status_code(&self) -> StatusCode { + match self { + ReadPayloadError::Read(_) => StatusCode::BAD_REQUEST, + ReadPayloadError::BodyTooLarge { .. } => StatusCode::PAYLOAD_TOO_LARGE, + ReadPayloadError::Parse(_) => StatusCode::BAD_REQUEST, + } + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum SqlOverHttpCancel { #[error("query was cancelled")] @@ -580,28 +598,20 @@ async fn handle_db_inner( let parsed_headers = HttpHeaders::try_parse(headers)?; - let request_content_length = match request.body().size_hint().upper() { - Some(v) => v, - None => config.http_config.max_request_size_bytes + 1, - }; - info!(request_content_length, "request size in bytes"); - Metrics::get() - .proxy - .http_conn_content_length_bytes - .observe(HttpDirection::Request, request_content_length as f64); - - // we don't have a streaming request support yet so this is to prevent OOM - // from a malicious user sending an extremely large request body - if request_content_length > config.http_config.max_request_size_bytes { - return Err(SqlOverHttpError::RequestTooLarge( - config.http_config.max_request_size_bytes, - )); - } - let fetch_and_process_request = Box::pin( async { - let body = request.into_body().collect().await?.to_bytes(); - info!(length = body.len(), "request payload read"); + let body = read_body_with_limit( + request.into_body(), + config.http_config.max_request_size_bytes, + ) + .await?; + + Metrics::get() + .proxy + .http_conn_content_length_bytes + .observe(HttpDirection::Request, body.len() as f64); + + debug!(length = body.len(), "request payload read"); let payload: Payload = serde_json::from_slice(&body)?; Ok::(payload) // Adjust error type accordingly } From ee26f09e45e72eab940e7721ba9f0a674e84b827 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 20 Nov 2024 18:33:05 +0000 Subject: [PATCH 05/12] pageserver: remove shard split hard link assertion (#9829) ## Problem We were hitting this assertion in debug mode tests sometimes. This case was being hit when the parent shard has no resident layers. For instance, this is the case on split retry where the previous attempt shut-down the parent and deleted local state for it. If the logical size calculation does not download some layers before we get to the hardlinking, then the assertion is hit. ## Summary of Changes Remove the assertion. It's fine for the ancestor to not have any resident layers at the time of the split. Closes https://github.com/neondatabase/neon/issues/9412 --- pageserver/src/tenant/mgr.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4fc9d740c838..92b2200542df 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1719,10 +1719,11 @@ impl TenantManager { parent_layers.push(relative_path.to_owned()); } } - debug_assert!( - !parent_layers.is_empty(), - "shutdown cannot empty the layermap" - ); + + if parent_layers.is_empty() { + tracing::info!("Ancestor shard has no resident layer to hard link"); + } + (parent_timelines, parent_layers) }; From 811fab136fc82aa8b5c85f93dc00d19851c07387 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Wed, 20 Nov 2024 20:31:02 +0100 Subject: [PATCH 06/12] scrubber: allow restricting find_garbage to a partial tenant id prefix (#9814) Adds support to the `find_garbage` command to restrict itself to a partial tenant ID prefix, say `a`, and then it only traverses tenants with IDs starting with `a`. One can now pass the `--tenant-id-prefix` parameter. That way, one can shard the `find_garbage` command and make it run in parallel. The PR also does a change of how `remote_storage` first removes trailing `/`s, only to then add them in the listing function. It turns out that this isn't neccessary and it prevents the prefix functionality from working. S3 doesn't do this either. --- libs/remote_storage/src/azure_blob.rs | 24 ++++++++---------------- storage_scrubber/src/garbage.rs | 15 ++++++++++++--- storage_scrubber/src/main.rs | 13 ++++++++++++- storage_scrubber/src/metadata_stream.rs | 13 ++++++++++++- 4 files changed, 44 insertions(+), 21 deletions(-) diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index f98d16789c4b..1c0d43d479a1 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -97,10 +97,7 @@ impl AzureBlobStorage { pub fn relative_path_to_name(&self, path: &RemotePath) -> String { assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR); - let path_string = path - .get_path() - .as_str() - .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR); + let path_string = path.get_path().as_str(); match &self.prefix_in_container { Some(prefix) => { if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { @@ -277,19 +274,14 @@ impl RemoteStorage for AzureBlobStorage { cancel: &CancellationToken, ) -> impl Stream> { // get the passed prefix or if it is not set use prefix_in_bucket value - let list_prefix = prefix - .map(|p| self.relative_path_to_name(p)) - .or_else(|| self.prefix_in_container.clone()) - .map(|mut p| { - // required to end with a separator - // otherwise request will return only the entry of a prefix - if matches!(mode, ListingMode::WithDelimiter) - && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) - { - p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| { + self.prefix_in_container.clone().map(|mut s| { + if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { + s.push(REMOTE_STORAGE_PREFIX_SEPARATOR); } - p - }); + s + }) + }); async_stream::stream! { let _permit = self.permit(RequestKind::List, cancel).await?; diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs index 91668a42a705..b026efbc3b48 100644 --- a/storage_scrubber/src/garbage.rs +++ b/storage_scrubber/src/garbage.rs @@ -21,7 +21,7 @@ use utils::{backoff, id::TenantId}; use crate::{ cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData}, init_remote, list_objects_with_retries, - metadata_stream::{stream_tenant_timelines, stream_tenants}, + metadata_stream::{stream_tenant_timelines, stream_tenants_maybe_prefix}, BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth, MAX_RETRIES, }; @@ -118,9 +118,17 @@ pub async fn find_garbage( console_config: ConsoleConfig, depth: TraversingDepth, node_kind: NodeKind, + tenant_id_prefix: Option, output_path: String, ) -> anyhow::Result<()> { - let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?; + let garbage = find_garbage_inner( + bucket_config, + console_config, + depth, + node_kind, + tenant_id_prefix, + ) + .await?; let serialized = serde_json::to_vec_pretty(&garbage)?; tokio::fs::write(&output_path, &serialized).await?; @@ -152,6 +160,7 @@ async fn find_garbage_inner( console_config: ConsoleConfig, depth: TraversingDepth, node_kind: NodeKind, + tenant_id_prefix: Option, ) -> anyhow::Result { // Construct clients for S3 and for Console API let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?; @@ -178,7 +187,7 @@ async fn find_garbage_inner( // Enumerate Tenants in S3, and check if each one exists in Console tracing::info!("Finding all tenants in {}...", bucket_config.desc_str()); - let tenants = stream_tenants(&remote_client, &target); + let tenants = stream_tenants_maybe_prefix(&remote_client, &target, tenant_id_prefix); let tenants_checked = tenants.map_ok(|t| { let api_client = cloud_admin_api_client.clone(); let console_cache = console_cache.clone(); diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs index 0ffb57098444..92979d609e49 100644 --- a/storage_scrubber/src/main.rs +++ b/storage_scrubber/src/main.rs @@ -54,6 +54,8 @@ enum Command { node_kind: NodeKind, #[arg(short, long, default_value_t=TraversingDepth::Tenant)] depth: TraversingDepth, + #[arg(short, long, default_value=None)] + tenant_id_prefix: Option, #[arg(short, long, default_value_t = String::from("garbage.json"))] output_path: String, }, @@ -209,10 +211,19 @@ async fn main() -> anyhow::Result<()> { Command::FindGarbage { node_kind, depth, + tenant_id_prefix, output_path, } => { let console_config = ConsoleConfig::from_env()?; - find_garbage(bucket_config, console_config, depth, node_kind, output_path).await + find_garbage( + bucket_config, + console_config, + depth, + node_kind, + tenant_id_prefix, + output_path, + ) + .await } Command::PurgeGarbage { input_path, diff --git a/storage_scrubber/src/metadata_stream.rs b/storage_scrubber/src/metadata_stream.rs index efda7c213da7..47447d681c9d 100644 --- a/storage_scrubber/src/metadata_stream.rs +++ b/storage_scrubber/src/metadata_stream.rs @@ -17,9 +17,20 @@ use utils::id::{TenantId, TimelineId}; pub fn stream_tenants<'a>( remote_client: &'a GenericRemoteStorage, target: &'a RootTarget, +) -> impl Stream> + 'a { + stream_tenants_maybe_prefix(remote_client, target, None) +} +/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes +pub fn stream_tenants_maybe_prefix<'a>( + remote_client: &'a GenericRemoteStorage, + target: &'a RootTarget, + tenant_id_prefix: Option, ) -> impl Stream> + 'a { try_stream! { - let tenants_target = target.tenants_root(); + let mut tenants_target = target.tenants_root(); + if let Some(tenant_id_prefix) = tenant_id_prefix { + tenants_target.prefix_in_bucket += &tenant_id_prefix; + } let mut tenants_stream = std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target)); while let Some(chunk) = tenants_stream.next().await { From 313ebfdb88b7ef5d2f75d4d4c3ccacd7250fe861 Mon Sep 17 00:00:00 2001 From: Vadim Kharitonov Date: Wed, 20 Nov 2024 20:36:23 +0100 Subject: [PATCH 07/12] [proxy] chore: allow bypassing empty `params` to `/sql` endpoint (#9827) ## Problem ``` curl -H "Neon-Connection-String: postgresql://neondb_owner:PASSWORD@ep-autumn-rain-a58lubg0.us-east-2.aws.neon.tech/neondb?sslmode=require" https://ep-autumn-rain-a58lubg0.us-east-2.aws.neon.tech/sql -d '{"query":"SELECT 1","params":[]}' ``` For such a query, I also need to send `params`. Do I really need it? ## Summary of changes I've marked `params` as optional --- proxy/src/serverless/sql_over_http.rs | 61 +++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index ab75086884a7..1b17495c5da3 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -48,6 +48,7 @@ use crate::usage_metrics::{MetricCounter, MetricCounterRecorder}; struct QueryData { query: String, #[serde(deserialize_with = "bytes_to_pg_text")] + #[serde(default)] params: Vec>, #[serde(default)] array_mode: Option, @@ -1105,3 +1106,63 @@ impl Discard<'_> { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_payload() { + let payload = "{\"query\":\"SELECT * FROM users WHERE name = ?\",\"params\":[\"test\"],\"arrayMode\":true}"; + let deserialized_payload: Payload = serde_json::from_str(payload).unwrap(); + + match deserialized_payload { + Payload::Single(QueryData { + query, + params, + array_mode, + }) => { + assert_eq!(query, "SELECT * FROM users WHERE name = ?"); + assert_eq!(params, vec![Some(String::from("test"))]); + assert!(array_mode.unwrap()); + } + Payload::Batch(_) => { + panic!("deserialization failed: case with single query, one param, and array mode") + } + } + + let payload = "{\"queries\":[{\"query\":\"SELECT * FROM users0 WHERE name = ?\",\"params\":[\"test0\"], \"arrayMode\":false},{\"query\":\"SELECT * FROM users1 WHERE name = ?\",\"params\":[\"test1\"],\"arrayMode\":true}]}"; + let deserialized_payload: Payload = serde_json::from_str(payload).unwrap(); + + match deserialized_payload { + Payload::Batch(BatchQueryData { queries }) => { + assert_eq!(queries.len(), 2); + for (i, query) in queries.into_iter().enumerate() { + assert_eq!( + query.query, + format!("SELECT * FROM users{i} WHERE name = ?") + ); + assert_eq!(query.params, vec![Some(format!("test{i}"))]); + assert_eq!(query.array_mode.unwrap(), i > 0); + } + } + Payload::Single(_) => panic!("deserialization failed: case with multiple queries"), + } + + let payload = "{\"query\":\"SELECT 1\"}"; + let deserialized_payload: Payload = serde_json::from_str(payload).unwrap(); + + match deserialized_payload { + Payload::Single(QueryData { + query, + params, + array_mode, + }) => { + assert_eq!(query, "SELECT 1"); + assert_eq!(params, vec![]); + assert!(array_mode.is_none()); + } + Payload::Batch(_) => panic!("deserialization failed: case with only one query"), + } + } +} From 2d6bf176a0698258d17def3011aedcc836a5427f Mon Sep 17 00:00:00 2001 From: Ivan Efremov Date: Wed, 20 Nov 2024 21:36:29 +0200 Subject: [PATCH 08/12] proxy: Refactor http conn pool (#9785) - Use the same ConnPoolEntry for http connection pool. - Rename EndpointConnPool to the HttpConnPool. - Narrow clone bound for client Fixes #9284 --- proxy/src/serverless/backend.rs | 13 +- proxy/src/serverless/conn_pool.rs | 6 +- proxy/src/serverless/conn_pool_lib.rs | 171 +++++++++++--------- proxy/src/serverless/http_conn_pool.rs | 201 +++++++----------------- proxy/src/serverless/local_conn_pool.rs | 1 + proxy/src/serverless/mod.rs | 2 +- proxy/src/serverless/sql_over_http.rs | 1 + 7 files changed, 172 insertions(+), 223 deletions(-) diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 7df978f84cbc..3037e208889a 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -12,8 +12,8 @@ use tracing::field::display; use tracing::{debug, info}; use super::conn_pool::poll_client; -use super::conn_pool_lib::{Client, ConnInfo, GlobalConnPool}; -use super::http_conn_pool::{self, poll_http2_client, Send}; +use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool}; +use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send}; use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION}; use crate::auth::backend::local::StaticAuthRules; use crate::auth::backend::{ComputeCredentials, ComputeUserInfo}; @@ -36,9 +36,10 @@ use crate::rate_limiter::EndpointRateLimiter; use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX}; pub(crate) struct PoolingBackend { - pub(crate) http_conn_pool: Arc>, + pub(crate) http_conn_pool: Arc>>, pub(crate) local_pool: Arc>, - pub(crate) pool: Arc>, + pub(crate) pool: + Arc>>, pub(crate) config: &'static ProxyConfig, pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>, @@ -474,7 +475,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError { } struct TokioMechanism { - pool: Arc>, + pool: Arc>>, conn_info: ConnInfo, conn_id: uuid::Uuid, @@ -524,7 +525,7 @@ impl ConnectMechanism for TokioMechanism { } struct HyperMechanism { - pool: Arc>, + pool: Arc>>, conn_info: ConnInfo, conn_id: uuid::Uuid, diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index f716326a6855..bd262f45edd3 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -19,7 +19,8 @@ use { }; use super::conn_pool_lib::{ - Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, GlobalConnPool, + Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool, + GlobalConnPool, }; use crate::context::RequestContext; use crate::control_plane::messages::MetricsAuxInfo; @@ -52,7 +53,7 @@ impl fmt::Display for ConnInfo { } pub(crate) fn poll_client( - global_pool: Arc>, + global_pool: Arc>>, ctx: &RequestContext, conn_info: ConnInfo, client: C, @@ -167,6 +168,7 @@ pub(crate) fn poll_client( Client::new(inner, conn_info, pool_clone) } +#[derive(Clone)] pub(crate) struct ClientDataRemote { session: tokio::sync::watch::Sender, cancel: CancellationToken, diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index c5db025870a1..fe1d2563bca1 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::marker::PhantomData; use std::ops::Deref; use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, Weak}; @@ -43,13 +44,14 @@ impl ConnInfo { } } +#[derive(Clone)] pub(crate) enum ClientDataEnum { Remote(ClientDataRemote), Local(ClientDataLocal), - #[allow(dead_code)] Http(ClientDataHttp), } +#[derive(Clone)] pub(crate) struct ClientInnerCommon { pub(crate) inner: C, pub(crate) aux: MetricsAuxInfo, @@ -91,6 +93,7 @@ pub(crate) struct ConnPoolEntry { pub(crate) struct EndpointConnPool { pools: HashMap<(DbName, RoleName), DbUserConnPool>, total_conns: usize, + /// max # connections per endpoint max_conns: usize, _guard: HttpEndpointPoolsGuard<'static>, global_connections_count: Arc, @@ -317,24 +320,49 @@ impl DbUserConn for DbUserConnPool { } } -pub(crate) struct GlobalConnPool { +pub(crate) trait EndpointConnPoolExt { + fn clear_closed(&mut self) -> usize; + fn total_conns(&self) -> usize; +} + +impl EndpointConnPoolExt for EndpointConnPool { + fn clear_closed(&mut self) -> usize { + let mut clients_removed: usize = 0; + for db_pool in self.pools.values_mut() { + clients_removed += db_pool.clear_closed_clients(&mut self.total_conns); + } + clients_removed + } + + fn total_conns(&self) -> usize { + self.total_conns + } +} + +pub(crate) struct GlobalConnPool +where + C: ClientInnerExt, + P: EndpointConnPoolExt, +{ // endpoint -> per-endpoint connection pool // // That should be a fairly conteded map, so return reference to the per-endpoint // pool as early as possible and release the lock. - global_pool: DashMap>>>, + pub(crate) global_pool: DashMap>>, /// Number of endpoint-connection pools /// /// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each. /// That seems like far too much effort, so we're using a relaxed increment counter instead. /// It's only used for diagnostics. - global_pool_size: AtomicUsize, + pub(crate) global_pool_size: AtomicUsize, /// Total number of connections in the pool - global_connections_count: Arc, + pub(crate) global_connections_count: Arc, + + pub(crate) config: &'static crate::config::HttpConfig, - config: &'static crate::config::HttpConfig, + _marker: PhantomData, } #[derive(Debug, Clone, Copy)] @@ -357,7 +385,11 @@ pub struct GlobalConnPoolOptions { pub max_total_conns: usize, } -impl GlobalConnPool { +impl GlobalConnPool +where + C: ClientInnerExt, + P: EndpointConnPoolExt, +{ pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc { let shards = config.pool_options.pool_shards; Arc::new(Self { @@ -365,6 +397,7 @@ impl GlobalConnPool { global_pool_size: AtomicUsize::new(0), config, global_connections_count: Arc::new(AtomicUsize::new(0)), + _marker: PhantomData, }) } @@ -378,60 +411,6 @@ impl GlobalConnPool { self.config.pool_options.idle_timeout } - pub(crate) fn get( - self: &Arc, - ctx: &RequestContext, - conn_info: &ConnInfo, - ) -> Result>, HttpConnError> { - let mut client: Option> = None; - let Some(endpoint) = conn_info.endpoint_cache_key() else { - return Ok(None); - }; - - let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint); - if let Some(entry) = endpoint_pool - .write() - .get_conn_entry(conn_info.db_and_user()) - { - client = Some(entry.conn); - } - let endpoint_pool = Arc::downgrade(&endpoint_pool); - - // ok return cached connection if found and establish a new one otherwise - if let Some(mut client) = client { - if client.inner.is_closed() { - info!("pool: cached connection '{conn_info}' is closed, opening a new one"); - return Ok(None); - } - tracing::Span::current() - .record("conn_id", tracing::field::display(client.get_conn_id())); - tracing::Span::current().record( - "pid", - tracing::field::display(client.inner.get_process_id()), - ); - debug!( - cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), - "pool: reusing connection '{conn_info}'" - ); - - match client.get_data() { - ClientDataEnum::Local(data) => { - data.session().send(ctx.session_id())?; - } - - ClientDataEnum::Remote(data) => { - data.session().send(ctx.session_id())?; - } - ClientDataEnum::Http(_) => (), - } - - ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); - ctx.success(); - return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); - } - Ok(None) - } - pub(crate) fn shutdown(&self) { // drops all strong references to endpoint-pools self.global_pool.clear(); @@ -464,17 +443,10 @@ impl GlobalConnPool { // if the current endpoint pool is unique (no other strong or weak references) // then it is currently not in use by any connections. if let Some(pool) = Arc::get_mut(x.get_mut()) { - let EndpointConnPool { - pools, total_conns, .. - } = pool.get_mut(); - - // ensure that closed clients are removed - for db_pool in pools.values_mut() { - clients_removed += db_pool.clear_closed_clients(total_conns); - } + let endpoints = pool.get_mut(); + clients_removed = endpoints.clear_closed(); - // we only remove this pool if it has no active connections - if *total_conns == 0 { + if endpoints.total_conns() == 0 { info!("pool: discarding pool for endpoint {endpoint}"); return false; } @@ -510,6 +482,62 @@ impl GlobalConnPool { info!("pool: performed global pool gc. size now {global_pool_size}"); } } +} + +impl GlobalConnPool> { + pub(crate) fn get( + self: &Arc, + ctx: &RequestContext, + conn_info: &ConnInfo, + ) -> Result>, HttpConnError> { + let mut client: Option> = None; + let Some(endpoint) = conn_info.endpoint_cache_key() else { + return Ok(None); + }; + + let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint); + if let Some(entry) = endpoint_pool + .write() + .get_conn_entry(conn_info.db_and_user()) + { + client = Some(entry.conn); + } + let endpoint_pool = Arc::downgrade(&endpoint_pool); + + // ok return cached connection if found and establish a new one otherwise + if let Some(mut client) = client { + if client.inner.is_closed() { + info!("pool: cached connection '{conn_info}' is closed, opening a new one"); + return Ok(None); + } + tracing::Span::current() + .record("conn_id", tracing::field::display(client.get_conn_id())); + tracing::Span::current().record( + "pid", + tracing::field::display(client.inner.get_process_id()), + ); + debug!( + cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), + "pool: reusing connection '{conn_info}'" + ); + + match client.get_data() { + ClientDataEnum::Local(data) => { + data.session().send(ctx.session_id())?; + } + + ClientDataEnum::Remote(data) => { + data.session().send(ctx.session_id())?; + } + ClientDataEnum::Http(_) => (), + } + + ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); + ctx.success(); + return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); + } + Ok(None) + } pub(crate) fn get_or_create_endpoint_pool( self: &Arc, @@ -556,7 +584,6 @@ impl GlobalConnPool { pool } } - pub(crate) struct Client { span: Span, inner: Option>, diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index e9455420c0f9..fde38d0de390 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -2,16 +2,17 @@ use std::collections::VecDeque; use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, Weak}; -use dashmap::DashMap; use hyper::client::conn::http2; use hyper_util::rt::{TokioExecutor, TokioIo}; use parking_lot::RwLock; -use rand::Rng; use tokio::net::TcpStream; use tracing::{debug, error, info, info_span, Instrument}; use super::backend::HttpConnError; -use super::conn_pool_lib::{ClientInnerExt, ConnInfo}; +use super::conn_pool_lib::{ + ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry, + EndpointConnPoolExt, GlobalConnPool, +}; use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; @@ -23,17 +24,11 @@ pub(crate) type Connect = http2::Connection, hyper::body::Incoming, TokioExecutor>; #[derive(Clone)] -pub(crate) struct ConnPoolEntry { - conn: C, - conn_id: uuid::Uuid, - aux: MetricsAuxInfo, -} - pub(crate) struct ClientDataHttp(); // Per-endpoint connection pool // Number of open connections is limited by the `max_conns_per_endpoint`. -pub(crate) struct EndpointConnPool { +pub(crate) struct HttpConnPool { // TODO(conrad): // either we should open more connections depending on stream count // (not exposed by hyper, need our own counter) @@ -48,14 +43,19 @@ pub(crate) struct EndpointConnPool { global_connections_count: Arc, } -impl EndpointConnPool { +impl HttpConnPool { fn get_conn_entry(&mut self) -> Option> { let Self { conns, .. } = self; loop { let conn = conns.pop_front()?; - if !conn.conn.is_closed() { - conns.push_back(conn.clone()); + if !conn.conn.inner.is_closed() { + let new_conn = ConnPoolEntry { + conn: conn.conn.clone(), + _last_access: std::time::Instant::now(), + }; + + conns.push_back(new_conn); return Some(conn); } } @@ -69,7 +69,7 @@ impl EndpointConnPool { } = self; let old_len = conns.len(); - conns.retain(|conn| conn.conn_id != conn_id); + conns.retain(|entry| entry.conn.conn_id != conn_id); let new_len = conns.len(); let removed = old_len - new_len; if removed > 0 { @@ -84,7 +84,22 @@ impl EndpointConnPool { } } -impl Drop for EndpointConnPool { +impl EndpointConnPoolExt for HttpConnPool { + fn clear_closed(&mut self) -> usize { + let Self { conns, .. } = self; + let old_len = conns.len(); + conns.retain(|entry| !entry.conn.inner.is_closed()); + + let new_len = conns.len(); + old_len - new_len + } + + fn total_conns(&self) -> usize { + self.conns.len() + } +} + +impl Drop for HttpConnPool { fn drop(&mut self) { if !self.conns.is_empty() { self.global_connections_count @@ -98,117 +113,7 @@ impl Drop for EndpointConnPool { } } -pub(crate) struct GlobalConnPool { - // endpoint -> per-endpoint connection pool - // - // That should be a fairly conteded map, so return reference to the per-endpoint - // pool as early as possible and release the lock. - global_pool: DashMap>>>, - - /// Number of endpoint-connection pools - /// - /// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each. - /// That seems like far too much effort, so we're using a relaxed increment counter instead. - /// It's only used for diagnostics. - global_pool_size: AtomicUsize, - - /// Total number of connections in the pool - global_connections_count: Arc, - - config: &'static crate::config::HttpConfig, -} - -impl GlobalConnPool { - pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc { - let shards = config.pool_options.pool_shards; - Arc::new(Self { - global_pool: DashMap::with_shard_amount(shards), - global_pool_size: AtomicUsize::new(0), - config, - global_connections_count: Arc::new(AtomicUsize::new(0)), - }) - } - - pub(crate) fn shutdown(&self) { - // drops all strong references to endpoint-pools - self.global_pool.clear(); - } - - pub(crate) async fn gc_worker(&self, mut rng: impl Rng) { - let epoch = self.config.pool_options.gc_epoch; - let mut interval = tokio::time::interval(epoch / (self.global_pool.shards().len()) as u32); - loop { - interval.tick().await; - - let shard = rng.gen_range(0..self.global_pool.shards().len()); - self.gc(shard); - } - } - - fn gc(&self, shard: usize) { - debug!(shard, "pool: performing epoch reclamation"); - - // acquire a random shard lock - let mut shard = self.global_pool.shards()[shard].write(); - - let timer = Metrics::get() - .proxy - .http_pool_reclaimation_lag_seconds - .start_timer(); - let current_len = shard.len(); - let mut clients_removed = 0; - shard.retain(|endpoint, x| { - // if the current endpoint pool is unique (no other strong or weak references) - // then it is currently not in use by any connections. - if let Some(pool) = Arc::get_mut(x.get_mut()) { - let EndpointConnPool { conns, .. } = pool.get_mut(); - - let old_len = conns.len(); - - conns.retain(|conn| !conn.conn.is_closed()); - - let new_len = conns.len(); - let removed = old_len - new_len; - clients_removed += removed; - - // we only remove this pool if it has no active connections - if conns.is_empty() { - info!("pool: discarding pool for endpoint {endpoint}"); - return false; - } - } - - true - }); - - let new_len = shard.len(); - drop(shard); - timer.observe(); - - // Do logging outside of the lock. - if clients_removed > 0 { - let size = self - .global_connections_count - .fetch_sub(clients_removed, atomic::Ordering::Relaxed) - - clients_removed; - Metrics::get() - .proxy - .http_pool_opened_connections - .get_metric() - .dec_by(clients_removed as i64); - info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}"); - } - let removed = current_len - new_len; - - if removed > 0 { - let global_pool_size = self - .global_pool_size - .fetch_sub(removed, atomic::Ordering::Relaxed) - - removed; - info!("pool: performed global pool gc. size now {global_pool_size}"); - } - } - +impl GlobalConnPool> { #[expect(unused_results)] pub(crate) fn get( self: &Arc, @@ -226,27 +131,28 @@ impl GlobalConnPool { return result; }; - tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id)); + tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id)); debug!( cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "pool: reusing connection '{conn_info}'" ); ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); ctx.success(); - Ok(Some(Client::new(client.conn, client.aux))) + + Ok(Some(Client::new(client.conn.clone()))) } fn get_or_create_endpoint_pool( self: &Arc, endpoint: &EndpointCacheKey, - ) -> Arc>> { + ) -> Arc>> { // fast path if let Some(pool) = self.global_pool.get(endpoint) { return pool.clone(); } // slow path - let new_pool = Arc::new(RwLock::new(EndpointConnPool { + let new_pool = Arc::new(RwLock::new(HttpConnPool { conns: VecDeque::new(), _guard: Metrics::get().proxy.http_endpoint_pools.guard(), global_connections_count: self.global_connections_count.clone(), @@ -279,7 +185,7 @@ impl GlobalConnPool { } pub(crate) fn poll_http2_client( - global_pool: Arc>, + global_pool: Arc>>, ctx: &RequestContext, conn_info: &ConnInfo, client: Send, @@ -299,11 +205,15 @@ pub(crate) fn poll_http2_client( let pool = match conn_info.endpoint_cache_key() { Some(endpoint) => { let pool = global_pool.get_or_create_endpoint_pool(&endpoint); - - pool.write().conns.push_back(ConnPoolEntry { - conn: client.clone(), - conn_id, + let client = ClientInnerCommon { + inner: client.clone(), aux: aux.clone(), + conn_id, + data: ClientDataEnum::Http(ClientDataHttp()), + }; + pool.write().conns.push_back(ConnPoolEntry { + conn: client, + _last_access: std::time::Instant::now(), }); Metrics::get() .proxy @@ -335,23 +245,30 @@ pub(crate) fn poll_http2_client( .instrument(span), ); - Client::new(client, aux) + let client = ClientInnerCommon { + inner: client, + aux, + conn_id, + data: ClientDataEnum::Http(ClientDataHttp()), + }; + + Client::new(client) } pub(crate) struct Client { - pub(crate) inner: C, - aux: MetricsAuxInfo, + pub(crate) inner: ClientInnerCommon, } impl Client { - pub(self) fn new(inner: C, aux: MetricsAuxInfo) -> Self { - Self { inner, aux } + pub(self) fn new(inner: ClientInnerCommon) -> Self { + Self { inner } } pub(crate) fn metrics(&self) -> Arc { + let aux = &self.inner.aux; USAGE_METRICS.register(Ids { - endpoint_id: self.aux.endpoint_id, - branch_id: self.aux.branch_id, + endpoint_id: aux.endpoint_id, + branch_id: aux.branch_id, }) } } diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index 310af08221ac..9abe35db08ed 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -44,6 +44,7 @@ pub(crate) const EXT_NAME: &str = "pg_session_jwt"; pub(crate) const EXT_VERSION: &str = "0.1.2"; pub(crate) const EXT_SCHEMA: &str = "auth"; +#[derive(Clone)] pub(crate) struct ClientDataLocal { session: tokio::sync::watch::Sender, cancel: CancellationToken, diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 59247f03bf0d..77025f419d4a 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -88,7 +88,7 @@ pub async fn task_main( } }); - let http_conn_pool = http_conn_pool::GlobalConnPool::new(&config.http_config); + let http_conn_pool = conn_pool_lib::GlobalConnPool::new(&config.http_config); { let http_conn_pool = Arc::clone(&http_conn_pool); tokio::spawn(async move { diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 1b17495c5da3..03b37bccd568 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -779,6 +779,7 @@ async fn handle_auth_broker_inner( let _metrics = client.metrics(); Ok(client + .inner .inner .send_request(req) .await From 59c2c3f8ad551b9ca71e4c21862a63585977fbde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 21 Nov 2024 05:46:01 +0100 Subject: [PATCH 09/12] compute_ctl: print OpenTelemetry errors via tracing, not stdout (#9830) Before, `OpenTelemetry` errors were printed to stdout/stderr directly, causing one of the few log lines without a timestamp, like: ``` OpenTelemetry trace error occurred. error sending request for url (http://localhost:4318/v1/traces) ``` Now, we print: ``` 2024-11-21T02:24:20.511160Z INFO OpenTelemetry error: error sending request for url (http://localhost:4318/v1/traces) ``` I found this while investigating #9731. --- compute_tools/src/bin/compute_ctl.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 284db005c86c..4689cc2b83d3 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -105,6 +105,11 @@ fn main() -> Result<()> { fn init() -> Result<(String, clap::ArgMatches)> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; + opentelemetry::global::set_error_handler(|err| { + tracing::info!("OpenTelemetry error: {err}"); + }) + .expect("global error handler lock poisoned"); + let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?; thread::spawn(move || { for sig in signals.forever() { From 42bda5d6323d6655dd866c01dfa680f8479d7e43 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 21 Nov 2024 08:31:24 +0000 Subject: [PATCH 10/12] pageserver: revise metrics lifetime for SecondaryTenant (#9818) ## Problem We saw a scale test failure when one shard went secondary->attached->secondary in a short period of time -- the metrics for the shard failed a validation assertion that is meant to ensure the size metric matches the sum of layer sizes in the SecondaryDetail struct. This appears to be due to two SecondaryTenants being alive at the same time -- the first one was shut down but still had its contributions to the metrics. Closes: https://github.com/neondatabase/neon/issues/9628 ## Summary of changes - Refactor code for validating metrics and call it in shutdown as well as during downloads - Move code for dropping per-tenant secondary metrics from drop() into shutdown(), so that once shutdown() completes it is definitely safe to instantiate another SecondaryTenant for the same tenant. --- pageserver/src/tenant/secondary.rs | 30 +++++++++++------ pageserver/src/tenant/secondary/downloader.rs | 32 ++++++++----------- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index 1331c07d05cf..3df89a928cb2 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -111,15 +111,6 @@ pub(crate) struct SecondaryTenant { pub(super) heatmap_total_size_metric: UIntGauge, } -impl Drop for SecondaryTenant { - fn drop(&mut self) { - let tenant_id = self.tenant_shard_id.tenant_id.to_string(); - let shard_id = format!("{}", self.tenant_shard_id.shard_slug()); - let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); - let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); - } -} - impl SecondaryTenant { pub(crate) fn new( tenant_shard_id: TenantShardId, @@ -167,6 +158,13 @@ impl SecondaryTenant { // Wait for any secondary downloader work to complete self.gate.close().await; + + self.validate_metrics(); + + let tenant_id = self.tenant_shard_id.tenant_id.to_string(); + let shard_id = format!("{}", self.tenant_shard_id.shard_slug()); + let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); + let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); } pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) { @@ -254,6 +252,20 @@ impl SecondaryTenant { .await .expect("secondary eviction should not have panicked"); } + + /// Exhaustive check that incrementally updated metrics match the actual state. + #[cfg(feature = "testing")] + fn validate_metrics(&self) { + let detail = self.detail.lock().unwrap(); + let resident_size = detail.total_resident_size(); + + assert_eq!(resident_size, self.resident_size_metric.get()); + } + + #[cfg(not(feature = "testing"))] + fn validate_metrics(&self) { + // No-op in non-testing builds + } } /// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads, diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 82c570268612..7443261a9c00 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -242,6 +242,19 @@ impl SecondaryDetail { } } + #[cfg(feature = "testing")] + pub(crate) fn total_resident_size(&self) -> u64 { + self.timelines + .values() + .map(|tl| { + tl.on_disk_layers + .values() + .map(|v| v.metadata.file_size) + .sum::() + }) + .sum::() + } + pub(super) fn evict_layer( &mut self, name: LayerName, @@ -763,24 +776,7 @@ impl<'a> TenantDownloader<'a> { } // Metrics consistency check in testing builds - if cfg!(feature = "testing") { - let detail = self.secondary_state.detail.lock().unwrap(); - let resident_size = detail - .timelines - .values() - .map(|tl| { - tl.on_disk_layers - .values() - .map(|v| v.metadata.file_size) - .sum::() - }) - .sum::(); - assert_eq!( - resident_size, - self.secondary_state.resident_size_metric.get() - ); - } - + self.secondary_state.validate_metrics(); // Only update last_etag after a full successful download: this way will not skip // the next download, even if the heatmap's actual etag is unchanged. self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary { From e82deb2cccc25d8737148e0db27b993cc4df0f55 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 21 Nov 2024 11:16:00 +0100 Subject: [PATCH 11/12] high-resolution CPU usage --- libs/metrics/src/more_process_metrics.rs | 40 ++++++++++++++++++- .../test_pageserver_getpage_merge.py | 2 +- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/libs/metrics/src/more_process_metrics.rs b/libs/metrics/src/more_process_metrics.rs index 920724fdecb4..13a745e03148 100644 --- a/libs/metrics/src/more_process_metrics.rs +++ b/libs/metrics/src/more_process_metrics.rs @@ -2,14 +2,28 @@ // This module has heavy inspiration from the prometheus crate's `process_collector.rs`. +use once_cell::sync::Lazy; +use prometheus::Gauge; + use crate::UIntGauge; pub struct Collector { descs: Vec, vmlck: crate::UIntGauge, + cpu_seconds_highres: Gauge, } -const NMETRICS: usize = 1; +const NMETRICS: usize = 2; + +static CLK_TCK_F64: Lazy = Lazy::new(|| { + let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) }; + if long == -1 { + panic!("sysconf(_SC_CLK_TCK) failed"); + } + let convertible_to_f64: i32 = + i32::try_from(long).expect("sysconf(_SC_CLK_TCK) is larger than i32"); + convertible_to_f64 as f64 +}); impl prometheus::core::Collector for Collector { fn desc(&self) -> Vec<&prometheus::core::Desc> { @@ -27,6 +41,12 @@ impl prometheus::core::Collector for Collector { mfs.extend(self.vmlck.collect()) } } + if let Ok(stat) = myself.stat() { + let cpu_seconds = stat.utime + stat.stime; + self.cpu_seconds_highres + .set(cpu_seconds as f64 / *CLK_TCK_F64); + mfs.extend(self.cpu_seconds_highres.collect()); + } mfs } } @@ -43,7 +63,23 @@ impl Collector { .cloned(), ); - Self { descs, vmlck } + let cpu_seconds_highres = Gauge::new( + "libmetrics_process_cpu_seconds_highres", + "Total user and system CPU time spent in seconds.\ + Sub-second resolution, hence better than `process_cpu_seconds_total`.", + ) + .unwrap(); + descs.extend( + prometheus::core::Collector::desc(&cpu_seconds_highres) + .into_iter() + .cloned(), + ); + + Self { + descs, + vmlck, + cpu_seconds_highres, + } } } diff --git a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py index be7cf66c7957..c8522af476bc 100644 --- a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py +++ b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py @@ -146,7 +146,7 @@ def get_metrics() -> Metrics: ).value, compute_getpage_count=compute_getpage_count, pageserver_cpu_seconds_total=pageserver_metrics.query_one( - "process_cpu_seconds_total" + "libmetrics_process_cpu_seconds_highres" ).value, ) From 3375f289902261dd9d1c58fb3c32add28d081341 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 21 Nov 2024 11:21:50 +0100 Subject: [PATCH 12/12] pytest.approx; https://github.com/neondatabase/neon/pull/9820#discussion_r1850679974 --- .../pageserver/test_pageserver_getpage_merge.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py index c8522af476bc..272446b73c6d 100644 --- a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py +++ b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py @@ -176,11 +176,10 @@ def workload() -> Metrics: # # Sanity-checks on the collected data # - def close_enough(a, b): - return (a / b > 0.99 and a / b < 1.01) and (b / a > 0.99 and b / a < 1.01) - # assert that getpage counts roughly match between compute and ps - assert close_enough(metrics.pageserver_getpage_count, metrics.compute_getpage_count) + assert metrics.pageserver_getpage_count == pytest.approx( + metrics.compute_getpage_count, rel=0.01 + ) # # Record the results