Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch Arc<Node> for triomphe::Arc<Node> #793

Merged
merged 1 commit into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions benchmark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ If you're looking for detailed logging, there are some command line options to e
cargo run --profile release --bin benchmark -- -l debug -n 10000 single
```

# Using opentelemetry
## Using opentelemetry

To use the opentelemetry server and record timings, just run a docker image that collects the data using:

Expand All @@ -178,6 +178,3 @@ docker run -p 127.0.0.1:4318:4318 -p 127.0.0.1:55679:55679 otel/openteleme
```

Then, pass the `-e` option to the benchmark.
```

Then, pass the `-e` option to the benchmark.
14 changes: 7 additions & 7 deletions firewood/src/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::num::NonZeroUsize;
use std::sync::Arc;
use storage::{
BranchNode, Child, Hashable, HashedNodeReader, ImmutableProposal, LeafNode, LinearAddress,
MutableProposal, NibblesIterator, Node, NodeStore, Path, ReadableStorage, TrieHash, TrieReader,
ValueDigest,
MutableProposal, NibblesIterator, Node, NodeStore, Path, ReadableStorage, SharedNode, TrieHash,
TrieReader, ValueDigest,
};

use thiserror::Error;
Expand Down Expand Up @@ -93,7 +93,7 @@ fn get_helper<T: TrieReader>(
nodestore: &T,
node: &Node,
key: &[u8],
) -> Result<Option<Arc<Node>>, MerkleError> {
) -> Result<Option<SharedNode>, MerkleError> {
// 4 possibilities for the position of the `key` relative to `node`:
// 1. The node is at `key`
// 2. The key is above the node (i.e. its ancestor)
Expand All @@ -111,7 +111,7 @@ fn get_helper<T: TrieReader>(
// Case (2) or (4)
Ok(None)
}
(None, None) => Ok(Some(Arc::new(node.clone()))), // 1. The node is at `key`
(None, None) => Ok(Some(node.clone().into())), // 1. The node is at `key`
(Some((child_index, remaining_key)), None) => {
// 3. The key is below the node (i.e. its descendant)
match node {
Expand Down Expand Up @@ -152,7 +152,7 @@ impl<T> From<T> for Merkle<T> {
}

impl<T: TrieReader> Merkle<T> {
pub(crate) fn root(&self) -> Option<Arc<Node>> {
pub(crate) fn root(&self) -> Option<SharedNode> {
self.nodestore.root_node()
}

Expand All @@ -161,7 +161,7 @@ impl<T: TrieReader> Merkle<T> {
&self.nodestore
}

fn read_node(&self, addr: LinearAddress) -> Result<Arc<Node>, MerkleError> {
fn read_node(&self, addr: LinearAddress) -> Result<SharedNode, MerkleError> {
self.nodestore.read_node(addr).map_err(Into::into)
}

Expand Down Expand Up @@ -335,7 +335,7 @@ impl<T: TrieReader> Merkle<T> {
Ok(node.value().map(|v| v.to_vec().into_boxed_slice()))
}

pub(crate) fn get_node(&self, key: &[u8]) -> Result<Option<Arc<Node>>, MerkleError> {
pub(crate) fn get_node(&self, key: &[u8]) -> Result<Option<SharedNode>, MerkleError> {
let Some(root) = self.root() else {
return Ok(None);
};
Expand Down
17 changes: 9 additions & 8 deletions firewood/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@ use futures::stream::FusedStream;
use futures::{Stream, StreamExt};
use std::cmp::Ordering;
use std::iter::once;
use std::sync::Arc;
use std::task::Poll;
use storage::{BranchNode, Child, NibblesIterator, Node, PathIterItem, TrieReader};
use storage::{BranchNode, Child, NibblesIterator, Node, PathIterItem, SharedNode, TrieReader};

/// Represents an ongoing iteration over a node and its children.
enum IterationNode {
/// This node has not been returned yet.
Unvisited {
/// The key (as nibbles) of this node.
key: Key,
node: Arc<Node>,
node: SharedNode,
},
/// This node has been returned. Track which child to visit next.
Visited {
Expand Down Expand Up @@ -94,7 +93,7 @@ impl<'a, T: TrieReader> MerkleNodeStream<'a, T> {
}

impl<T: TrieReader> Stream for MerkleNodeStream<'_, T> {
type Item = Result<(Key, Arc<Node>), api::Error>;
type Item = Result<(Key, SharedNode), api::Error>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -141,7 +140,7 @@ impl<T: TrieReader> Stream for MerkleNodeStream<'_, T> {

let child = match child {
Child::AddressWithHash(addr, _) => merkle.read_node(addr)?,
Child::Node(node) => Arc::new(node.clone()),
Child::Node(node) => node.clone().into(),
};

let child_partial_path = child.partial_path().iter().copied();
Expand Down Expand Up @@ -251,7 +250,7 @@ fn get_iterator_intial_state<T: TrieReader>(
node = match child {
None => return Ok(NodeStreamState::Iterating { iter_stack }),
Some(Child::AddressWithHash(addr, _)) => merkle.read_node(*addr)?,
Some(Child::Node(node)) => Arc::new((*node).clone()), // TODO can we avoid cloning this?
Some(Child::Node(node)) => (*node).clone().into(), // TODO can we avoid cloning this?
};

matched_key_nibbles.push(next_unmatched_key_nibble);
Expand Down Expand Up @@ -374,7 +373,7 @@ enum PathIteratorState<'a> {
/// prefix of the key we're traversing to.
matched_key: Vec<u8>,
unmatched_key: NibblesIterator<'a>,
node: Arc<Node>,
node: SharedNode,
},
Exhausted,
}
Expand Down Expand Up @@ -504,7 +503,7 @@ impl<T: TrieReader> Iterator for PathIterator<'_, '_, T> {
matched_key.push(next_unmatched_key_nibble);

let ret = node.clone();
*node = Arc::new(child.clone());
*node = child.clone().into();

Some(Ok(PathIterItem {
key_nibbles: node_key,
Expand Down Expand Up @@ -584,6 +583,8 @@ fn key_from_nibble_iter<Iter: Iterator<Item = u8>>(mut nibbles: Iter) -> Key {
#[cfg(test)]
#[allow(clippy::indexing_slicing, clippy::unwrap_used)]
mod tests {
use std::sync::Arc;

use storage::{MemStore, MutableProposal, NodeStore};

use crate::merkle::Merkle;
Expand Down
3 changes: 1 addition & 2 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ bytemuck = "1.7.0"
bytemuck_derive = "1.7.0"
bitfield = "0.18.1"
fastrace = { version = "0.7.4" }
strum = "0.27.0"
strum_macros = "0.27.0"
triomphe = "0.1.14"

[dev-dependencies]
rand = "0.9.0"
Expand Down
6 changes: 4 additions & 2 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ pub use nodestore::{
pub use linear::filebacked::FileBacked;
pub use linear::memory::MemStore;

use strum_macros::VariantArray;
pub use trie_hash::TrieHash;

/// A shared node, which is just a `triomphe::Arc` of a node
pub type SharedNode = triomphe::Arc<Node>;

/// The strategy for caching nodes that are read
/// from the storage layer. Generally, we only want to
/// cache write operations, but for some read-heavy workloads
/// you can enable caching of branch reads or all reads.
#[derive(Clone, Debug, VariantArray)]
#[derive(Clone, Debug)]
pub enum CacheReadStrategy {
/// Only cache writes (no reads will be cached)
WritesOnly,
Expand Down
12 changes: 6 additions & 6 deletions storage/src/linear/filebacked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ use std::io::{Error, Read};
use std::num::NonZero;
use std::os::unix::fs::FileExt;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;

use lru::LruCache;
use metrics::counter;

use crate::{CacheReadStrategy, LinearAddress, Node};
use crate::{CacheReadStrategy, LinearAddress, SharedNode};

use super::{ReadableStorage, WritableStorage};

#[derive(Debug)]
/// A [ReadableStorage] backed by a file
pub struct FileBacked {
fd: File,
cache: Mutex<LruCache<LinearAddress, Arc<Node>>>,
cache: Mutex<LruCache<LinearAddress, SharedNode>>,
free_list_cache: Mutex<LruCache<LinearAddress, Option<LinearAddress>>>,
cache_read_strategy: CacheReadStrategy,
}
Expand Down Expand Up @@ -63,7 +63,7 @@ impl ReadableStorage for FileBacked {
Ok(self.fd.metadata()?.len())
}

fn read_cached_node(&self, addr: LinearAddress) -> Option<Arc<Node>> {
fn read_cached_node(&self, addr: LinearAddress) -> Option<SharedNode> {
let mut guard = self.cache.lock().expect("poisoned lock");
let cached = guard.get(&addr).cloned();
counter!("firewood.cache.node", "type" => if cached.is_some() { "hit" } else { "miss" })
Expand All @@ -82,7 +82,7 @@ impl ReadableStorage for FileBacked {
&self.cache_read_strategy
}

fn cache_node(&self, addr: LinearAddress, node: Arc<Node>) {
fn cache_node(&self, addr: LinearAddress, node: SharedNode) {
match self.cache_read_strategy {
CacheReadStrategy::WritesOnly => {
// we don't cache reads
Expand All @@ -108,7 +108,7 @@ impl WritableStorage for FileBacked {

fn write_cached_nodes<'a>(
&self,
nodes: impl Iterator<Item = (&'a std::num::NonZero<u64>, &'a std::sync::Arc<crate::Node>)>,
nodes: impl Iterator<Item = (&'a std::num::NonZero<u64>, &'a SharedNode)>,
) -> Result<(), Error> {
let mut guard = self.cache.lock().expect("poisoned lock");
for (addr, node) in nodes {
Expand Down
9 changes: 4 additions & 5 deletions storage/src/linear/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
use std::fmt::Debug;
use std::io::{Error, Read};
use std::num::NonZero;
use std::sync::Arc;

use crate::{CacheReadStrategy, LinearAddress, Node};
use crate::{CacheReadStrategy, LinearAddress, SharedNode};
pub(super) mod filebacked;
pub mod memory;

Expand All @@ -43,7 +42,7 @@ pub trait ReadableStorage: Debug + Sync + Send {
fn size(&self) -> Result<u64, Error>;

/// Read a node from the cache (if any)
fn read_cached_node(&self, _addr: LinearAddress) -> Option<Arc<Node>> {
fn read_cached_node(&self, _addr: LinearAddress) -> Option<SharedNode> {
None
}

Expand All @@ -58,7 +57,7 @@ pub trait ReadableStorage: Debug + Sync + Send {
}

/// Cache a node for future reads
fn cache_node(&self, _addr: LinearAddress, _node: Arc<Node>) {}
fn cache_node(&self, _addr: LinearAddress, _node: SharedNode) {}
}

/// Trait for writable storage.
Expand All @@ -78,7 +77,7 @@ pub trait WritableStorage: ReadableStorage {
/// Write all nodes to the cache (if any)
fn write_cached_nodes<'a>(
&self,
_nodes: impl Iterator<Item = (&'a NonZero<u64>, &'a Arc<Node>)>,
_nodes: impl Iterator<Item = (&'a NonZero<u64>, &'a SharedNode)>,
) -> Result<(), Error> {
Ok(())
}
Expand Down
20 changes: 2 additions & 18 deletions storage/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::io::{Error, ErrorKind, Read, Write};
use std::num::NonZero;
use std::sync::Arc;
use std::vec;

mod branch;
Expand All @@ -18,7 +17,7 @@ pub mod path;
pub use branch::{BranchNode, Child};
pub use leaf::LeafNode;

use crate::Path;
use crate::{Path, SharedNode};

/// A node, either a Branch or Leaf

Expand Down Expand Up @@ -177,21 +176,6 @@ impl Node {
}
}

/// Returns a new `Arc<Node>` which is the same as `self` but with the given `partial_path`.
pub fn new_with_partial_path(self: &Node, partial_path: Path) -> Node {
match self {
Node::Branch(b) => Node::Branch(Box::new(BranchNode {
partial_path,
value: b.value.clone(),
children: b.children.clone(),
})),
Node::Leaf(l) => Node::Leaf(LeafNode {
partial_path,
value: l.value.clone(),
}),
}
}

/// Returns Some(value) inside the node, or None if the node is a branch
/// with no value.
pub fn value(&self) -> Option<&[u8]> {
Expand Down Expand Up @@ -454,7 +438,7 @@ pub struct PathIterItem {
/// The key of the node at `address` as nibbles.
pub key_nibbles: Box<[u8]>,
/// A reference to the node
pub node: Arc<Node>,
pub node: SharedNode,
/// The next item returned by the iterator is a child of `node`.
/// Specifically, it's the child at index `next_nibble` in `node`'s
/// children array.
Expand Down
Loading