diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 6b670de2ea85..b178d7abd6d6 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -37,6 +37,7 @@ use std::collections::HashMap; use std::fs::File; use std::path::Path; use std::process::exit; +use std::str::FromStr; use std::sync::atomic::Ordering; use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock}; use std::{thread, time::Duration}; @@ -322,8 +323,15 @@ fn wait_spec( } else { spec_set = false; } + let connstr = Url::parse(connstr).context("cannot parse connstr as a URL")?; + let conn_conf = postgres::config::Config::from_str(connstr.as_str()) + .context("cannot build postgres config from connstr")?; + let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str()) + .context("cannot build tokio postgres config from connstr")?; let compute_node = ComputeNode { - connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, + connstr, + conn_conf, + tokio_conn_conf, pgdata: pgdata.to_string(), pgbin: pgbin.to_string(), pgversion: get_pg_version_string(pgbin), diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 08ae8bf44d71..72198a947944 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -6,7 +6,6 @@ use tokio::{ process::Command, spawn, }; -use tokio_postgres::connect; use tokio_stream::{self as stream, StreamExt}; use tokio_util::codec::{BytesCodec, FramedRead}; use tracing::warn; @@ -16,10 +15,8 @@ use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async, postgr use compute_api::responses::CatalogObjects; pub async fn get_dbs_and_roles(compute: &Arc) -> anyhow::Result { - let connstr = compute.connstr.clone(); - - let (client, connection): (tokio_postgres::Client, _) = - connect(connstr.as_str(), NoTls).await?; + let conf = compute.get_tokio_conn_conf(Some("compute_ctl:get_dbs_and_roles")); + let (client, connection): (tokio_postgres::Client, _) = conf.connect(NoTls).await?; spawn(async move { if let Err(e) = connection.await { diff --git a/compute_tools/src/checker.rs b/compute_tools/src/checker.rs index cec2b1bed833..62d61a8bc987 100644 --- a/compute_tools/src/checker.rs +++ b/compute_tools/src/checker.rs @@ -9,7 +9,8 @@ use crate::compute::ComputeNode; #[instrument(skip_all)] pub async fn check_writability(compute: &ComputeNode) -> Result<()> { // Connect to the database. - let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?; + let conf = compute.get_tokio_conn_conf(Some("compute_ctl:availability_checker")); + let (client, connection) = conf.connect(NoTls).await?; if client.is_closed() { return Err(anyhow!("connection to postgres closed")); } diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 1a026a40143a..da1caf1a9b2f 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -20,8 +20,9 @@ use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; use nix::unistd::Pid; +use postgres; use postgres::error::SqlState; -use postgres::{Client, NoTls}; +use postgres::NoTls; use tracing::{debug, error, info, instrument, warn}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -58,6 +59,10 @@ pub static PG_PID: AtomicU32 = AtomicU32::new(0); pub struct ComputeNode { // Url type maintains proper escaping pub connstr: url::Url, + // We connect to Postgres from many different places, so build configs once + // and reuse them where needed. + pub conn_conf: postgres::config::Config, + pub tokio_conn_conf: tokio_postgres::config::Config, pub pgdata: String, pub pgbin: String, pub pgversion: String, @@ -800,10 +805,10 @@ impl ComputeNode { /// version. In the future, it may upgrade all 3rd-party extensions. #[instrument(skip_all)] pub fn post_apply_config(&self) -> Result<()> { - let connstr = self.connstr.clone(); + let conf = self.get_conn_conf(Some("compute_ctl:post_apply_config")); thread::spawn(move || { let func = || { - let mut client = Client::connect(connstr.as_str(), NoTls)?; + let mut client = conf.connect(NoTls)?; handle_neon_extension_upgrade(&mut client) .context("handle_neon_extension_upgrade")?; Ok::<_, anyhow::Error>(()) @@ -815,12 +820,27 @@ impl ComputeNode { Ok(()) } + pub fn get_conn_conf(&self, application_name: Option<&str>) -> postgres::Config { + let mut conf = self.conn_conf.clone(); + if let Some(application_name) = application_name { + conf.application_name(application_name); + } + conf + } + + pub fn get_tokio_conn_conf(&self, application_name: Option<&str>) -> tokio_postgres::Config { + let mut conf = self.tokio_conn_conf.clone(); + if let Some(application_name) = application_name { + conf.application_name(application_name); + } + conf + } + async fn get_maintenance_client( conf: &tokio_postgres::Config, ) -> Result { let mut conf = conf.clone(); - - conf.application_name("apply_config"); + conf.application_name("compute_ctl:apply_config"); let (client, conn) = match conf.connect(NoTls).await { // If connection fails, it may be the old node with `zenith_admin` superuser. @@ -837,6 +857,7 @@ impl ComputeNode { e ); let mut zenith_admin_conf = postgres::config::Config::from(conf.clone()); + zenith_admin_conf.application_name("compute_ctl:apply_config"); zenith_admin_conf.user("zenith_admin"); let mut client = @@ -1134,8 +1155,7 @@ impl ComputeNode { /// Do initial configuration of the already started Postgres. #[instrument(skip_all)] pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { - let mut conf = tokio_postgres::Config::from_str(self.connstr.as_str()).unwrap(); - conf.application_name("apply_config"); + let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config")); let conf = Arc::new(conf); let spec = Arc::new( @@ -1161,7 +1181,7 @@ impl ComputeNode { thread::spawn(move || { let conf = conf.as_ref().clone(); let mut conf = postgres::config::Config::from(conf); - conf.application_name("migrations"); + conf.application_name("compute_ctl:migrations"); let mut client = conf.connect(NoTls)?; handle_migrations(&mut client).context("apply_config handle_migrations") @@ -1369,9 +1389,9 @@ impl ComputeNode { } self.post_apply_config()?; - let connstr = self.connstr.clone(); + let conf = self.get_conn_conf(None); thread::spawn(move || { - let res = get_installed_extensions(&connstr); + let res = get_installed_extensions(conf); match res { Ok(extensions) => { info!( @@ -1510,7 +1530,8 @@ impl ComputeNode { /// Select `pg_stat_statements` data and return it as a stringified JSON pub async fn collect_insights(&self) -> String { let mut result_rows: Vec = Vec::new(); - let connect_result = tokio_postgres::connect(self.connstr.as_str(), NoTls).await; + let conf = self.get_tokio_conn_conf(Some("compute_ctl:collect_insights")); + let connect_result = conf.connect(NoTls).await; let (client, connection) = connect_result.unwrap(); tokio::spawn(async move { if let Err(e) = connection.await { @@ -1636,10 +1657,9 @@ LIMIT 100", privileges: &[Privilege], role_name: &PgIdent, ) -> Result<()> { - use tokio_postgres::config::Config; use tokio_postgres::NoTls; - let mut conf = Config::from_str(self.connstr.as_str()).unwrap(); + let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:set_role_grants")); conf.dbname(db_name); let (db_client, conn) = conf @@ -1676,10 +1696,9 @@ LIMIT 100", db_name: &PgIdent, ext_version: ExtVersion, ) -> Result { - use tokio_postgres::config::Config; use tokio_postgres::NoTls; - let mut conf = Config::from_str(self.connstr.as_str()).unwrap(); + let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:install_extension")); conf.dbname(db_name); let (db_client, conn) = conf diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index a6c6cff20af5..7fa6426d8f9e 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -295,12 +295,11 @@ async fn routes(req: Request, compute: &Arc) -> Response render_json(Body::from(serde_json::to_string(&res).unwrap())), diff --git a/compute_tools/src/installed_extensions.rs b/compute_tools/src/installed_extensions.rs index f473c29a558e..5f62f08858a7 100644 --- a/compute_tools/src/installed_extensions.rs +++ b/compute_tools/src/installed_extensions.rs @@ -10,8 +10,6 @@ use metrics::core::Collector; use metrics::{register_uint_gauge_vec, UIntGaugeVec}; use once_cell::sync::Lazy; -use crate::pg_helpers::postgres_conf_for_db; - /// We don't reuse get_existing_dbs() just for code clarity /// and to make database listing query here more explicit. /// @@ -41,14 +39,16 @@ fn list_dbs(client: &mut Client) -> Result> { /// /// Same extension can be installed in multiple databases with different versions, /// we only keep the highest and lowest version across all databases. -pub fn get_installed_extensions(connstr: &url::Url) -> Result { - let mut client = Client::connect(connstr.as_str(), NoTls)?; +pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result { + conf.application_name("compute_ctl:get_installed_extensions"); + let mut client = conf.connect(NoTls)?; + let databases: Vec = list_dbs(&mut client)?; let mut extensions_map: HashMap = HashMap::new(); for db in databases.iter() { - let config = postgres_conf_for_db(connstr, db)?; - let mut db_client = config.connect(NoTls)?; + conf.dbname(db); + let mut db_client = conf.connect(NoTls)?; let extensions: Vec<(String, String)> = db_client .query( "SELECT extname, extversion FROM pg_catalog.pg_extension;", @@ -82,7 +82,7 @@ pub fn get_installed_extensions(connstr: &url::Url) -> Result = None; @@ -57,7 +54,7 @@ fn watch_compute_activity(compute: &ComputeNode) { info!("connection to Postgres is closed, trying to reconnect"); // Connection is closed, reconnect and try again. - client = Client::connect(connstr, NoTls); + client = conf.connect(NoTls); continue; } @@ -196,7 +193,7 @@ fn watch_compute_activity(compute: &ComputeNode) { debug!("could not connect to Postgres: {}, retrying", e); // Establish a new connection and try again. - client = Client::connect(connstr, NoTls); + client = conf.connect(NoTls); } } }