diff --git a/Cargo.lock b/Cargo.lock
index 1cfa70e9c11a..730c5535683d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -413,6 +413,7 @@ dependencies = [
  "thiserror",
  "toml",
  "url",
+ "walkeeper",
  "workspace_hack",
  "zenith_utils",
 ]
diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml
index 5e972200c20e..eff6b3ef2dc3 100644
--- a/control_plane/Cargo.toml
+++ b/control_plane/Cargo.toml
@@ -17,5 +17,6 @@
 url = "2.2.2"
 reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
 pageserver = { path = "../pageserver" }
+walkeeper = { path = "../walkeeper" }
 zenith_utils = { path = "../zenith_utils" }
 workspace_hack = { path = "../workspace_hack" }
diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs
index 351d1efbbca9..969e2cd5312c 100644
--- a/control_plane/src/safekeeper.rs
+++ b/control_plane/src/safekeeper.rs
@@ -14,8 +14,9 @@ use postgres::Config;
 use reqwest::blocking::{Client, RequestBuilder, Response};
 use reqwest::{IntoUrl, Method};
 use thiserror::Error;
+use walkeeper::http::models::TimelineCreateRequest;
 use zenith_utils::http::error::HttpErrorBody;
-use zenith_utils::zid::ZNodeId;
+use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
 
 use crate::local_env::{LocalEnv, SafekeeperConf};
 use crate::storage::PageServerNode;
@@ -261,4 +262,25 @@ impl SafekeeperNode {
             .error_from_body()?;
         Ok(())
     }
+
+    pub fn timeline_create(
+        &self,
+        tenant_id: ZTenantId,
+        timeline_id: ZTimelineId,
+        peer_ids: Vec<ZNodeId>,
+    ) -> Result<()> {
+        Ok(self
+            .http_request(
+                Method::POST,
+                format!("{}/{}", self.http_base_url, "timeline"),
+            )
+            .json(&TimelineCreateRequest {
+                tenant_id,
+                timeline_id,
+                peer_ids,
+            })
+            .send()?
+            .error_from_body()?
+            .json()?)
+    }
 }
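For context, this is how the new client wrapper is meant to be driven; `safekeepers_create_timeline` in `zenith/src/main.rs` further down does exactly this in a loop over all safekeepers. A minimal sketch — the env/node plumbing and the literal peer ids are illustrative only:

```rust
use anyhow::Result;
use control_plane::{local_env::LocalEnv, safekeeper::SafekeeperNode};
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};

// Sketch: register one timeline on a single safekeeper.
fn create_on_first_safekeeper(env: &LocalEnv) -> Result<()> {
    let tenant_id = ZTenantId::generate();
    let timeline_id = ZTimelineId::generate();
    let sk = SafekeeperNode::from_env(env, &env.safekeepers[0]);
    // Sends a POST with a JSON TimelineCreateRequest body to <http_base_url>/timeline.
    sk.timeline_create(tenant_id, timeline_id, vec![ZNodeId(1), ZNodeId(2)])?;
    Ok(())
}
```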
diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs
index f3cc3c01659e..087f686bb1db 100644
--- a/walkeeper/src/bin/safekeeper.rs
+++ b/walkeeper/src/bin/safekeeper.rs
@@ -11,7 +11,7 @@ use std::io::{ErrorKind, Write};
 use std::path::{Path, PathBuf};
 use std::thread;
 use tracing::*;
-use walkeeper::timeline::{CreateControlFile, FileStorage};
+use walkeeper::timeline::FileStorage;
 use zenith_utils::http::endpoint;
 use zenith_utils::zid::ZNodeId;
 use zenith_utils::{logging, tcp_listener, GIT_VERSION};
@@ -102,7 +102,7 @@ fn main() -> Result<()> {
         .get_matches();
 
     if let Some(addr) = arg_matches.value_of("dump-control-file") {
-        let state = FileStorage::load_control_file(Path::new(addr), CreateControlFile::False)?;
+        let state = FileStorage::load_control_file(Path::new(addr))?;
         let json = serde_json::to_string(&state)?;
         print!("{}", json);
         return Ok(());
diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs
index 21890764dbb5..a8d915b01e31 100644
--- a/walkeeper/src/handler.rs
+++ b/walkeeper/src/handler.rs
@@ -19,7 +19,6 @@ use zenith_utils::pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID};
 use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
 
 use crate::callmemaybe::CallmeEvent;
-use crate::timeline::CreateControlFile;
 use tokio::sync::mpsc::UnboundedSender;
 
 /// Safekeeper handler of postgres commands
@@ -109,18 +108,8 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
             let tenantid = self.ztenantid.context("tenantid is required")?;
             let timelineid = self.ztimelineid.context("timelineid is required")?;
             if self.timeline.is_none() {
-                // START_WAL_PUSH is the only command that initializes the timeline in production.
-                // There is also JSON_CTRL command, which should initialize the timeline for testing.
-                let create_control_file = match cmd {
-                    SafekeeperPostgresCommand::StartWalPush { .. }
-                    | SafekeeperPostgresCommand::JSONCtrl { .. } => CreateControlFile::True,
-                    _ => CreateControlFile::False,
-                };
-                self.timeline.set(
-                    &self.conf,
-                    ZTenantTimelineId::new(tenantid, timelineid),
-                    create_control_file,
-                )?;
+                self.timeline
+                    .set(&self.conf, ZTenantTimelineId::new(tenantid, timelineid))?;
             }
         }
     }
diff --git a/walkeeper/src/http/mod.rs b/walkeeper/src/http/mod.rs
index c82d1c0362ed..4c0be17ecdec 100644
--- a/walkeeper/src/http/mod.rs
+++ b/walkeeper/src/http/mod.rs
@@ -1,2 +1,3 @@
+pub mod models;
 pub mod routes;
 pub use routes::make_router;
diff --git a/walkeeper/src/http/models.rs b/walkeeper/src/http/models.rs
new file mode 100644
index 000000000000..8a6ed7a81261
--- /dev/null
+++ b/walkeeper/src/http/models.rs
@@ -0,0 +1,9 @@
+use serde::{Deserialize, Serialize};
+use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
+
+#[derive(Serialize, Deserialize)]
+pub struct TimelineCreateRequest {
+    pub tenant_id: ZTenantId,
+    pub timeline_id: ZTimelineId,
+    pub peer_ids: Vec<ZNodeId>,
+}
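The model is plain serde, so the wire shape is whatever the `zid` types serialize to in JSON. A test-style round-trip sketch, asserting only structure rather than the exact encoding:

```rust
use walkeeper::http::models::TimelineCreateRequest;
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};

// Sketch: serialize the request the same way the control_plane client does,
// then parse it back as the safekeeper's json_request would.
fn roundtrip() -> anyhow::Result<()> {
    let req = TimelineCreateRequest {
        tenant_id: ZTenantId::generate(),
        timeline_id: ZTimelineId::generate(),
        peer_ids: vec![ZNodeId(1), ZNodeId(2), ZNodeId(3)],
    };
    let encoded = serde_json::to_string(&req)?;
    let decoded: TimelineCreateRequest = serde_json::from_str(&encoded)?;
    assert_eq!(decoded.peer_ids.len(), 3);
    Ok(())
}
```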
diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs
index a93d5ca6b3ac..192fd866af42 100644
--- a/walkeeper/src/http/routes.rs
+++ b/walkeeper/src/http/routes.rs
@@ -1,8 +1,10 @@
 use hyper::{Body, Request, Response, StatusCode};
+
 use serde::Serialize;
 use serde::Serializer;
 use std::fmt::Display;
 use std::sync::Arc;
+use zenith_utils::http::json::json_request;
 use zenith_utils::http::{RequestExt, RouterBuilder};
 use zenith_utils::lsn::Lsn;
 use zenith_utils::zid::ZNodeId;
@@ -10,7 +12,6 @@ use zenith_utils::zid::ZTenantTimelineId;
 
 use crate::safekeeper::Term;
 use crate::safekeeper::TermHistory;
-use crate::timeline::CreateControlFile;
 use crate::timeline::GlobalTimelines;
 use crate::SafeKeeperConf;
 use zenith_utils::http::endpoint;
@@ -19,6 +20,8 @@ use zenith_utils::http::json::json_response;
 use zenith_utils::http::request::parse_request_param;
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
+use super::models::TimelineCreateRequest;
+
 #[derive(Debug, Serialize)]
 struct SafekeeperStatus {
     id: ZNodeId,
@@ -78,8 +81,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
 
-    let tli = GlobalTimelines::get(get_conf(&request), zttid, CreateControlFile::False)
-        .map_err(ApiError::from_err)?;
+    let tli = GlobalTimelines::get(get_conf(&request), zttid).map_err(ApiError::from_err)?;
 
@@ -97,5 +99,18 @@
 }
 
+async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let request_data: TimelineCreateRequest = json_request(&mut request).await?;
+
+    let zttid = ZTenantTimelineId {
+        tenant_id: request_data.tenant_id,
+        timeline_id: request_data.timeline_id,
+    };
+    GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids)
+        .map_err(ApiError::from_err)?;
+
+    Ok(json_response(StatusCode::CREATED, ())?)
+}
+
 /// Safekeeper http router.
 pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
     let router = endpoint::make_router();
@@ -110,4 +125,5 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
         "/v1/timeline/:tenant_id/:timeline_id",
         timeline_status_handler,
     )
+    .post("/v1/timeline", timeline_create_handler)
 }
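A raw-HTTP equivalent of what the control_plane wrapper does, handy when poking a safekeeper by hand. A sketch — the base address is an assumption and depends on the HTTP listen address the safekeeper was started with:

```rust
use walkeeper::http::models::TimelineCreateRequest;

// Sketch: hit the new route directly; expects 201 Created on success and an
// error body when GlobalTimelines::create bails (e.g. timeline already exists).
fn create_timeline_raw(base: &str, req: &TimelineCreateRequest) -> anyhow::Result<()> {
    let resp = reqwest::blocking::Client::new()
        .post(format!("{}/v1/timeline", base)) // e.g. "http://127.0.0.1:7676"
        .json(req)
        .send()?;
    anyhow::ensure!(
        resp.status() == reqwest::StatusCode::CREATED,
        "unexpected status {}",
        resp.status()
    );
    Ok(())
}
```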
diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs
index ff57aa1cdac1..36d81d5dee2c 100644
--- a/walkeeper/src/safekeeper.rs
+++ b/walkeeper/src/safekeeper.rs
@@ -10,6 +10,8 @@
 use std::cmp::min;
 use std::fmt;
 use std::io::Read;
 use tracing::*;
+use zenith_utils::zid::ZNodeId;
+use zenith_utils::zid::ZTenantTimelineId;
 
 use lazy_static::lazy_static;
@@ -26,12 +28,13 @@ use zenith_utils::pq_proto::ZenithFeedback;
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 pub const SK_MAGIC: u32 = 0xcafeceefu32;
-pub const SK_FORMAT_VERSION: u32 = 3;
+pub const SK_FORMAT_VERSION: u32 = 4;
 const SK_PROTOCOL_VERSION: u32 = 1;
 const UNKNOWN_SERVER_VERSION: u32 = 0;
 
 /// Consensus logical timestamp.
 pub type Term = u64;
+const INVALID_TERM: Term = 0;
 
 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct TermSwitchEntry {
@@ -129,18 +132,47 @@ pub struct ServerInfo {
     /// Postgres server version
     pub pg_version: u32,
     pub system_id: SystemId,
-    #[serde(with = "hex")]
-    pub tenant_id: ZTenantId,
-    /// Zenith timelineid
-    #[serde(with = "hex")]
-    pub timeline_id: ZTimelineId,
     pub wal_seg_size: u32,
 }
 
+/// Data published by safekeeper to the peers
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PeerInfo {
+    /// LSN up to which safekeeper offloaded WAL to s3.
+    s3_wal_lsn: Lsn,
+    /// Term of the last entry.
+    term: Term,
+    /// LSN of the last record.
+    flush_lsn: Lsn,
+    /// Up to which LSN safekeeper regards its WAL as committed.
+    commit_lsn: Lsn,
+}
+
+impl PeerInfo {
+    fn new() -> Self {
+        Self {
+            s3_wal_lsn: Lsn(0),
+            term: INVALID_TERM,
+            flush_lsn: Lsn(0),
+            commit_lsn: Lsn(0),
+        }
+    }
+}
+
+// Vector-based node id -> peer state map with the very limited functionality
+// we need.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Peers(pub Vec<(ZNodeId, PeerInfo)>);
+
 /// Persistent information stored on safekeeper node
 /// On disk data is prefixed by magic and format version and followed by checksum.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct SafeKeeperState {
+    #[serde(with = "hex")]
+    pub tenant_id: ZTenantId,
+    /// Zenith timelineid
+    #[serde(with = "hex")]
+    pub timeline_id: ZTimelineId,
     /// persistent acceptor state
     pub acceptor_state: AcceptorState,
     /// information about server
@@ -157,11 +189,18 @@ pub struct SafeKeeperState {
     // Safekeeper starts receiving WAL from this LSN, zeros before it ought to
     // be skipped during decoding.
     pub wal_start_lsn: Lsn,
+    // Peers and their state as we remember it. Knowing the peers themselves is
+    // fundamental; their state is saved only for informational purposes and can
+    // obviously be stale. (It is currently not saved at all, but reserving the
+    // field now avoids another file format version bump later.)
+    pub peers: Peers,
 }
 
 impl SafeKeeperState {
-    pub fn new() -> SafeKeeperState {
+    pub fn new(zttid: &ZTenantTimelineId, peers: Vec<ZNodeId>) -> SafeKeeperState {
         SafeKeeperState {
+            tenant_id: zttid.tenant_id,
+            timeline_id: zttid.timeline_id,
             acceptor_state: AcceptorState {
                 term: 0,
                 term_history: TermHistory::empty(),
@@ -169,21 +208,19 @@ impl SafeKeeperState {
             server: ServerInfo {
                 pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
                 system_id: 0,                       /* Postgres system identifier */
-                tenant_id: ZTenantId::from([0u8; 16]),
-                timeline_id: ZTimelineId::from([0u8; 16]),
                 wal_seg_size: 0,
             },
             proposer_uuid: [0; 16],
             commit_lsn: Lsn(0),   /* part of WAL acknowledged by quorum */
             truncate_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */
             wal_start_lsn: Lsn(0),
+            peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
         }
     }
-}
 
-impl Default for SafeKeeperState {
-    fn default() -> Self {
-        Self::new()
+    #[cfg(test)]
+    pub fn empty() -> Self {
+        SafeKeeperState::new(&ZTenantTimelineId::empty(), vec![])
     }
 }
 
@@ -502,10 +539,8 @@ where
         storage: ST,
         state: SafeKeeperState,
     ) -> SafeKeeper<ST> {
-        if state.server.timeline_id != ZTimelineId::from([0u8; 16])
-            && ztli != state.server.timeline_id
-        {
-            panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.server.timeline_id);
+        if ztli != state.timeline_id {
+            panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.timeline_id ({})", ztli, state.timeline_id);
         }
         SafeKeeper {
             flush_lsn,
@@ -570,18 +605,30 @@ where
                 msg.pg_version, self.s.server.pg_version
             );
         }
+        if msg.tenant_id != self.s.tenant_id {
+            bail!(
+                "invalid tenant ID, got {}, expected {}",
+                msg.tenant_id,
+                self.s.tenant_id
+            );
+        }
+        if msg.ztli != self.s.timeline_id {
+            bail!(
+                "invalid timeline ID, got {}, expected {}",
+                msg.ztli,
+                self.s.timeline_id
+            );
+        }
 
         // set basic info about server, if not yet
         self.s.server.system_id = msg.system_id;
-        self.s.server.tenant_id = msg.tenant_id;
-        self.s.server.timeline_id = msg.ztli;
         self.s.server.wal_seg_size = msg.wal_seg_size;
         self.storage
             .persist(&self.s)
             .context("failed to persist shared state")?;
 
         self.metrics = SafeKeeperMetricsBuilder {
-            ztli: self.s.server.timeline_id,
+            ztli: self.s.timeline_id,
             flush_lsn: self.flush_lsn,
             commit_lsn: self.commit_lsn,
         }
@@ -813,10 +860,10 @@ mod tests {
     #[test]
     fn test_voting() {
         let storage = InMemoryStorage {
-            persisted_state: SafeKeeperState::new(),
+            persisted_state: SafeKeeperState::empty(),
         };
         let ztli = ZTimelineId::from([0u8; 16]);
-        let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::new());
+        let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::empty());
 
         // check voting for 1 is ok
         let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
@@ -844,10 +891,10 @@ mod tests {
     #[test]
     fn test_epoch_switch() {
         let storage = InMemoryStorage {
-            persisted_state: SafeKeeperState::new(),
+            persisted_state: SafeKeeperState::empty(),
        };
         let ztli = ZTimelineId::from([0u8; 16]);
-        let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::new());
+        let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::empty());
 
         let mut ar_hdr = AppendRequestHeader {
             term: 1,
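`Peers` is deliberately just a `Vec` of pairs — safekeeper quorums are small, so a linear scan is fine. If a lookup helper were ever wanted, it could be this simple (hypothetical, not part of the diff):

```rust
impl Peers {
    /// Find the recorded state for one peer, if we know it (linear scan).
    pub fn get(&self, id: ZNodeId) -> Option<&PeerInfo> {
        self.0.iter().find(|(pid, _)| *pid == id).map(|(_, info)| info)
    }
}
```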
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index d237c9a14f39..03162dd13b67 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -17,7 +17,7 @@ use tracing::*;
 use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
 use zenith_utils::bin_ser::LeSer;
 use zenith_utils::lsn::Lsn;
-use zenith_utils::zid::ZTenantTimelineId;
+use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
 
 use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
 use crate::safekeeper::{
@@ -93,13 +93,6 @@ struct SharedState {
     pageserver_connstr: Option<String>,
 }
 
-// A named boolean.
-#[derive(Debug)]
-pub enum CreateControlFile {
-    True,
-    False,
-}
-
 lazy_static! {
     static ref PERSIST_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!(
         "safekeeper_persist_control_file_seconds",
@@ -111,14 +104,31 @@ lazy_static! {
 }
 
 impl SharedState {
-    /// Restore SharedState from control file.
-    /// If create=false and file doesn't exist, bails out.
-    fn create_restore(
+    /// Initialize timeline state, creating control file
+    fn create(
         conf: &SafeKeeperConf,
         zttid: &ZTenantTimelineId,
-        create: CreateControlFile,
+        peer_ids: Vec<ZNodeId>,
     ) -> Result<Self> {
-        let state = FileStorage::load_control_file_conf(conf, zttid, create)
+        let state = SafeKeeperState::new(zttid, peer_ids);
+        let file_storage = FileStorage::new(zttid, conf);
+        let mut sk = SafeKeeper::new(zttid.timeline_id, Lsn(0), file_storage, state);
+        sk.storage.persist(&sk.s)?;
+
+        Ok(Self {
+            notified_commit_lsn: Lsn(0),
+            sk,
+            replicas: Vec::new(),
+            active: false,
+            num_computes: 0,
+            pageserver_connstr: None,
+        })
+    }
+
+    /// Restore SharedState from control file.
+    /// If file doesn't exist, bails out.
+    fn restore(conf: &SafeKeeperConf, zttid: &ZTenantTimelineId) -> Result<Self> {
+        let state = FileStorage::load_control_file_conf(conf, zttid)
             .context("failed to load from control file")?;
         let file_storage = FileStorage::new(zttid, conf);
         let flush_lsn = if state.server.wal_seg_size != 0 {
@@ -456,27 +466,14 @@ impl Timeline {
 
 // Utilities needed by various Connection-like objects
 pub trait TimelineTools {
-    fn set(
-        &mut self,
-        conf: &SafeKeeperConf,
-        zttid: ZTenantTimelineId,
-        create: CreateControlFile,
-    ) -> Result<()>;
+    fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId) -> Result<()>;
 
     fn get(&self) -> &Arc<Timeline>;
 }
 
 impl TimelineTools for Option<Arc<Timeline>> {
-    fn set(
-        &mut self,
-        conf: &SafeKeeperConf,
-        zttid: ZTenantTimelineId,
-        create: CreateControlFile,
-    ) -> Result<()> {
-        // We will only set the timeline once. If it were to ever change,
-        // anyone who cloned the Arc would be out of date.
-        assert!(self.is_none());
-        *self = Some(GlobalTimelines::get(conf, zttid, create)?);
+    fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId) -> Result<()> {
+        *self = Some(GlobalTimelines::get(conf, zttid)?);
         Ok(())
     }
 
@@ -494,30 +491,38 @@ lazy_static! {
 pub struct GlobalTimelines;
 
 impl GlobalTimelines {
-    /// Get a timeline with control file loaded from the global TIMELINES map.
-    /// If control file doesn't exist and create=false, bails out.
-    pub fn get(
+    pub fn create(
         conf: &SafeKeeperConf,
         zttid: ZTenantTimelineId,
-        create: CreateControlFile,
+        peer_ids: Vec<ZNodeId>,
     ) -> Result<Arc<Timeline>> {
         let mut timelines = TIMELINES.lock().unwrap();
+        match timelines.get(&zttid) {
+            Some(_) => bail!("timeline {} already exists", zttid),
+            None => {
+                // TODO: check directory existence
+                let dir = conf.timeline_dir(&zttid);
+                fs::create_dir_all(dir)?;
+                let shared_state = SharedState::create(conf, &zttid, peer_ids)
+                    .context("failed to create shared state")?;
+
+                let new_tli = Arc::new(Timeline::new(zttid, shared_state));
+                timelines.insert(zttid, Arc::clone(&new_tli));
+                Ok(new_tli)
+            }
+        }
+    }
+
+    /// Get a timeline with control file loaded from the global TIMELINES map.
+    /// If control file doesn't exist, bails out.
+    pub fn get(conf: &SafeKeeperConf, zttid: ZTenantTimelineId) -> Result<Arc<Timeline>> {
+        let mut timelines = TIMELINES.lock().unwrap();
         match timelines.get(&zttid) {
             Some(result) => Ok(Arc::clone(result)),
             None => {
-                if let CreateControlFile::True = create {
-                    let dir = conf.timeline_dir(&zttid);
-                    info!(
-                        "creating timeline dir {}, create is {:?}",
-                        dir.display(),
-                        create
-                    );
-                    fs::create_dir_all(dir)?;
-                }
-
-                let shared_state = SharedState::create_restore(conf, &zttid, create)
-                    .context("failed to restore shared state")?;
+                let shared_state =
+                    SharedState::restore(conf, &zttid).context("failed to restore shared state")?;
 
                 let new_tli = Arc::new(Timeline::new(zttid, shared_state));
                 timelines.insert(zttid, Arc::clone(&new_tli));
@@ -571,28 +576,23 @@ impl FileStorage {
     fn load_control_file_conf(
         conf: &SafeKeeperConf,
         zttid: &ZTenantTimelineId,
-        create: CreateControlFile,
     ) -> Result<SafeKeeperState> {
         let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME);
-        Self::load_control_file(path, create)
+        Self::load_control_file(path)
     }
 
     /// Read in the control file.
-    /// If create=false and file doesn't exist, bails out.
-    pub fn load_control_file<P: AsRef<Path>>(
-        control_file_path: P,
-        create: CreateControlFile,
-    ) -> Result<SafeKeeperState> {
+    /// If file doesn't exist, bails out.
+    pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
         info!(
-            "loading control file {}, create={:?}",
+            "loading control file {}",
             control_file_path.as_ref().display(),
-            create,
         );
         let mut control_file = OpenOptions::new()
             .read(true)
             .write(true)
-            .create(matches!(create, CreateControlFile::True))
+            .create(false)
             .open(&control_file_path)
             .with_context(|| {
                 format!(
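The intended lifecycle after this split — create once (driven by the HTTP endpoint), `get` everywhere else. A sketch; it assumes `ZTenantTimelineId` is `Copy`, which its two fixed-size ids suggest:

```rust
use std::sync::Arc;

// Sketch: create exactly once per timeline, then get.
fn lifecycle(conf: &SafeKeeperConf, zttid: ZTenantTimelineId) -> anyhow::Result<()> {
    let created = GlobalTimelines::create(conf, zttid, vec![ZNodeId(1), ZNodeId(2)])?;
    let fetched = GlobalTimelines::get(conf, zttid)?; // served from the map, no disk read
    assert!(Arc::ptr_eq(&created, &fetched));
    // A second create must fail with "timeline ... already exists".
    assert!(GlobalTimelines::create(conf, zttid, vec![]).is_err());
    Ok(())
}
```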
@@ -602,40 +602,32 @@ impl FileStorage {
             })?;
 
-        // Empty file is legit on 'create', don't try to deser from it.
-        let state = if control_file.metadata().unwrap().len() == 0 {
-            if let CreateControlFile::False = create {
-                bail!("control file is empty");
-            }
-            SafeKeeperState::new()
-        } else {
-            let mut buf = Vec::new();
-            control_file
-                .read_to_end(&mut buf)
-                .context("failed to read control file")?;
+        let mut buf = Vec::new();
+        control_file
+            .read_to_end(&mut buf)
+            .context("failed to read control file")?;
 
-            let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
+        let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
 
-            let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
-                buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
-            let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
+        let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
+            buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
+        let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
+
+        ensure!(
+            calculated_checksum == expected_checksum,
+            format!(
+                "safekeeper control file checksum mismatch: expected {} got {}",
+                expected_checksum, calculated_checksum
+            )
+        );
 
-            ensure!(
-                calculated_checksum == expected_checksum,
+        let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
+            .with_context(|| {
                 format!(
-                    "safekeeper control file checksum mismatch: expected {} got {}",
-                    expected_checksum, calculated_checksum
+                    "while reading control file {}",
+                    control_file_path.as_ref().display(),
                 )
-            );
-
-            FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context(
-                || {
-                    format!(
-                        "while reading control file {}",
-                        control_file_path.as_ref().display(),
-                    )
-                },
-            )?
-        };
+            })?;
 
         Ok(state)
     }
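With the create path gone, the loader now assumes one fixed on-disk layout: the serialized `SafeKeeperState` followed by a 4-byte little-endian crc32c over everything before it. A standalone check of just that framing (sketch; assumes `CHECKSUM_SIZE` is `size_of::<u32>()`):

```rust
use std::convert::TryInto;

// Sketch: validate the trailing checksum the same way load_control_file does.
fn control_file_checksum_ok(buf: &[u8]) -> bool {
    const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
    if buf.len() < CHECKSUM_SIZE {
        return false; // too short to even carry a checksum
    }
    let (payload, crc_bytes) = buf.split_at(buf.len() - CHECKSUM_SIZE);
    let expected = u32::from_le_bytes(crc_bytes.try_into().unwrap());
    crc32c::crc32c(payload) == expected
}
```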
read state"); + let (mut storage, mut state) = create(&conf, &zttid).expect("failed to read state"); // change something state.wal_start_lsn = Lsn(42); storage.persist(&state).expect("failed to persist state"); @@ -912,7 +908,7 @@ mod test { data[0] += 1; // change the first byte of the file to fail checksum validation fs::write(&control_path, &data).expect("failed to write control file"); - match load_from_control_file(&conf, &zttid, CreateControlFile::False) { + match load_from_control_file(&conf, &zttid) { Err(err) => assert!(err .to_string() .contains("safekeeper control file checksum mismatch")), diff --git a/walkeeper/src/upgrade.rs b/walkeeper/src/upgrade.rs index 913bd02c1ea8..5d1aa654b7d7 100644 --- a/walkeeper/src/upgrade.rs +++ b/walkeeper/src/upgrade.rs @@ -1,6 +1,6 @@ //! Code to deal with safekeeper control file upgrades use crate::safekeeper::{ - AcceptorState, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry, + AcceptorState, Peers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry, }; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; @@ -26,7 +26,7 @@ struct SafeKeeperStateV1 { /// persistent acceptor state acceptor_state: AcceptorStateV1, /// information about server - server: ServerInfo, + server: ServerInfoV2, /// Unique id of the last *elected* proposer we dealed with. Not needed /// for correctness, exists for monitoring purposes. proposer_uuid: PgUuid, @@ -70,6 +70,39 @@ pub struct SafeKeeperStateV2 { pub wal_start_lsn: Lsn, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ServerInfoV3 { + /// Postgres server version + pub pg_version: u32, + pub system_id: SystemId, + #[serde(with = "hex")] + pub tenant_id: ZTenantId, + /// Zenith timelineid + #[serde(with = "hex")] + pub timeline_id: ZTimelineId, + pub wal_seg_size: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SafeKeeperStateV3 { + /// persistent acceptor state + pub acceptor_state: AcceptorState, + /// information about server + pub server: ServerInfoV3, + /// Unique id of the last *elected* proposer we dealed with. Not needed + /// for correctness, exists for monitoring purposes. + #[serde(with = "hex")] + pub proposer_uuid: PgUuid, + /// part of WAL acknowledged by quorum and available locally + pub commit_lsn: Lsn, + /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn + /// of last record streamed to everyone) + pub truncate_lsn: Lsn, + // Safekeeper starts receiving WAL from this LSN, zeros before it ought to + // be skipped during decoding. 
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
index 6928c6f542b2..bf4c30e4d822 100644
--- a/zenith/src/main.rs
+++ b/zenith/src/main.rs
@@ -16,6 +16,7 @@ use walkeeper::defaults::{
     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
     DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
 };
+
 use zenith_utils::auth::{Claims, Scope};
 use zenith_utils::postgres_backend::AuthType;
 use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
@@ -421,6 +422,16 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
         exit(1);
     }
 
+    // If any safekeepers are configured, grab the generated timeline ID of the
+    // main branch from the pageserver and initialize the timeline on them.
+    if !env.safekeepers.is_empty() {
+        start_all(&[], &env)?;
+        let main_timeline_id =
+            pageserver.branch_list(&env.default_tenantid.unwrap())?[0].timeline_id;
+        safekeepers_create_timeline(env.default_tenantid.unwrap(), main_timeline_id, &env)?;
+        stop_all(false, &env)?;
+    }
+
     Ok(())
 }
 
@@ -448,6 +459,9 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
             println!("using tenant id {}", tenantid);
             pageserver.tenant_create(tenantid)?;
             println!("tenant successfully created on the pageserver");
+            // Create the timeline of the new tenant's main branch on the safekeepers.
+            let timeline_id = pageserver.branch_list(&tenantid)?[0].timeline_id;
+            safekeepers_create_timeline(tenantid, timeline_id, env)?;
         }
         Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
         None => bail!("no tenant subcommand provided"),
@@ -469,6 +483,7 @@ fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
             "Created branch '{}' at {:?} for tenant: {}",
             branch.name, branch.latest_valid_lsn, tenantid,
         );
+        safekeepers_create_timeline(tenantid, branch.timeline_id, env)?;
     } else {
         // No arguments, list branches for tenant
         let branches = pageserver.branch_list(&tenantid)?;
@@ -687,12 +702,30 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     Ok(())
 }
 
+/// Create timeline on all safekeepers from env; they must be already running.
+fn safekeepers_create_timeline(
+    tenant_id: ZTenantId,
+    timeline_id: ZTimelineId,
+    env: &local_env::LocalEnv,
+) -> Result<()> {
+    let peer_ids: Vec<ZNodeId> = env.safekeepers.iter().map(|s| s.id).collect();
+    for node in env.safekeepers.iter() {
+        let safekeeper = SafekeeperNode::from_env(env, node);
+        safekeeper.timeline_create(tenant_id, timeline_id, peer_ids.clone())?;
+    }
+    Ok(())
+}
+
 fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
+    start_all(&pageserver_config_overrides(sub_match), env)
+}
+
+fn start_all(config_overrides: &[&str], env: &local_env::LocalEnv) -> Result<()> {
     let pageserver = PageServerNode::from_env(env);
 
     // Postgres nodes are not started automatically
-    if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
+    if let Err(e) = pageserver.start(config_overrides) {
         eprintln!("pageserver start failed: {}", e);
         exit(1);
     }
 
@@ -708,8 +741,10 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
 }
 
 fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
-    let immediate = sub_match.value_of("stop-mode") == Some("immediate");
+    stop_all(sub_match.value_of("stop-mode") == Some("immediate"), env)
+}
 
+fn stop_all(immediate: bool, env: &local_env::LocalEnv) -> Result<()> {
     let pageserver = PageServerNode::from_env(env);
 
     // Stop all compute nodes
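Condensed, this is the ordering `zenith init` now follows when safekeepers are configured — a sketch of the `handle_init` fragment above, with error handling elided:

```rust
fn init_safekeeper_timelines(env: &local_env::LocalEnv) -> Result<()> {
    // 1. Everything must be running for the HTTP calls to land.
    start_all(&[], env)?;
    // 2. The pageserver already generated the main branch's timeline id at init.
    let tenant_id = env.default_tenantid.unwrap();
    let timeline_id = PageServerNode::from_env(env).branch_list(&tenant_id)?[0].timeline_id;
    // 3. Register it with every safekeeper, then shut back down.
    safekeepers_create_timeline(tenant_id, timeline_id, env)?;
    stop_all(false, env)?;
    Ok(())
}
```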
diff --git a/zenith_utils/src/zid.rs b/zenith_utils/src/zid.rs
index d8536797329a..2dedd1f69a7c 100644
--- a/zenith_utils/src/zid.rs
+++ b/zenith_utils/src/zid.rs
@@ -213,6 +213,10 @@ impl ZTenantTimelineId {
     pub fn generate() -> Self {
         Self::new(ZTenantId::generate(), ZTimelineId::generate())
     }
+
+    pub fn empty() -> Self {
+        Self::new(ZTenantId::from([0u8; 16]), ZTimelineId::from([0u8; 16]))
+    }
 }
 
 impl fmt::Display for ZTenantTimelineId {