Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
in hash calc, dedup uses mmap files to avoid os panic
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Sep 8, 2023
1 parent dc6b1eb commit 150471e
Showing 1 changed file with 94 additions and 49 deletions.
143 changes: 94 additions & 49 deletions accounts-db/src/accounts_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use {
std::{
borrow::Borrow,
convert::TryInto,
fs::File,
io::{BufWriter, Write},
io::{Seek, SeekFrom, Write},
path::PathBuf,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Expand All @@ -33,14 +32,17 @@ pub const MERKLE_FANOUT: usize = 16;

/// 1 file containing account hashes sorted by pubkey, mapped into memory
struct MmapAccountHashesFile {
/// raw slice of `Hash` values. Can be a larger slice than `count`
mmap: MmapMut,
/// # of valid Hash entries in `mmap`
count: usize,
}

impl MmapAccountHashesFile {
/// return a slice of account hashes starting at 'index'
fn read(&self, index: usize) -> &[Hash] {
let start = std::mem::size_of::<Hash>() * index;
let item_slice: &[u8] = &self.mmap[start..];
let item_slice: &[u8] = &self.mmap[start..self.count * std::mem::size_of::<Hash>()];
let remaining_elements = item_slice.len() / std::mem::size_of::<Hash>();
unsafe {
let item = item_slice.as_ptr() as *const Hash;
Expand All @@ -52,23 +54,23 @@ impl MmapAccountHashesFile {
/// 1 file containing account hashes sorted by pubkey
pub struct AccountHashesFile {
/// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file.
count_and_writer: Option<(usize, BufWriter<File>)>,
count_and_writer: Option<(usize, MmapAccountHashesFile)>,
/// The directory where temporary cache files are put
dir_for_temp_cache_files: PathBuf,
/// # bytes allocated
capacity: usize,
}

impl AccountHashesFile {
/// map the file into memory and return a reader that can access it by slice
fn get_reader(&mut self) -> Option<(usize, MmapAccountHashesFile)> {
std::mem::take(&mut self.count_and_writer).map(|(count, writer)| {
let file = Some(writer.into_inner().unwrap());
(
count,
MmapAccountHashesFile {
mmap: unsafe { MmapMut::map_mut(file.as_ref().unwrap()).unwrap() },
},
)
})
let mut count_and_writer = std::mem::take(&mut self.count_and_writer);
if let Some((count, reader)) = count_and_writer.as_mut() {
assert!(*count * std::mem::size_of::<Hash>() <= self.capacity);
// file was likely over allocated
reader.count = *count;
}
count_and_writer
}

/// # hashes stored in this file
Expand All @@ -84,30 +86,51 @@ impl AccountHashesFile {
pub fn write(&mut self, hash: &Hash) {
if self.count_and_writer.is_none() {
// we have hashes to write but no file yet, so create a file that will auto-delete on drop
self.count_and_writer = Some((
0,
BufWriter::new(
tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| {
panic!(
"Unable to create file within {}: {err}",
self.dir_for_temp_cache_files.display()
)
}),
),
));
}
let count_and_writer = self.count_and_writer.as_mut().unwrap();
count_and_writer
.1
.write_all(hash.as_ref())
.unwrap_or_else(|err| {

let mut data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| {
panic!(
"Unable to write file within {}: {err}",
"Unable to create file within {}: {err}",
self.dir_for_temp_cache_files.display()
)
});

count_and_writer.0 += 1;
// Theoretical performance optimization: write a zero to the end of
// the file so that we won't have to resize it later, which may be
// expensive.
data.seek(SeekFrom::Start((self.capacity - 1) as u64))
.unwrap();
data.write_all(&[0]).unwrap();
data.rewind().unwrap();
data.flush().unwrap();

//UNSAFE: Required to create a Mmap
let map = unsafe { MmapMut::map_mut(&data) };
let map = map.unwrap_or_else(|e| {
error!(
"Failed to map the data file (size: {}): {}.\n
Please increase sysctl vm.max_map_count or equivalent for your platform.",
self.capacity, e
);
std::process::exit(1);
});

self.count_and_writer = Some((
0,
MmapAccountHashesFile {
mmap: map,
count: self.capacity / std::mem::size_of::<Hash>(),
},
));
}
let (count, writer) = self.count_and_writer.as_mut().unwrap();
let start = *count * std::mem::size_of::<Hash>();
let end = start + std::mem::size_of::<Hash>();

unsafe {
let ptr = writer.mmap[start..end].as_ptr() as *mut Hash;
*ptr = *hash;
};
*count += 1;
}
}

Expand Down Expand Up @@ -511,10 +534,14 @@ impl<'a> AccountsHasher<'a> {
.into_par_iter()
.map(|i| {
let start_index = i * fanout;
let end_index = std::cmp::min(start_index + fanout, total_hashes);
let first_pubkey_in_next_bin = std::cmp::min(start_index + fanout, total_hashes);

let mut hasher = Hasher::default();
for item in hashes.iter().take(end_index).skip(start_index) {
for item in hashes
.iter()
.take(first_pubkey_in_next_bin)
.skip(start_index)
{
let h = extractor(item);
hasher.hash(h.as_ref());
}
Expand Down Expand Up @@ -597,12 +624,13 @@ impl<'a> AccountsHasher<'a> {
.map(|i| {
// summary:
// this closure computes 1 or 3 levels of merkle tree (all chunks will be 1 or all will be 3)
// for a subset (our chunk) of the input data [start_index..end_index]
// for a subset (our chunk) of the input data [start_index..first_pubkey_in_next_bin]

// index into get_hash_slice_starting_at_index where this chunk's range begins
let start_index = i * num_hashes_per_chunk;
// index into get_hash_slice_starting_at_index where this chunk's range ends
let end_index = std::cmp::min(start_index + num_hashes_per_chunk, total_hashes);
let first_pubkey_in_next_bin =
std::cmp::min(start_index + num_hashes_per_chunk, total_hashes);

// will compute the final result for this closure
let mut hasher = Hasher::default();
Expand All @@ -618,7 +646,7 @@ impl<'a> AccountsHasher<'a> {
if !three_level {
// 1 group of fanout
// The result of this loop is a single hash value from fanout input hashes.
for i in start_index..end_index {
for i in start_index..first_pubkey_in_next_bin {
if data_index >= data_len {
// we exhausted our data, fetch next slice starting at i
data = get_hash_slice_starting_at_index(i);
Expand Down Expand Up @@ -651,12 +679,12 @@ impl<'a> AccountsHasher<'a> {
// Now, some details:
// The result of this loop is a single hash value from fanout^3 input hashes.
// concepts:
// what we're conceptually hashing: "raw_hashes"[start_index..end_index]
// what we're conceptually hashing: "raw_hashes"[start_index..first_pubkey_in_next_bin]
// example: [a,b,c,d,e,f]
// but... hashes[] may really be multiple vectors that are pieced together.
// example: [[a,b],[c],[d,e,f]]
// get_hash_slice_starting_at_index(any_index) abstracts that and returns a slice starting at raw_hashes[any_index..]
// such that the end of get_hash_slice_starting_at_index may be <, >, or = end_index
// such that the end of get_hash_slice_starting_at_index may be <, >, or = first_pubkey_in_next_bin
// example: get_hash_slice_starting_at_index(1) returns [b]
// get_hash_slice_starting_at_index(3) returns [d,e,f]
// This code is basically 3 iterations of merkle tree hashing occurring simultaneously.
Expand All @@ -668,11 +696,11 @@ impl<'a> AccountsHasher<'a> {
// If there are < fanout^3 hashes, then this code stops when it runs out of raw hashes and returns whatever it hashed.
// This is always how the very last elements work in a merkle tree.
let mut i = start_index;
while i < end_index {
while i < first_pubkey_in_next_bin {
let mut hasher_j = Hasher::default();
for _j in 0..fanout {
let mut hasher_k = Hasher::default();
let end = std::cmp::min(end_index - i, fanout);
let end = std::cmp::min(first_pubkey_in_next_bin - i, fanout);
for _k in 0..end {
if data_index >= data_len {
// we exhausted our data, fetch next slice starting at i
Expand All @@ -685,7 +713,7 @@ impl<'a> AccountsHasher<'a> {
i += 1;
}
hasher_j.hash(hasher_k.result().as_ref());
if i >= end_index {
if i >= first_pubkey_in_next_bin {
break;
}
}
Expand Down Expand Up @@ -985,24 +1013,40 @@ impl<'a> AccountsHasher<'a> {
// map from index of an item in first_items[] to index of the corresponding item in sorted_data_by_pubkey[]
// this will change as items in sorted_data_by_pubkey[] are exhausted
let mut first_item_to_pubkey_division = Vec::with_capacity(len);
let mut hashes = AccountHashesFile {
count_and_writer: None,
dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
};

// initialize 'first_items', which holds the current lowest item in each slot group
sorted_data_by_pubkey
let max_inclusive_num_pubkeys = sorted_data_by_pubkey
.iter()
.enumerate()
.for_each(|(i, hash_data)| {
.map(|(i, hash_data)| {
let first_pubkey_in_bin =
Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats);
if let Some(first_pubkey_in_bin) = first_pubkey_in_bin {
let k = hash_data[first_pubkey_in_bin].pubkey;
first_items.push(k);
first_item_to_pubkey_division.push(i);
indexes.push(first_pubkey_in_bin);
let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1;
while first_pubkey_in_next_bin < hash_data.len() {
if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey)
!= pubkey_bin
{
break;
}
first_pubkey_in_next_bin += 1;
}
first_pubkey_in_next_bin - first_pubkey_in_bin
} else {
0
}
});
})
.sum::<usize>();
let mut hashes = AccountHashesFile {
count_and_writer: None,
dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
capacity: max_inclusive_num_pubkeys * std::mem::size_of::<Hash>(),
};

let mut overall_sum = 0;
let mut duplicate_pubkey_indexes = Vec::with_capacity(len);
let filler_accounts_enabled = self.filler_accounts_enabled();
Expand Down Expand Up @@ -1240,6 +1284,7 @@ pub mod tests {
Self {
count_and_writer: None,
dir_for_temp_cache_files,
capacity: 1024, /* default 1k for tests */
}
}
}
Expand Down

0 comments on commit 150471e

Please sign in to comment.