store: use a BTreeMap for RecentBlocksCache
This commit significantly alters the design of RecentBlocksCache. The
most prominent changes are (a brief illustrative sketch follows the list):

1. We don't require a `.set_chain_head` call anymore. Block insertion
   and chain head update attempt are now the same thing.
2. We no longer evict every item in the cache each time the chain head
   advances.
3. Unlike the previous data structure, we are now limited to storing a
   contiguous range of blocks in the cache. This is not really a
   drawback (as the cache contents will usually be identical, i.e. the
   last N blocks before the chain head), but it's worth pointing out.
neysofu committed Dec 13, 2022
1 parent 27b205c commit cc866b6
Showing 1 changed file with 111 additions and 72 deletions.
183 changes: 111 additions & 72 deletions store/postgres/src/chain_store.rs
@@ -1352,6 +1352,12 @@ pub struct ChainStore {
genesis_block_ptr: BlockPtr,
status: ChainStatus,
chain_head_update_sender: ChainHeadUpdateSender,
// TODO: We currently only use this cache for
// [`ChainStore::ancestor_block`], but it could very well be expanded to
// also track the network's chain head and generally improve its hit rate.
// It is, however, quite challenging to keep the cache perfectly consistent
// with the database and to correctly implement invalidation. So, a
// conservative approach is acceptable.
recent_blocks_cache: RecentBlocksCache,
}

@@ -1493,6 +1499,12 @@ impl ChainStoreTrait for ChainStore {
}

async fn upsert_block(&self, block: Arc<dyn Block>) -> Result<(), Error> {
// We should always have the parent block available to us at this point.
if let Some(parent_hash) = block.parent_hash() {
self.recent_blocks_cache
.insert_block(block.ptr(), block.data().ok(), parent_hash);
}

let pool = self.pool.clone();
let network = self.chain.clone();
let storage = self.storage.clone();
@@ -1605,10 +1617,6 @@ impl ChainStoreTrait for ChainStore {
_ => unreachable!(),
})
.and_then(|opt: Option<BlockPtr>| opt)
.map(|head| {
self.recent_blocks_cache.set_chain_head(head.clone());
head
})
})
.map_err(|e| CancelableError::from(StoreError::from(e)))
})
@@ -1654,7 +1662,6 @@ impl ChainStoreTrait for ChainStore {

//this will send an update via postgres, channel: chain_head_updates
self.chain_head_update_sender.send(&hash, number)?;
self.recent_blocks_cache.set_chain_head(ptr);

pool.with_conn(move |conn, _| {
conn.transaction(|| -> Result<(), StoreError> {
@@ -1697,13 +1704,13 @@ impl ChainStoreTrait for ChainStore {
);

// Check the local cache first.
if let Some(data) = self.recent_blocks_cache.get_ancestor(&block_ptr, offset) {
if let Some(data) = self.recent_blocks_cache.get_block(&block_ptr, offset) {
return Ok(data.1.clone());
}

let block_ptr_clone = block_ptr.clone();
let chain_store = self.cheap_clone();
let block_data = self
Ok(self
.pool
.with_conn(move |conn, _| {
chain_store
Expand All @@ -1712,16 +1719,8 @@ impl ChainStoreTrait for ChainStore {
.map_err(StoreError::from)
.map_err(CancelableError::from)
})
.await?;

if let Some((data, ptr)) = block_data {
// Update the local cache.
self.recent_blocks_cache.set_ancestor(block_ptr, ptr, &data);

Ok(Some(data))
} else {
Ok(None)
}
.await?
.map(|b| b.0))
}

fn cleanup_cached_blocks(
@@ -1844,68 +1843,106 @@ impl ChainStoreTrait for ChainStore {

mod recent_blocks_cache {
use super::*;
use std::collections::BTreeMap;

struct CachedBlock {
ptr: BlockPtr,
data: Option<json::Value>,
parent_hash: BlockHash,
}

struct Inner {
chain_head: Option<BlockNumber>,
blocks: HashMap<BlockNumber, (BlockPtr, Option<json::Value>)>,
// Note: At all times, only a contiguous range of blocks will be in the
// cache, i.e. without gaps. This constraint makes it possible to easily
// verify a direct line of ancestry between two blocks via parent hashes.
blocks: BTreeMap<BlockNumber, CachedBlock>,
// We only store this many blocks.
capacity: usize,
}

impl Inner {
fn set_chain_head(&mut self, chain_head: BlockPtr) {
self.chain_head = Some(chain_head.number);
// We invalidate the entire cache when the chain head changes.
self.blocks.clear();
self.blocks.insert(chain_head.number, (chain_head, None));
}

fn get_ancestor(
fn get_block(
&self,
child: &BlockPtr,
offset: BlockNumber,
) -> Option<(BlockPtr, Option<json::Value>)> {
let child_is_cached = match self.blocks.get(&child.number) {
Some((ptr, _)) => ptr.hash == child.hash,
_ => false,
};
) -> Option<(&BlockPtr, Option<&json::Value>)> {
// Before we can go find the ancestor, we need to make sure that
// we're looking for the ancestor of the right block, i.e. check if
// the hash (and number) of the child matches.
let child_is_cached = &self.blocks.get(&child.number)?.ptr == child;

if child_is_cached {
self.blocks.get(&(child.number - offset)).cloned()
let ancestor_block_number = child.number - offset;
let block = self.blocks.get(&ancestor_block_number)?;
Some((&block.ptr, block.data.as_ref()))
} else {
None
}
}

fn contains_block(&self, ptr: &BlockPtr) -> bool {
let identity_offset = 0;
self.get_ancestor(ptr, identity_offset).is_some()
fn chain_head(&self) -> Option<&BlockPtr> {
self.blocks.iter().next_back().map(|b| &b.1.ptr)
}

/// Tentatively caches the `ancestor` of a [`BlockPtr`], together with
/// its associated `data`.
fn set_ancestor(&mut self, child: BlockPtr, ancestor: BlockPtr, data: &json::Value) {
// We must have a chain head to cache blocks.
let chain_head = if let Some(n) = self.chain_head {
n
} else {
return;
};
// Note: despite its name, this returns the *oldest* (lowest-numbered)
// block in the cache, i.e. the tail of the cached range.
fn last_block(&self) -> Option<&CachedBlock> {
self.blocks.iter().next().map(|b| b.1)
}

// We only cache blocks that are close to the chain head.
let offset_from_head = chain_head - ancestor.number;
if offset_from_head >= self.capacity as _ {
return;
fn evict_if_necessary(&mut self) {
while self.blocks.len() > self.capacity && !self.blocks.is_empty() {
let block_num = *self.blocks.iter().next().unwrap().0;
self.blocks.remove(&block_num);
}
}

// We only cache ancestors of blocks that are cached already (chain
// head excluded). Otherwise, we might inadvertently cache blocks
// that don't belong to the main chain, e.g. uncle blocks.
if !self.contains_block(&child) {
fn insert_block(
&mut self,
ptr: BlockPtr,
data: Option<json::Value>,
parent_hash: BlockHash,
) {
let block = CachedBlock {
ptr,
data,
parent_hash,
};

let chain_head = if let Some(ch) = self.chain_head() {
ch
} else {
// We don't have anything in the cache, so we're free to store
// everything we want.
self.blocks.insert(block.ptr.number, block);
return;
};

if chain_head.number == block.ptr.number - 1 && chain_head.hash == block.parent_hash {
// We have a new chain head that is a direct child of our
// previous chain head, so we get to keep all items in the
// cache.
self.blocks.insert(block.ptr.number, block);
} else if block.ptr.number > chain_head.number && block.parent_hash != chain_head.hash {
// We have a new chain head, but it's not a direct child of
// our previous chain head. This means that we must
// invalidate all the items in the cache before inserting
// this block.
self.blocks.clear();
// Try again after clearing the cache.
self.insert_block(block.ptr, block.data, block.parent_hash);
} else {
// Unwrap: we have already checked at the beginning of this function
// body that the cache is not empty.
let last_block = self.last_block().unwrap();
// Let's check if this is the parent of the last block in the
// cache.
let is_prev_block = last_block.ptr.number == block.ptr.number + 1
&& &last_block.parent_hash == &block.ptr.hash;
if is_prev_block {
self.blocks.insert(block.ptr.number, block);
}
}

self.blocks
.insert(ancestor.number, (ancestor, Some(data.clone())));
self.evict_if_necessary();
}
}

@@ -1921,41 +1958,43 @@ mod recent_blocks_cache {
pub fn new(capacity: usize) -> Self {
RecentBlocksCache {
inner: RwLock::new(Inner {
chain_head: None,
blocks: HashMap::new(),
blocks: BTreeMap::new(),
capacity,
}),
}
}

pub fn set_chain_head(&self, chain_head: BlockPtr) {
self.inner.write().set_chain_head(chain_head)
}

pub fn chain_head_ptr(&self) -> Option<BlockPtr> {
let inner = self.inner.read();
inner
.blocks
.get(&inner.chain_head?)
.map(|(ptr, _)| ptr.clone())
inner.chain_head().cloned()
}

pub fn clear(&self) {
self.inner.write().blocks.clear();
self.inner.write().blocks.clear()
}

pub fn get_ancestor(
pub fn get_block(
&self,
child: &BlockPtr,
offset: BlockNumber,
) -> Option<(BlockPtr, Option<json::Value>)> {
self.inner.read().get_ancestor(child, offset)
self.inner
.read()
.get_block(child, offset)
.map(|b| (b.0.clone(), b.1.cloned()))
}

/// Inserts this block into the cache, if close enough to the chain
/// head. If not, it's a no-op.
pub fn set_ancestor(&self, child: BlockPtr, ancestor: BlockPtr, data: &json::Value) {
self.inner.write().set_ancestor(child, ancestor, data)
/// Tentatively caches a block, together with its associated `data` and
/// `parent_hash`. The insertion only takes effect if the cache is empty
/// or the block directly extends the cached range at either end; a new
/// chain head that doesn't build on the cached head clears the cache
/// first, and anything else is a no-op.
pub fn insert_block(
&self,
ptr: BlockPtr,
data: Option<json::Value>,
parent_hash: BlockHash,
) {
self.inner.write().insert_block(ptr, data, parent_hash)
}
}
}
