Skip to content

Commit

Permalink
Explicitly create timeline on safekeepers, storing peer ids.
Browse files Browse the repository at this point in the history
Now timeline on safekeepers must be created with http request before using it;
prevents accidental creation by compute. Knowing peers provides part of the
foundation for peer recovery (calculating min horizons like truncate_lsn for WAL
truncation and commit_lsn for sync-safekeepers replacement) and proper
membership change.

Zenith CLI is adapted accordingly: it creates timelines on pageserver init,
tenant creation and branch creation. Currently it means it can't do these
operations while any safekeeper is down, but eventually safekeeper should get
info about assigned timelines through a centralized broker.
  • Loading branch information
arssher committed Feb 16, 2022
1 parent 95fab10 commit 6ba1c8c
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 147 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions control_plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ url = "2.2.2"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }

pageserver = { path = "../pageserver" }
walkeeper = { path = "../walkeeper" }
zenith_utils = { path = "../zenith_utils" }
workspace_hack = { path = "../workspace_hack" }
24 changes: 23 additions & 1 deletion control_plane/src/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use walkeeper::http::models::TimelineCreateRequest;
use zenith_utils::http::error::HttpErrorBody;
use zenith_utils::zid::ZNodeId;
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};

use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::storage::PageServerNode;
Expand Down Expand Up @@ -261,4 +262,25 @@ impl SafekeeperNode {
.error_from_body()?;
Ok(())
}

/// Explicitly create a timeline on this safekeeper via its HTTP API.
///
/// Issues `POST {http_base_url}/timeline` carrying the tenant/timeline ids
/// and the node ids of the peer safekeepers hosting the same timeline. The
/// timeline must be created this way before it can be used.
pub fn timeline_create(
    &self,
    tenant_id: ZTenantId,
    timeline_id: ZTimelineId,
    peer_ids: Vec<ZNodeId>,
) -> Result<()> {
    let url = format!("{}/{}", self.http_base_url, "timeline");
    let body = TimelineCreateRequest {
        tenant_id,
        timeline_id,
        peer_ids,
    };
    let response = self
        .http_request(Method::POST, url)
        .json(&body)
        .send()?
        .error_from_body()?;
    // The endpoint replies with a JSON body (unit); parse it to surface
    // deserialization errors the same way other handlers do.
    Ok(response.json()?)
}
}
4 changes: 2 additions & 2 deletions walkeeper/src/bin/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::thread;
use tracing::*;
use walkeeper::timeline::{CreateControlFile, FileStorage};
use walkeeper::timeline::FileStorage;
use zenith_utils::http::endpoint;
use zenith_utils::zid::ZNodeId;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
Expand Down Expand Up @@ -102,7 +102,7 @@ fn main() -> Result<()> {
.get_matches();

if let Some(addr) = arg_matches.value_of("dump-control-file") {
let state = FileStorage::load_control_file(Path::new(addr), CreateControlFile::False)?;
let state = FileStorage::load_control_file(Path::new(addr))?;
let json = serde_json::to_string(&state)?;
print!("{}", json);
return Ok(());
Expand Down
15 changes: 2 additions & 13 deletions walkeeper/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use zenith_utils::pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID
use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};

use crate::callmemaybe::CallmeEvent;
use crate::timeline::CreateControlFile;
use tokio::sync::mpsc::UnboundedSender;

/// Safekeeper handler of postgres commands
Expand Down Expand Up @@ -109,18 +108,8 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
let tenantid = self.ztenantid.context("tenantid is required")?;
let timelineid = self.ztimelineid.context("timelineid is required")?;
if self.timeline.is_none() {
// START_WAL_PUSH is the only command that initializes the timeline in production.
// There is also JSON_CTRL command, which should initialize the timeline for testing.
let create_control_file = match cmd {
SafekeeperPostgresCommand::StartWalPush { .. }
| SafekeeperPostgresCommand::JSONCtrl { .. } => CreateControlFile::True,
_ => CreateControlFile::False,
};
self.timeline.set(
&self.conf,
ZTenantTimelineId::new(tenantid, timelineid),
create_control_file,
)?;
self.timeline
.set(&self.conf, ZTenantTimelineId::new(tenantid, timelineid))?;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions walkeeper/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod models;
pub mod routes;
pub use routes::make_router;
9 changes: 9 additions & 0 deletions walkeeper/src/http/models.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use serde::{Deserialize, Serialize};
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};

/// Request body for `POST /v1/timeline`: explicitly creates a timeline on a
/// safekeeper, recording the ids of the peer safekeepers hosting it.
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
    pub tenant_id: ZTenantId,
    pub timeline_id: ZTimelineId,
    /// Node ids of all safekeepers assigned this timeline (stored for peer
    /// recovery and membership-change bookkeeping).
    pub peer_ids: Vec<ZNodeId>,
}
22 changes: 19 additions & 3 deletions walkeeper/src/http/routes.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use hyper::{Body, Request, Response, StatusCode};

use serde::Serialize;
use serde::Serializer;
use std::fmt::Display;
use std::sync::Arc;
use zenith_utils::http::json::json_request;
use zenith_utils::http::{RequestExt, RouterBuilder};
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZNodeId;
use zenith_utils::zid::ZTenantTimelineId;

use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
use crate::timeline::CreateControlFile;
use crate::timeline::GlobalTimelines;
use crate::SafeKeeperConf;
use zenith_utils::http::endpoint;
Expand All @@ -19,6 +20,8 @@ use zenith_utils::http::json::json_response;
use zenith_utils::http::request::parse_request_param;
use zenith_utils::zid::{ZTenantId, ZTimelineId};

use super::models::TimelineCreateRequest;

#[derive(Debug, Serialize)]
struct SafekeeperStatus {
id: ZNodeId,
Expand Down Expand Up @@ -78,8 +81,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
parse_request_param(&request, "timeline_id")?,
);

let tli = GlobalTimelines::get(get_conf(&request), zttid, CreateControlFile::False)
.map_err(ApiError::from_err)?;
let tli = GlobalTimelines::get(get_conf(&request), zttid).map_err(ApiError::from_err)?;
let sk_state = tli.get_info();
let flush_lsn = tli.get_end_of_wal();

Expand All @@ -100,6 +102,19 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
Ok(json_response(StatusCode::OK, status)?)
}

/// Handle `POST /v1/timeline`: deserialize a [`TimelineCreateRequest`] from
/// the body and register the timeline, with its peer set, in the global
/// timelines map. Responds `201 Created` on success.
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let data: TimelineCreateRequest = json_request(&mut request).await?;

    let zttid = ZTenantTimelineId {
        tenant_id: data.tenant_id,
        timeline_id: data.timeline_id,
    };
    let conf = get_conf(&request);
    GlobalTimelines::create(conf, zttid, data.peer_ids).map_err(ApiError::from_err)?;

    Ok(json_response(StatusCode::CREATED, ())?)
}

/// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
let router = endpoint::make_router();
Expand All @@ -110,4 +125,5 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/timeline/:tenant_id/:timeline_id",
timeline_status_handler,
)
.post("/v1/timeline", timeline_create_handler)
}
95 changes: 71 additions & 24 deletions walkeeper/src/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::cmp::min;
use std::fmt;
use std::io::Read;
use tracing::*;
use zenith_utils::zid::ZNodeId;
use zenith_utils::zid::ZTenantTimelineId;

use lazy_static::lazy_static;

Expand All @@ -26,12 +28,13 @@ use zenith_utils::pq_proto::ZenithFeedback;
use zenith_utils::zid::{ZTenantId, ZTimelineId};

pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 3;
pub const SK_FORMAT_VERSION: u32 = 4;
const SK_PROTOCOL_VERSION: u32 = 1;
const UNKNOWN_SERVER_VERSION: u32 = 0;

/// Consensus logical timestamp.
pub type Term = u64;
const INVALID_TERM: Term = 0;

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchEntry {
Expand Down Expand Up @@ -129,18 +132,47 @@ pub struct ServerInfo {
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
#[serde(with = "hex")]
pub tenant_id: ZTenantId,
/// Zenith timelineid
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
pub wal_seg_size: u32,
}

/// Data published by a safekeeper to its peers.
///
/// NOTE(review): per the surrounding commit, this state is provisioned in the
/// control file format but not yet actually exchanged or persisted with live
/// values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
    /// LSN up to which safekeeper offloaded WAL to s3.
    s3_wal_lsn: Lsn,
    /// Term of the last entry.
    term: Term,
    /// LSN of the last record.
    flush_lsn: Lsn,
    /// Up to which LSN safekeeper regards its WAL as committed.
    commit_lsn: Lsn,
}

impl PeerInfo {
fn new() -> Self {
Self {
s3_wal_lsn: Lsn(0),
term: INVALID_TERM,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
}
}
}

// Vector-based node id -> peer state map with the very limited functionality
// we need.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Peers(pub Vec<(ZNodeId, PeerInfo)>);

/// Persistent information stored on safekeeper node
/// On disk data is prefixed by magic and format version and followed by checksum.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SafeKeeperState {
#[serde(with = "hex")]
pub tenant_id: ZTenantId,
/// Zenith timelineid
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
Expand All @@ -157,33 +189,38 @@ pub struct SafeKeeperState {
// Safekeeper starts receiving WAL from this LSN, zeros before it ought to
// be skipped during decoding.
pub wal_start_lsn: Lsn,
// Peers and their state as we remember it. Knowing peers themselves is
// fundamental; but state is saved here only for informational purposes and
// obviously can be stale. (Currently not saved at all, but let's provision
// place to have less file version upgrades).
pub peers: Peers,
}

impl SafeKeeperState {
pub fn new() -> SafeKeeperState {
pub fn new(zttid: &ZTenantTimelineId, peers: Vec<ZNodeId>) -> SafeKeeperState {
SafeKeeperState {
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
acceptor_state: AcceptorState {
term: 0,
term_history: TermHistory::empty(),
},
server: ServerInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
tenant_id: ZTenantId::from([0u8; 16]),
timeline_id: ZTimelineId::from([0u8; 16]),
wal_seg_size: 0,
},
proposer_uuid: [0; 16],
commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */
truncate_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */
wal_start_lsn: Lsn(0),
peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
}
}
}

impl Default for SafeKeeperState {
fn default() -> Self {
Self::new()
#[cfg(test)]
pub fn empty() -> Self {
SafeKeeperState::new(&ZTenantTimelineId::empty(), vec![])
}
}

Expand Down Expand Up @@ -502,10 +539,8 @@ where
storage: ST,
state: SafeKeeperState,
) -> SafeKeeper<ST> {
if state.server.timeline_id != ZTimelineId::from([0u8; 16])
&& ztli != state.server.timeline_id
{
panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.server.timeline_id);
if ztli != state.timeline_id {
panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.timeline_id ({})", ztli, state.timeline_id);
}
SafeKeeper {
flush_lsn,
Expand Down Expand Up @@ -570,18 +605,30 @@ where
msg.pg_version, self.s.server.pg_version
);
}
if msg.tenant_id != self.s.tenant_id {
bail!(
"invalid tenant ID, got {}, expected {}",
msg.tenant_id,
self.s.tenant_id
);
}
if msg.ztli != self.s.timeline_id {
bail!(
"invalid timeline ID, got {}, expected {}",
msg.ztli,
self.s.timeline_id
);
}

// set basic info about server, if not yet
self.s.server.system_id = msg.system_id;
self.s.server.tenant_id = msg.tenant_id;
self.s.server.timeline_id = msg.ztli;
self.s.server.wal_seg_size = msg.wal_seg_size;
self.storage
.persist(&self.s)
.context("failed to persist shared state")?;

self.metrics = SafeKeeperMetricsBuilder {
ztli: self.s.server.timeline_id,
ztli: self.s.timeline_id,
flush_lsn: self.flush_lsn,
commit_lsn: self.commit_lsn,
}
Expand Down Expand Up @@ -813,10 +860,10 @@ mod tests {
#[test]
fn test_voting() {
let storage = InMemoryStorage {
persisted_state: SafeKeeperState::new(),
persisted_state: SafeKeeperState::empty(),
};
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::new());
let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::empty());

// check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
Expand Down Expand Up @@ -844,10 +891,10 @@ mod tests {
#[test]
fn test_epoch_switch() {
let storage = InMemoryStorage {
persisted_state: SafeKeeperState::new(),
persisted_state: SafeKeeperState::empty(),
};
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::new());
let mut sk = SafeKeeper::new(ztli, Lsn(0), storage, SafeKeeperState::empty());

let mut ar_hdr = AppendRequestHeader {
term: 1,
Expand Down
Loading

0 comments on commit 6ba1c8c

Please sign in to comment.