diff --git a/accounts-db/src/accounts_hash.rs b/accounts-db/src/accounts_hash.rs index ac4134cf80a936..a650ae948879d9 100644 --- a/accounts-db/src/accounts_hash.rs +++ b/accounts-db/src/accounts_hash.rs @@ -19,8 +19,7 @@ use { std::{ borrow::Borrow, convert::TryInto, - fs::File, - io::{BufWriter, Write}, + io::{Seek, SeekFrom, Write}, path::PathBuf, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, @@ -33,14 +32,17 @@ pub const MERKLE_FANOUT: usize = 16; /// 1 file containing account hashes sorted by pubkey, mapped into memory struct MmapAccountHashesFile { + /// raw slice of `Hash` values. Can be a larger slice than `count` mmap: MmapMut, + /// # of valid Hash entries in `mmap` + count: usize, } impl MmapAccountHashesFile { /// return a slice of account hashes starting at 'index' fn read(&self, index: usize) -> &[Hash] { let start = std::mem::size_of::() * index; - let item_slice: &[u8] = &self.mmap[start..]; + let item_slice: &[u8] = &self.mmap[start..self.count * std::mem::size_of::()]; let remaining_elements = item_slice.len() / std::mem::size_of::(); unsafe { let item = item_slice.as_ptr() as *const Hash; @@ -52,23 +54,23 @@ impl MmapAccountHashesFile { /// 1 file containing account hashes sorted by pubkey pub struct AccountHashesFile { /// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file. - count_and_writer: Option<(usize, BufWriter)>, + count_and_writer: Option<(usize, MmapAccountHashesFile)>, /// The directory where temporary cache files are put dir_for_temp_cache_files: PathBuf, + /// # bytes allocated + capacity: usize, } impl AccountHashesFile { /// map the file into memory and return a reader that can access it by slice fn get_reader(&mut self) -> Option<(usize, MmapAccountHashesFile)> { - std::mem::take(&mut self.count_and_writer).map(|(count, writer)| { - let file = Some(writer.into_inner().unwrap()); - ( - count, - MmapAccountHashesFile { - mmap: unsafe { MmapMut::map_mut(file.as_ref().unwrap()).unwrap() }, - }, - ) - }) + let mut count_and_writer = std::mem::take(&mut self.count_and_writer); + if let Some((count, reader)) = count_and_writer.as_mut() { + assert!(*count * std::mem::size_of::() <= self.capacity); + // file was likely over allocated + reader.count = *count; + } + count_and_writer } /// # hashes stored in this file @@ -84,30 +86,51 @@ impl AccountHashesFile { pub fn write(&mut self, hash: &Hash) { if self.count_and_writer.is_none() { // we have hashes to write but no file yet, so create a file that will auto-delete on drop - self.count_and_writer = Some(( - 0, - BufWriter::new( - tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| { - panic!( - "Unable to create file within {}: {err}", - self.dir_for_temp_cache_files.display() - ) - }), - ), - )); - } - let count_and_writer = self.count_and_writer.as_mut().unwrap(); - count_and_writer - .1 - .write_all(hash.as_ref()) - .unwrap_or_else(|err| { + + let mut data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| { panic!( - "Unable to write file within {}: {err}", + "Unable to create file within {}: {err}", self.dir_for_temp_cache_files.display() ) }); - count_and_writer.0 += 1; + // Theoretical performance optimization: write a zero to the end of + // the file so that we won't have to resize it later, which may be + // expensive. + data.seek(SeekFrom::Start((self.capacity - 1) as u64)) + .unwrap(); + data.write_all(&[0]).unwrap(); + data.rewind().unwrap(); + data.flush().unwrap(); + + //UNSAFE: Required to create a Mmap + let map = unsafe { MmapMut::map_mut(&data) }; + let map = map.unwrap_or_else(|e| { + error!( + "Failed to map the data file (size: {}): {}.\n + Please increase sysctl vm.max_map_count or equivalent for your platform.", + self.capacity, e + ); + std::process::exit(1); + }); + + self.count_and_writer = Some(( + 0, + MmapAccountHashesFile { + mmap: map, + count: self.capacity / std::mem::size_of::(), + }, + )); + } + let (count, writer) = self.count_and_writer.as_mut().unwrap(); + let start = *count * std::mem::size_of::(); + let end = start + std::mem::size_of::(); + + unsafe { + let ptr = writer.mmap[start..end].as_ptr() as *mut Hash; + *ptr = *hash; + }; + *count += 1; } } @@ -511,10 +534,14 @@ impl<'a> AccountsHasher<'a> { .into_par_iter() .map(|i| { let start_index = i * fanout; - let end_index = std::cmp::min(start_index + fanout, total_hashes); + let first_pubkey_in_next_bin = std::cmp::min(start_index + fanout, total_hashes); let mut hasher = Hasher::default(); - for item in hashes.iter().take(end_index).skip(start_index) { + for item in hashes + .iter() + .take(first_pubkey_in_next_bin) + .skip(start_index) + { let h = extractor(item); hasher.hash(h.as_ref()); } @@ -597,12 +624,13 @@ impl<'a> AccountsHasher<'a> { .map(|i| { // summary: // this closure computes 1 or 3 levels of merkle tree (all chunks will be 1 or all will be 3) - // for a subset (our chunk) of the input data [start_index..end_index] + // for a subset (our chunk) of the input data [start_index..first_pubkey_in_next_bin] // index into get_hash_slice_starting_at_index where this chunk's range begins let start_index = i * num_hashes_per_chunk; // index into get_hash_slice_starting_at_index where this chunk's range ends - let end_index = std::cmp::min(start_index + num_hashes_per_chunk, total_hashes); + let first_pubkey_in_next_bin = + std::cmp::min(start_index + num_hashes_per_chunk, total_hashes); // will compute the final result for this closure let mut hasher = Hasher::default(); @@ -618,7 +646,7 @@ impl<'a> AccountsHasher<'a> { if !three_level { // 1 group of fanout // The result of this loop is a single hash value from fanout input hashes. - for i in start_index..end_index { + for i in start_index..first_pubkey_in_next_bin { if data_index >= data_len { // we exhausted our data, fetch next slice starting at i data = get_hash_slice_starting_at_index(i); @@ -651,12 +679,12 @@ impl<'a> AccountsHasher<'a> { // Now, some details: // The result of this loop is a single hash value from fanout^3 input hashes. // concepts: - // what we're conceptually hashing: "raw_hashes"[start_index..end_index] + // what we're conceptually hashing: "raw_hashes"[start_index..first_pubkey_in_next_bin] // example: [a,b,c,d,e,f] // but... hashes[] may really be multiple vectors that are pieced together. // example: [[a,b],[c],[d,e,f]] // get_hash_slice_starting_at_index(any_index) abstracts that and returns a slice starting at raw_hashes[any_index..] - // such that the end of get_hash_slice_starting_at_index may be <, >, or = end_index + // such that the end of get_hash_slice_starting_at_index may be <, >, or = first_pubkey_in_next_bin // example: get_hash_slice_starting_at_index(1) returns [b] // get_hash_slice_starting_at_index(3) returns [d,e,f] // This code is basically 3 iterations of merkle tree hashing occurring simultaneously. @@ -668,11 +696,11 @@ impl<'a> AccountsHasher<'a> { // If there are < fanout^3 hashes, then this code stops when it runs out of raw hashes and returns whatever it hashed. // This is always how the very last elements work in a merkle tree. let mut i = start_index; - while i < end_index { + while i < first_pubkey_in_next_bin { let mut hasher_j = Hasher::default(); for _j in 0..fanout { let mut hasher_k = Hasher::default(); - let end = std::cmp::min(end_index - i, fanout); + let end = std::cmp::min(first_pubkey_in_next_bin - i, fanout); for _k in 0..end { if data_index >= data_len { // we exhausted our data, fetch next slice starting at i @@ -685,7 +713,7 @@ impl<'a> AccountsHasher<'a> { i += 1; } hasher_j.hash(hasher_k.result().as_ref()); - if i >= end_index { + if i >= first_pubkey_in_next_bin { break; } } @@ -985,15 +1013,12 @@ impl<'a> AccountsHasher<'a> { // map from index of an item in first_items[] to index of the corresponding item in sorted_data_by_pubkey[] // this will change as items in sorted_data_by_pubkey[] are exhausted let mut first_item_to_pubkey_division = Vec::with_capacity(len); - let mut hashes = AccountHashesFile { - count_and_writer: None, - dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(), - }; + // initialize 'first_items', which holds the current lowest item in each slot group - sorted_data_by_pubkey + let max_inclusive_num_pubkeys = sorted_data_by_pubkey .iter() .enumerate() - .for_each(|(i, hash_data)| { + .map(|(i, hash_data)| { let first_pubkey_in_bin = Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats); if let Some(first_pubkey_in_bin) = first_pubkey_in_bin { @@ -1001,8 +1026,27 @@ impl<'a> AccountsHasher<'a> { first_items.push(k); first_item_to_pubkey_division.push(i); indexes.push(first_pubkey_in_bin); + let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1; + while first_pubkey_in_next_bin < hash_data.len() { + if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey) + != pubkey_bin + { + break; + } + first_pubkey_in_next_bin += 1; + } + first_pubkey_in_next_bin - first_pubkey_in_bin + } else { + 0 } - }); + }) + .sum::(); + let mut hashes = AccountHashesFile { + count_and_writer: None, + dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(), + capacity: max_inclusive_num_pubkeys * std::mem::size_of::(), + }; + let mut overall_sum = 0; let mut duplicate_pubkey_indexes = Vec::with_capacity(len); let filler_accounts_enabled = self.filler_accounts_enabled(); @@ -1240,6 +1284,7 @@ pub mod tests { Self { count_and_writer: None, dir_for_temp_cache_files, + capacity: 1024, /* default 1k for tests */ } } }