From 56065a87d5185fa182bd9bcca79d6b09f47598e5 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 25 Nov 2024 19:54:48 +0100 Subject: [PATCH] pageserver: add wal receiver proto tenant conf override --- control_plane/src/pageserver.rs | 5 +++++ libs/pageserver_api/src/config.rs | 3 +++ libs/pageserver_api/src/models.rs | 2 ++ pageserver/src/tenant.rs | 1 + pageserver/src/tenant/config.rs | 8 ++++++++ pageserver/src/tenant/timeline.rs | 18 +++++++++++++++++- .../regress/test_attach_tenant_config.py | 4 ++++ 7 files changed, 40 insertions(+), 1 deletion(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index ae5e22ddc6cb..1d1455b95b99 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -415,6 +415,11 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'timeline_offloading' as bool")?, + wal_receiver_protocol_override: settings + .remove("wal_receiver_protocol_override") + .map(serde_json::from_str) + .transpose() + .context("parse `wal_receiver_protocol_override` from json")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 0abca5cdc29a..721d97404ba7 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -278,6 +278,8 @@ pub struct TenantConfigToml { /// Enable auto-offloading of timelines. /// (either this flag or the pageserver-global one need to be set) pub timeline_offloading: bool, + + pub wal_receiver_protocol_override: Option, } pub mod defaults { @@ -510,6 +512,7 @@ impl Default for TenantConfigToml { lsn_lease_length: LsnLease::DEFAULT_LENGTH, lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS, timeline_offloading: false, + wal_receiver_protocol_override: None, } } } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 1b86bfd91ad5..42c5d10c053b 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -23,6 +23,7 @@ use utils::{ completion, id::{NodeId, TenantId, TimelineId}, lsn::Lsn, + postgres_client::PostgresClientProtocol, serde_system_time, }; @@ -352,6 +353,7 @@ pub struct TenantConfig { pub lsn_lease_length: Option, pub lsn_lease_length_for_ts: Option, pub timeline_offloading: Option, + pub wal_receiver_protocol_override: Option, } /// The policy for the aux file storage. diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0214ee68fa08..bddcb534a195 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5344,6 +5344,7 @@ pub(crate) mod harness { lsn_lease_length: Some(tenant_conf.lsn_lease_length), lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts), timeline_offloading: Some(tenant_conf.timeline_offloading), + wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override, } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 4d6176bfd9d3..5d3ac5a8e335 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -19,6 +19,7 @@ use serde_json::Value; use std::num::NonZeroU64; use std::time::Duration; use utils::generation::Generation; +use utils::postgres_client::PostgresClientProtocol; #[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)] pub(crate) enum AttachmentMode { @@ -353,6 +354,9 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub timeline_offloading: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub wal_receiver_protocol_override: Option, } impl TenantConfOpt { @@ -418,6 +422,9 @@ impl TenantConfOpt { timeline_offloading: self .lazy_slru_download .unwrap_or(global_conf.timeline_offloading), + wal_receiver_protocol_override: self + .wal_receiver_protocol_override + .or(global_conf.wal_receiver_protocol_override), } } } @@ -472,6 +479,7 @@ impl From for models::TenantConfig { lsn_lease_length: value.lsn_lease_length.map(humantime), lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime), timeline_offloading: value.timeline_offloading, + wal_receiver_protocol_override: value.wal_receiver_protocol_override, } } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c1ff0f426d5c..afd4664d01cf 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -50,6 +50,7 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::{ fs_ext, pausable_failpoint, + postgres_client::PostgresClientProtocol, sync::gate::{Gate, GateGuard}, }; use wal_decoder::serialized_batch::SerializedValueBatch; @@ -2178,6 +2179,21 @@ impl Timeline { ) } + /// Resolve the effective WAL receiver protocol to use for this tenant. + /// + /// Priority order is: + /// 1. Tenant config override + /// 2. Default value for tenant config override + /// 3. Pageserver config override + /// 4. Pageserver config default + pub fn resolve_wal_receiver_protocol(&self) -> PostgresClientProtocol { + let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); + tenant_conf + .wal_receiver_protocol_override + .or(self.conf.default_tenant_conf.wal_receiver_protocol_override) + .unwrap_or(self.conf.wal_receiver_protocol) + } + pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) { // NB: Most tenant conf options are read by background loops, so, // changes will automatically be picked up. @@ -2470,7 +2486,7 @@ impl Timeline { *guard = Some(WalReceiver::start( Arc::clone(self), WalReceiverConf { - protocol: self.conf.wal_receiver_protocol, + protocol: self.resolve_wal_receiver_protocol(), wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag, diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 5744c445f671..670c2698f5aa 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -174,6 +174,10 @@ def test_fully_custom_config(positive_env: NeonEnv): "lsn_lease_length": "1m", "lsn_lease_length_for_ts": "5s", "timeline_offloading": True, + "wal_receiver_protocol_override": { + "type": "interpreted", + "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}}, + }, } vps_http = env.storage_controller.pageserver_api()