From ee5386049ec68e6a004f758c14fd76cd12b06ddc Mon Sep 17 00:00:00 2001 From: arvidn Date: Sun, 7 Jan 2024 19:48:36 +0100 Subject: [PATCH] add support for adding atoms 'zero-copy' by attaching external buffers to the Allocator --- src/allocator.rs | 278 +++++++++++++++++++++++++++++++----- src/run_program.rs | 16 +-- src/serde/object_cache.rs | 2 +- wasm/src/api.rs | 2 +- wasm/src/lazy_node.rs | 6 +- wheel/src/adapt_response.rs | 4 +- wheel/src/lazy_node.rs | 6 +- 7 files changed, 263 insertions(+), 51 deletions(-) diff --git a/src/allocator.rs b/src/allocator.rs index 1d9f08d1..5aea66da 100644 --- a/src/allocator.rs +++ b/src/allocator.rs @@ -8,13 +8,18 @@ const MAX_NUM_ATOMS: usize = 62500000; const MAX_NUM_PAIRS: usize = 62500000; const NODE_PTR_IDX_BITS: u32 = 26; const NODE_PTR_IDX_MASK: u32 = (1 << NODE_PTR_IDX_BITS) - 1; +const MAX_NUM_EXTERNAL_BUFFERS: usize = 3; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct NodePtr(u32); +#[derive(PartialEq, Eq)] enum ObjectType { Pair, Bytes, + ExternalBytes0, + ExternalBytes1, + ExternalBytes2, } // The top 6 bits of the NodePtr indicate what type of object it is @@ -38,6 +43,9 @@ impl NodePtr { match self.0 >> NODE_PTR_IDX_BITS { 0 => ObjectType::Pair, 1 => ObjectType::Bytes, + 2 => ObjectType::ExternalBytes0, + 3 => ObjectType::ExternalBytes1, + 4 => ObjectType::ExternalBytes2, _ => { panic!("unknown NodePtr type"); } @@ -48,8 +56,11 @@ impl NodePtr { pub(crate) fn as_index(&self) -> usize { match self.node_type() { - (ObjectType::Pair, idx) => idx * 2, - (ObjectType::Bytes, idx) => idx * 2 + 1, + (ObjectType::Pair, idx) => idx * 5, + (ObjectType::Bytes, idx) => idx * 5 + 1, + (ObjectType::ExternalBytes0, idx) => idx * 5 + 2, + (ObjectType::ExternalBytes1, idx) => idx * 5 + 3, + (ObjectType::ExternalBytes2, idx) => idx * 5 + 4, } } } @@ -90,10 +101,14 @@ pub struct Checkpoint { u8s: usize, pairs: usize, atoms: usize, + external_bufs: usize, } +#[derive(Debug, Clone, Copy)] +pub struct BufRef(u8); + #[derive(Debug)] -pub struct Allocator { +pub struct Allocator<'a> { // this is effectively a grow-only stack where atoms are allocated. Atoms // are immutable, so once they are created, they will stay around until the // program completes @@ -107,17 +122,25 @@ pub struct Allocator { // on. atom_vec: Vec, + // some atoms may not be allocated on the heap, but rather added + // "zero-copy" by attaching external buffers to the allocator. e.g. when + // parsing a program, if the program is defined in a buffer that will + // outlive its execution, that buffer can be attached as an external buffer + // and atoms can point directly into it. This is the list of such external + // buffers. + external_buffers: Vec<&'a [u8]>, + // the atom_vec may not grow past this heap_limit: usize, } -impl Default for Allocator { +impl Default for Allocator<'_> { fn default() -> Self { Self::new() } } -impl Allocator { +impl<'a> Allocator<'a> { pub fn new() -> Self { Self::new_limited(u32::MAX as usize) } @@ -130,6 +153,7 @@ impl Allocator { u8_vec: Vec::new(), pair_vec: Vec::new(), atom_vec: Vec::new(), + external_buffers: Vec::new(), heap_limit, }; r.u8_vec.reserve(1024 * 1024); @@ -151,6 +175,7 @@ impl Allocator { u8s: self.u8_vec.len(), pairs: self.pair_vec.len(), atoms: self.atom_vec.len(), + external_bufs: self.external_buffers.len(), } } @@ -162,9 +187,25 @@ impl Allocator { assert!(self.u8_vec.len() >= cp.u8s); assert!(self.pair_vec.len() >= cp.pairs); assert!(self.atom_vec.len() >= cp.atoms); + assert!(self.external_buffers.len() >= cp.external_bufs); self.u8_vec.truncate(cp.u8s); self.pair_vec.truncate(cp.pairs); self.atom_vec.truncate(cp.atoms); + self.external_buffers.truncate(cp.external_bufs); + } + + pub fn add_external_buffer(&mut self, buf: &'a [u8]) -> Option { + let idx = self.external_buffers.len(); + if idx == MAX_NUM_EXTERNAL_BUFFERS { + return None; + } + // the AtomBuf indices are only u32, so can only point into a heap of 4 + // GB + if buf.len() > u32::MAX as usize { + return None; + } + self.external_buffers.push(buf); + Some(BufRef(idx as u8)) } pub fn new_atom(&mut self, v: &[u8]) -> Result { @@ -182,6 +223,34 @@ impl Allocator { Ok(NodePtr::new(ObjectType::Bytes, idx)) } + pub fn new_ext_atom(&mut self, ext: BufRef, v: &[u8]) -> Result { + let idx = self.atom_vec.len(); + if idx == MAX_NUM_ATOMS { + return err(self.nil(), "too many atoms"); + } + + // ensure that v is in fact part of the buffer + let buf = self.external_buffers[ext.0 as usize]; + let buf_range = buf.as_ptr_range(); + let v_range = v.as_ptr_range(); + if buf_range.start > v_range.start || buf_range.end < v_range.end { + panic!("new_ext_atom() called with invalid atom. It must be a sub-slice of the external buffer"); + } + let start = unsafe { v_range.start.offset_from(buf_range.start) } as u32; + let end = unsafe { v_range.end.offset_from(buf_range.start) } as u32; + assert!(start <= end); + self.atom_vec.push(AtomBuf { start, end }); + let obj_type = match ext.0 { + 0 => ObjectType::ExternalBytes0, + 1 => ObjectType::ExternalBytes1, + 2 => ObjectType::ExternalBytes2, + _ => { + panic!("invalid BufRef argument to new_ext_atom()"); + } + }; + Ok(NodePtr::new(obj_type, idx)) + } + pub fn new_number(&mut self, v: Number) -> Result { node_from_number(self, &v) } @@ -207,9 +276,10 @@ impl Allocator { if self.atom_vec.len() == MAX_NUM_ATOMS { return err(self.nil(), "too many atoms"); } - let (ObjectType::Bytes, idx) = node.node_type() else { + let (node_type, idx) = node.node_type(); + if node_type == ObjectType::Pair { return err(node, "(internal error) substr expected atom, got pair"); - }; + } let atom = self.atom_vec[idx]; let atom_len = atom.end - atom.start; if start > atom_len { @@ -226,7 +296,7 @@ impl Allocator { start: atom.start + start, end: atom.start + end, }); - Ok(NodePtr::new(ObjectType::Bytes, idx)) + Ok(NodePtr::new(node_type, idx)) } pub fn new_concat(&mut self, new_size: usize, nodes: &[NodePtr]) -> Result { @@ -241,7 +311,8 @@ impl Allocator { let mut counter: usize = 0; for node in nodes { - let (ObjectType::Bytes, idx) = node.node_type() else { + let (node_type, idx) = node.node_type(); + if node_type == ObjectType::Pair { self.u8_vec.truncate(start); return err(*node, "(internal error) concat expected atom, got pair"); }; @@ -251,8 +322,30 @@ impl Allocator { self.u8_vec.truncate(start); return err(*node, "(internal error) concat passed invalid new_size"); } - self.u8_vec - .extend_from_within(term.start as usize..term.end as usize); + match node_type { + ObjectType::Bytes => { + self.u8_vec + .extend_from_within(term.start as usize..term.end as usize); + } + ObjectType::ExternalBytes0 + | ObjectType::ExternalBytes1 + | ObjectType::ExternalBytes2 => { + let heap = match node_type { + ObjectType::ExternalBytes0 => 0, + ObjectType::ExternalBytes1 => 1, + ObjectType::ExternalBytes2 => 2, + _ => { + panic!("unreachable"); + } + }; + let heap = self.external_buffers[heap]; + let slice = &heap[term.start as usize..term.end as usize]; + self.u8_vec.extend_from_slice(slice); + } + ObjectType::Pair => { + panic!("concat expected atom, got pair"); + } + } counter += term.len(); } if counter != new_size { @@ -276,27 +369,33 @@ impl Allocator { } pub fn atom(&self, node: NodePtr) -> &[u8] { - match node.node_type() { - (ObjectType::Bytes, idx) => { - let atom = self.atom_vec[idx]; - &self.u8_vec[atom.start as usize..atom.end as usize] - } + let (node_type, idx) = node.node_type(); + + let heap = match node_type { + ObjectType::Bytes => self.u8_vec.as_slice(), + ObjectType::ExternalBytes0 => self.external_buffers[0], + ObjectType::ExternalBytes1 => self.external_buffers[1], + ObjectType::ExternalBytes2 => self.external_buffers[2], _ => { panic!("expected atom, got pair"); } - } + }; + let atom = self.atom_vec[idx]; + &heap[atom.start as usize..atom.end as usize] } pub fn atom_len(&self, node: NodePtr) -> usize { - match node.node_type() { - (ObjectType::Bytes, idx) => { - let atom = self.atom_vec[idx]; - (atom.end - atom.start) as usize - } - _ => { + let idx = match node.node_type() { + (ObjectType::Bytes, idx) => idx, + (ObjectType::ExternalBytes0, idx) => idx, + (ObjectType::ExternalBytes1, idx) => idx, + (ObjectType::ExternalBytes2, idx) => idx, + (ObjectType::Pair, _) => { panic!("expected atom, got pair"); } - } + }; + let atom = self.atom_vec[idx]; + (atom.end - atom.start) as usize } pub fn number(&self, node: NodePtr) -> Number { @@ -334,6 +433,9 @@ impl Allocator { pub fn sexp(&self, node: NodePtr) -> SExp { match node.node_type() { (ObjectType::Bytes, _) => SExp::Atom, + (ObjectType::ExternalBytes0, _) => SExp::Atom, + (ObjectType::ExternalBytes1, _) => SExp::Atom, + (ObjectType::ExternalBytes2, _) => SExp::Atom, (ObjectType::Pair, idx) => { let pair = self.pair_vec[idx]; SExp::Pair(pair.first, pair.rest) @@ -377,7 +479,7 @@ impl Allocator { } } -impl ClvmEncoder for Allocator { +impl ClvmEncoder for Allocator<'_> { type Node = NodePtr; fn encode_atom(&mut self, bytes: &[u8]) -> Result { @@ -393,7 +495,7 @@ impl ClvmEncoder for Allocator { } } -impl ClvmDecoder for Allocator { +impl ClvmDecoder for Allocator<'_> { type Node = NodePtr; fn decode_atom(&self, node: &Self::Node) -> Result<&[u8], FromClvmError> { @@ -413,17 +515,59 @@ impl ClvmDecoder for Allocator { } } +pub struct ImmutableAllocator { + u8_vec: Vec, + pair_vec: Vec, + atom_vec: Vec, +} + +impl<'a> Allocator<'a> { + pub fn make_immutable(self) -> ImmutableAllocator { + ImmutableAllocator { + u8_vec: self.u8_vec, + pair_vec: self.pair_vec, + atom_vec: self.atom_vec, + } + } +} + +impl ImmutableAllocator { + pub fn sexp(&self, node: NodePtr) -> SExp { + match node.node_type() { + (ObjectType::Bytes, _) => SExp::Atom, + (ObjectType::Pair, idx) => { + let pair = self.pair_vec[idx]; + SExp::Pair(pair.first, pair.rest) + } + _ => { + panic!("invalid NodePtr"); + } + } + } + + pub fn atom(&self, node: NodePtr) -> &[u8] { + let idx = match node.node_type() { + (ObjectType::Bytes, idx) => idx, + _ => { + panic!("invalid NodePtr"); + } + }; + let atom = self.atom_vec[idx]; + &self.u8_vec[atom.start as usize..atom.end as usize] + } +} + #[test] fn test_node_as_index() { assert_eq!(NodePtr::new(ObjectType::Pair, 0).as_index(), 0); - assert_eq!(NodePtr::new(ObjectType::Pair, 1).as_index(), 2); - assert_eq!(NodePtr::new(ObjectType::Pair, 2).as_index(), 4); - assert_eq!(NodePtr::new(ObjectType::Pair, 3).as_index(), 6); + assert_eq!(NodePtr::new(ObjectType::Pair, 1).as_index(), 5); + assert_eq!(NodePtr::new(ObjectType::Pair, 2).as_index(), 10); + assert_eq!(NodePtr::new(ObjectType::Pair, 3).as_index(), 15); assert_eq!(NodePtr::new(ObjectType::Bytes, 0).as_index(), 1); - assert_eq!(NodePtr::new(ObjectType::Bytes, 1).as_index(), 3); - assert_eq!(NodePtr::new(ObjectType::Bytes, 2).as_index(), 5); - assert_eq!(NodePtr::new(ObjectType::Bytes, 3).as_index(), 7); - assert_eq!(NodePtr::new(ObjectType::Bytes, 4).as_index(), 9); + assert_eq!(NodePtr::new(ObjectType::Bytes, 1).as_index(), 6); + assert_eq!(NodePtr::new(ObjectType::Bytes, 2).as_index(), 11); + assert_eq!(NodePtr::new(ObjectType::Bytes, 3).as_index(), 16); + assert_eq!(NodePtr::new(ObjectType::Bytes, 4).as_index(), 21); } #[test] @@ -1168,3 +1312,71 @@ fn test_atom_len_g2(#[case] buffer_hex: &str, #[case] expected: usize) { let atom = a.new_g2(g2).unwrap(); assert_eq!(a.atom_len(atom), expected); } + +// test external buffers +#[test] +fn test_external_atom() { + let buffer = b"foobar"; + let mut a = Allocator::new(); + let buf = a.add_external_buffer(buffer).unwrap(); + let a1 = a.new_ext_atom(buf, &buffer[0..3]).unwrap(); + let a2 = a.new_ext_atom(buf, &buffer[2..5]).unwrap(); + let a3 = a.new_ext_atom(buf, &buffer[..]).unwrap(); + assert_eq!(a.atom(a1), b"foo"); + assert_eq!(a.atom_len(a1), 3); + assert_eq!(a.atom(a2), b"oba"); + assert_eq!(a.atom_len(a2), 3); + assert_eq!(a.atom(a3), b"foobar"); + assert_eq!(a.atom_len(a3), 6); + assert!(!a.atom_eq(a1, a2)); + assert!(!a.atom_eq(a2, a3)); + assert!(a.atom_eq(a2, a2)); +} + +#[test] +fn test_external_substr() { + let buffer = b"foobar"; + let mut a = Allocator::new(); + let buf = a.add_external_buffer(buffer).unwrap(); + let a1 = a.new_ext_atom(buf, &buffer[..]).unwrap(); + assert_eq!(a.atom(a1), buffer); + assert_eq!(a.atom(a1), b"foobar"); + + // substr of external atom + let a2 = a.new_substr(a1, 2, 5).unwrap(); + assert_eq!(a.atom(a2), b"oba"); +} + +#[test] +fn test_external_concat() { + let buffer1 = b"foobar"; + let buffer2 = b"1337"; + let mut a = Allocator::new(); + let buf1 = a.add_external_buffer(buffer1).unwrap(); + let buf2 = a.add_external_buffer(buffer2).unwrap(); + let a1 = a.new_ext_atom(buf1, &buffer1[..]).unwrap(); + let a2 = a.new_ext_atom(buf2, &buffer2[..]).unwrap(); + let a3 = a.new_atom(b"42").unwrap(); + assert_eq!(a.atom(a1), b"foobar"); + assert_eq!(a.atom(a2), b"1337"); + assert_eq!(a.atom(a3), b"42"); + + // concat of external atoms and heap allocation + let a4 = a.new_concat(12, &[a1, a2, a3]).unwrap(); + assert_eq!(a.atom(a4), b"foobar133742"); +} + +#[test] +fn test_external_eq() { + let buffer1 = b"foobar"; + let buffer2 = b"foobar"; + let mut a = Allocator::new(); + let buf1 = a.add_external_buffer(buffer1).unwrap(); + let buf2 = a.add_external_buffer(buffer2).unwrap(); + let a1 = a.new_ext_atom(buf1, &buffer1[..]).unwrap(); + let a2 = a.new_ext_atom(buf2, &buffer2[..]).unwrap(); + let a3 = a.new_atom(b"foobar").unwrap(); + assert!(a.atom_eq(a1, a2)); + assert!(a.atom_eq(a1, a3)); + assert!(a.atom_eq(a2, a3)); +} diff --git a/src/run_program.rs b/src/run_program.rs index 4ea88f63..b867e66d 100644 --- a/src/run_program.rs +++ b/src/run_program.rs @@ -88,8 +88,8 @@ struct SoftforkGuard { // 3. the environment stack (points to the environment for the current // operation). env_stack -struct RunProgramContext<'a, D> { - allocator: &'a mut Allocator, +struct RunProgramContext<'a, 'b, D> { + allocator: &'a mut Allocator<'b>, dialect: &'a D, val_stack: Vec, env_stack: Vec, @@ -114,7 +114,7 @@ fn augment_cost_errors(r: Result, max_cost: NodePtr) -> Result RunProgramContext<'a, D> { +impl<'a, 'b, D: Dialect> RunProgramContext<'a, 'b, D> { #[cfg(feature = "counters")] #[inline(always)] fn account_val_push(&mut self) { @@ -178,7 +178,7 @@ impl<'a, D: Dialect> RunProgramContext<'a, D> { #[cfg(feature = "pre-eval")] fn new_with_pre_eval( - allocator: &'a mut Allocator, + allocator: &'a mut Allocator<'b>, dialect: &'a D, pre_eval: Option, ) -> Self { @@ -196,7 +196,7 @@ impl<'a, D: Dialect> RunProgramContext<'a, D> { } } - fn new(allocator: &'a mut Allocator, dialect: &'a D) -> Self { + fn new(allocator: &'a mut Allocator<'b>, dialect: &'a D) -> Self { RunProgramContext { allocator, dialect, @@ -502,7 +502,7 @@ impl<'a, D: Dialect> RunProgramContext<'a, D> { } pub fn run_program<'a, D: Dialect>( - allocator: &'a mut Allocator, + allocator: &'a mut Allocator<'_>, dialect: &'a D, program: NodePtr, env: NodePtr, @@ -514,7 +514,7 @@ pub fn run_program<'a, D: Dialect>( #[cfg(feature = "pre-eval")] pub fn run_program_with_pre_eval<'a, D: Dialect>( - allocator: &'a mut Allocator, + allocator: &'a mut Allocator<'_>, dialect: &'a D, program: NodePtr, env: NodePtr, @@ -527,7 +527,7 @@ pub fn run_program_with_pre_eval<'a, D: Dialect>( #[cfg(feature = "counters")] pub fn run_program_with_counters<'a, D: Dialect>( - allocator: &'a mut Allocator, + allocator: &'a mut Allocator<'_>, dialect: &'a D, program: NodePtr, env: NodePtr, diff --git a/src/serde/object_cache.rs b/src/serde/object_cache.rs index 6a8f2257..7072bb69 100644 --- a/src/serde/object_cache.rs +++ b/src/serde/object_cache.rs @@ -14,7 +14,7 @@ use super::bytes32::{hash_blobs, Bytes32}; pub struct ObjectCache<'a, T> { cache: Vec>, - allocator: &'a Allocator, + allocator: &'a Allocator<'a>, /// The function `f` is expected to calculate its T value recursively based /// on the T values for the left and right child for a pair. For an atom, the diff --git a/wasm/src/api.rs b/wasm/src/api.rs index 657baddf..f1613c4d 100644 --- a/wasm/src/api.rs +++ b/wasm/src/api.rs @@ -72,7 +72,7 @@ pub fn run_chia_program( match r { Ok(reduction) => { let cost = JsValue::from(reduction.0); - let node = LazyNode::new(Rc::new(allocator), reduction.1); + let node = LazyNode::new(Rc::new(allocator.make_immutable()), reduction.1); let val = JsValue::from(node); let tuple = Array::new_with_length(2); diff --git a/wasm/src/lazy_node.rs b/wasm/src/lazy_node.rs index 13444653..5e273162 100644 --- a/wasm/src/lazy_node.rs +++ b/wasm/src/lazy_node.rs @@ -1,4 +1,4 @@ -use clvmr::allocator::{Allocator, NodePtr, SExp}; +use clvmr::allocator::{ImmutableAllocator, NodePtr, SExp}; use std::rc::Rc; use js_sys::Array; @@ -7,7 +7,7 @@ use wasm_bindgen::prelude::*; #[wasm_bindgen] #[derive(Clone)] pub struct LazyNode { - allocator: Rc, + allocator: Rc, node: NodePtr, } @@ -38,7 +38,7 @@ impl LazyNode { } impl LazyNode { - pub const fn new(a: Rc, n: NodePtr) -> Self { + pub const fn new(a: Rc, n: NodePtr) -> Self { Self { allocator: a, node: n, diff --git a/wheel/src/adapt_response.rs b/wheel/src/adapt_response.rs index 6b30886a..5992acd5 100644 --- a/wheel/src/adapt_response.rs +++ b/wheel/src/adapt_response.rs @@ -15,11 +15,11 @@ pub fn adapt_response( ) -> PyResult<(u64, LazyNode)> { match response { Ok(reduction) => { - let val = LazyNode::new(Rc::new(allocator), reduction.1); + let val = LazyNode::new(Rc::new(allocator.make_immutable()), reduction.1); Ok((reduction.0, val)) } Err(eval_err) => { - let sexp = LazyNode::new(Rc::new(allocator), eval_err.0).to_object(py); + let sexp = LazyNode::new(Rc::new(allocator.make_immutable()), eval_err.0).to_object(py); let msg = eval_err.1.to_object(py); let tuple = PyTuple::new(py, [msg, sexp]); let value_error: PyErr = PyValueError::new_err(tuple.to_object(py)); diff --git a/wheel/src/lazy_node.rs b/wheel/src/lazy_node.rs index 228df325..b5d65813 100644 --- a/wheel/src/lazy_node.rs +++ b/wheel/src/lazy_node.rs @@ -1,4 +1,4 @@ -use clvmr::allocator::{Allocator, NodePtr, SExp}; +use clvmr::allocator::{ImmutableAllocator, NodePtr, SExp}; use std::rc::Rc; use pyo3::prelude::*; @@ -7,7 +7,7 @@ use pyo3::types::{PyBytes, PyTuple}; #[pyclass(subclass, unsendable)] #[derive(Clone)] pub struct LazyNode { - allocator: Rc, + allocator: Rc, node: NodePtr, } @@ -44,7 +44,7 @@ impl LazyNode { } impl LazyNode { - pub const fn new(a: Rc, n: NodePtr) -> Self { + pub const fn new(a: Rc, n: NodePtr) -> Self { Self { allocator: a, node: n,