Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor pagecache <-> Wal redo communication #60

Merged
merged 1 commit into from
Apr 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ edition = "2018"

[dependencies]
chrono = "0.4.19"
crossbeam-channel = "0.5.0"
rand = "0.8.3"
regex = "1.4.5"
bytes = "1.0.1"
Expand Down
127 changes: 39 additions & 88 deletions pageserver/src/page_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::Oid;
use crate::ZTimelineId;
use crate::{walredo, zenith_repo_dir, PageServerConf};
use crate::{zenith_repo_dir, PageServerConf};
use anyhow::{bail, Context};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
use lazy_static::lazy_static;
use log::*;
use rocksdb;
use std::cmp::min;
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{convert::TryInto, ops::AddAssign};
use zenith_utils::seqwait::SeqWait;
use crate::walredo::WalRedoManager;

// Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
Expand All @@ -34,9 +33,8 @@ pub struct PageCache {
// RocksDB handle
db: rocksdb::DB,

// Channel for communicating with the WAL redo process here.
pub walredo_sender: Sender<Arc<CacheEntry>>,
pub walredo_receiver: Receiver<Arc<CacheEntry>>,
// WAL redo manager
walredo_mgr: WalRedoManager,

// Allows .await on the arrival of a particular LSN.
seqwait_lsn: SeqWait,
Expand Down Expand Up @@ -131,20 +129,11 @@ pub fn get_or_restore_pagecache(

let result = Arc::new(pcache);

// Launch the WAL redo thread
result.walredo_mgr.launch(result.clone());

pcaches.insert(timelineid, result.clone());

// Initialize the WAL redo thread
//
// For now the join handle is not saved anywhere, and we won't try to restart
// the thread if it dies. We may later stop these threads after some period of
// inactivity and restart them on demand.
let conf_copy = conf.clone();
let _walredo_thread = thread::Builder::new()
.name("WAL redo thread".into())
.spawn(move || {
walredo::wal_redo_main(&conf_copy, timelineid);
})
.unwrap();
if conf.gc_horizon != 0 {
let conf_copy = conf.clone();
let _gc_thread = thread::Builder::new()
Expand All @@ -162,7 +151,10 @@ pub fn get_or_restore_pagecache(
fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
info!("Garbage collection thread started {}", timelineid);
let pcache = get_pagecache(conf, timelineid).unwrap();
pcache.do_gc(conf).unwrap();

let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();

runtime.block_on(pcache.do_gc(conf)).unwrap();
}

fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
Expand All @@ -176,20 +168,19 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB
}

fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
// Initialize the channel between the page cache and the WAL applicator
let (s, r) = unbounded();

PageCache {
db: open_rocksdb(&conf, timelineid),
shared: Mutex::new(PageCacheShared {
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
}),
seqwait_lsn: SeqWait::new(0),

walredo_sender: s,
walredo_receiver: r,
db: open_rocksdb(&conf, timelineid),

walredo_mgr: WalRedoManager::new(conf, timelineid),

seqwait_lsn: SeqWait::new(0),

num_entries: AtomicU64::new(0),
num_page_images: AtomicU64::new(0),
Expand Down Expand Up @@ -234,19 +225,6 @@ impl CacheKey {
}
}

// A page version paired with the synchronization state needed to hand it to
// the WAL redo service: `reconstruct_page` wraps a key and its content in one
// of these, sends it over `walredo_sender`, and then blocks on
// `walredo_condvar` until `apply_pending` is cleared.
pub struct CacheEntry {
// Identifies the page version (buffer tag + LSN) this entry describes.
pub key: CacheKey,

// Page image and/or WAL record for this key, guarded by a mutex because the
// entry is shared (via Arc) with the WAL redo side.
pub content: Mutex<CacheEntryContent>,

// Condition variable used by the WAL redo service, to wake up
// requester.
//
// FIXME: this takes quite a lot of space. Consider using parking_lot::Condvar
// or something else.
pub walredo_condvar: Condvar,
}

pub struct CacheEntryContent {
pub page_image: Option<Bytes>,
pub wal_record: Option<WALRecord>,
Expand Down Expand Up @@ -283,16 +261,6 @@ impl CacheEntryContent {
}
}

impl CacheEntry {
// Builds an entry for `key`, wrapping `content` in a fresh mutex and pairing
// it with a new condition variable for the WAL redo hand-off.
fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry {
CacheEntry {
key,
content: Mutex::new(content),
walredo_condvar: Condvar::new(),
}
}
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
pub struct RelTag {
pub spcnode: u32,
Expand Down Expand Up @@ -378,7 +346,8 @@ impl WALRecord {
// Public interface functions

impl PageCache {
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {

let mut minbuf = BytesMut::new();
let mut maxbuf = BytesMut::new();
let cf = self
Expand Down Expand Up @@ -429,10 +398,10 @@ impl PageCache {
minkey.lsn = 0;

// reconstruct most recent page version
if content.wal_record.is_some() {
trace!("Reconstruct most recent page {:?}", key);
if let Some(rec) = content.wal_record {
trace!("Reconstruct most recent page {:?}", key);
// force reconstruction of most recent page version
self.reconstruct_page(key, content)?;
self.walredo_mgr.request_redo(key.tag, rec.lsn).await?;
reconstructed += 1;
}

Expand All @@ -449,12 +418,13 @@ impl PageCache {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
if content.wal_record.is_some() {
if let Some(rec) = content.wal_record {
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);

trace!("Reconstruct horizon page {:?}", key);
self.reconstruct_page(key, content)?;
self.walredo_mgr.request_redo(key.tag, rec.lsn).await?;
truncated += 1;
}
}
Expand All @@ -475,31 +445,6 @@ impl PageCache {
}
}

// Asks the WAL redo service to materialize the page image for `key`, blocks
// until it has finished, stores the resulting image in the page cache and
// returns it.
//
// Errors: returns an error if the request cannot be sent over the channel, or
// if no page image is present once the redo request completes.
//
// Panics: if the entry's mutex is poisoned (both the `lock()` and the condvar
// `wait()` results are unwrapped).
fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result<Bytes> {
// The entry is shared with the receiving end of the channel, hence the Arc.
let entry_rc = Arc::new(CacheEntry::new(key.clone(), content));

// Take the lock *before* sending, and mark the entry as pending. The guard is
// held until the condvar wait below releases it.
// NOTE(review): presumably this ordering is what prevents a missed wakeup —
// confirm against the WAL redo side, which is not visible here.
let mut entry_content = entry_rc.content.lock().unwrap();
entry_content.apply_pending = true;

// Queue the request for the WAL redo service.
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;

// Sleep until `apply_pending` has been cleared. The loop re-checks the flag
// after every wakeup, so spurious wakeups are harmless.
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
let page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
bail!("could not apply WAL to reconstruct page image");
}
};
// Store the reconstructed image under the same tag/LSN before returning it.
self.put_page_image(key.tag, key.lsn, page_img.clone());
Ok(page_img)
}

async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result<u64> {
let mut lsn = req_lsn;
// When an invalid LSN is requested, it means "don't wait, return the latest version of the page"
Expand Down Expand Up @@ -561,18 +506,17 @@ impl PageCache {
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (k, v) = entry_opt.unwrap();
let (_k, v) = entry_opt.unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let page_img: Bytes;
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
page_img = self.reconstruct_page(key, content)?;

// Request the WAL redo manager to apply the WAL records for us.
page_img = self.walredo_mgr.request_redo(tag, lsn).await?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
Expand Down Expand Up @@ -602,10 +546,10 @@ impl PageCache {
// Returns an old page image (if any), and a vector of WAL records to apply
// over it.
//
pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) {
pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option<Bytes>, Vec<WALRecord>) {
let minkey = CacheKey {
tag: BufferTag {
rel: entry.key.tag.rel,
rel: tag.rel,
blknum: 0,
},
lsn: 0,
Expand All @@ -617,8 +561,15 @@ impl PageCache {
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());

let key = CacheKey {
tag: BufferTag {
rel: tag.rel,
blknum: tag.blknum,
},
lsn: lsn,
};
buf.clear();
entry.key.pack(&mut buf);
key.pack(&mut buf);
let iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
Expand Down
Loading