diff --git a/runtime/src/accounts_hash.rs b/runtime/src/accounts_hash.rs index dce6ba2d4154a6..9f92f29a3cb09c 100644 --- a/runtime/src/accounts_hash.rs +++ b/runtime/src/accounts_hash.rs @@ -18,8 +18,7 @@ use { std::{ borrow::Borrow, convert::TryInto, - fs::File, - io::{BufWriter, Write}, + io::{Seek, SeekFrom, Write}, path::PathBuf, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, @@ -35,57 +34,63 @@ pub type SortedDataByPubkey<'a> = Vec<&'a [CalculateHashIntermediate]>; /// 1 file containing account hashes sorted by pubkey, mapped into memory struct MmapAccountHashesFile { + /// raw slice of `Hash` values. Can be a larger slice than `count` mmap: MmapMut, + /// # of valid Hash entries in `mmap` + count: usize, } impl MmapAccountHashesFile { /// return a slice of account hashes starting at 'index' fn read(&self, index: usize) -> &[Hash] { let start = std::mem::size_of::() * index; - let item_slice: &[u8] = &self.mmap[start..]; + let item_slice: &[u8] = &self.mmap[start..self.count * std::mem::size_of::()]; let remaining_elements = item_slice.len() / std::mem::size_of::(); unsafe { let item = item_slice.as_ptr() as *const Hash; std::slice::from_raw_parts(item, remaining_elements) } } + + /// write a hash to the end of mmap file. + fn write(&mut self, hash: &Hash) { + let start = self.count * std::mem::size_of::(); + let end = start + std::mem::size_of::(); + self.mmap[start..end].copy_from_slice(hash.as_ref()); + self.count += 1; + } } /// 1 file containing account hashes sorted by pubkey pub struct AccountHashesFile { /// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file. - count_and_writer: Option<(usize, BufWriter)>, + writer: Option, /// The directory where temporary cache files are put dir_for_temp_cache_files: PathBuf, + /// # bytes allocated + capacity: usize, } impl AccountHashesFile { - /// map the file into memory and return a reader that can access it by slice - fn get_reader(&mut self) -> Option<(usize, MmapAccountHashesFile)> { - std::mem::take(&mut self.count_and_writer).map(|(count, writer)| { - let file = Some(writer.into_inner().unwrap()); - ( - count, - MmapAccountHashesFile { - mmap: unsafe { MmapMut::map_mut(file.as_ref().unwrap()).unwrap() }, - }, - ) - }) + /// return a mmap reader that can be accessed by slice + fn get_reader(&mut self) -> Option { + std::mem::take(&mut self.writer) } /// # hashes stored in this file pub fn count(&self) -> usize { - self.count_and_writer + self.writer .as_ref() - .map(|(count, _)| *count) + .map(|writer| writer.count) .unwrap_or_default() } /// write 'hash' to the file /// If the file isn't open, create it first. pub fn write(&mut self, hash: &Hash) { - if self.count_and_writer.is_none() { + if self.writer.is_none() { // we have hashes to write but no file yet, so create a file that will auto-delete on drop +<<<<<<< HEAD:runtime/src/accounts_hash.rs self.count_and_writer = Some(( 0, BufWriter::new( @@ -112,6 +117,42 @@ impl AccountHashesFile { }) ); count_and_writer.0 += 1; +======= + + let mut data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| { + panic!( + "Unable to create file within {}: {err}", + self.dir_for_temp_cache_files.display() + ) + }); + + // Theoretical performance optimization: write a zero to the end of + // the file so that we won't have to resize it later, which may be + // expensive. + data.seek(SeekFrom::Start((self.capacity - 1) as u64)) + .unwrap(); + data.write_all(&[0]).unwrap(); + data.rewind().unwrap(); + data.flush().unwrap(); + + //UNSAFE: Required to create a Mmap + let map = unsafe { MmapMut::map_mut(&data) }; + let map = map.unwrap_or_else(|e| { + error!( + "Failed to map the data file (size: {}): {}.\n + Please increase sysctl vm.max_map_count or equivalent for your platform.", + self.capacity, e + ); + std::process::exit(1); + }); + + self.writer = Some(MmapAccountHashesFile { + mmap: map, + count: 0, + }); + } + self.writer.as_mut().unwrap().write(hash); +>>>>>>> 4dfe62a2f0 (rework accounts hash calc dedup to avoid kernel file error (#33195)):accounts-db/src/accounts_hash.rs } } @@ -364,7 +405,8 @@ impl CumulativeHashesFromFiles { let mut readers = Vec::with_capacity(hashes.len()); let cumulative = CumulativeOffsets::new(hashes.into_iter().filter_map(|mut hash_file| { // ignores all hashfiles that have zero entries - hash_file.get_reader().map(|(count, reader)| { + hash_file.get_reader().map(|reader| { + let count = reader.count; readers.push(reader); count }) @@ -944,11 +986,9 @@ impl AccountsHasher { // map from index of an item in first_items[] to index of the corresponding item in pubkey_division[] // this will change as items in pubkey_division[] are exhausted let mut first_item_to_pubkey_division = Vec::with_capacity(len); - let mut hashes = AccountHashesFile { - count_and_writer: None, - dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(), - }; + // initialize 'first_items', which holds the current lowest item in each slot group +<<<<<<< HEAD:runtime/src/accounts_hash.rs pubkey_division.iter().enumerate().for_each(|(i, bins)| { // check to make sure we can do bins[pubkey_bin] if bins.len() > pubkey_bin { @@ -960,6 +1000,40 @@ impl AccountsHasher { } } }); +======= + let max_inclusive_num_pubkeys = sorted_data_by_pubkey + .iter() + .enumerate() + .map(|(i, hash_data)| { + let first_pubkey_in_bin = + Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats); + if let Some(first_pubkey_in_bin) = first_pubkey_in_bin { + let k = hash_data[first_pubkey_in_bin].pubkey; + first_items.push(k); + first_item_to_pubkey_division.push(i); + indexes.push(first_pubkey_in_bin); + let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1; + while first_pubkey_in_next_bin < hash_data.len() { + if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey) + != pubkey_bin + { + break; + } + first_pubkey_in_next_bin += 1; + } + first_pubkey_in_next_bin - first_pubkey_in_bin + } else { + 0 + } + }) + .sum::(); + let mut hashes = AccountHashesFile { + writer: None, + dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(), + capacity: max_inclusive_num_pubkeys * std::mem::size_of::(), + }; + +>>>>>>> 4dfe62a2f0 (rework accounts hash calc dedup to avoid kernel file error (#33195)):accounts-db/src/accounts_hash.rs let mut overall_sum = 0; let mut duplicate_pubkey_indexes = Vec::with_capacity(len); let filler_accounts_enabled = self.filler_accounts_enabled(); @@ -1142,8 +1216,9 @@ pub mod tests { impl AccountHashesFile { fn new(dir_for_temp_cache_files: PathBuf) -> Self { Self { - count_and_writer: None, + writer: None, dir_for_temp_cache_files, + capacity: 1024, /* default 1k for tests */ } } } @@ -1159,16 +1234,16 @@ pub mod tests { // 1 hash file.write(&hashes[0]); let reader = file.get_reader().unwrap(); - assert_eq!(&[hashes[0]][..], reader.1.read(0)); - assert!(reader.1.read(1).is_empty()); + assert_eq!(&[hashes[0]][..], reader.read(0)); + assert!(reader.read(1).is_empty()); // multiple hashes let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf()); assert!(file.get_reader().is_none()); hashes.iter().for_each(|hash| file.write(hash)); let reader = file.get_reader().unwrap(); - (0..2).for_each(|i| assert_eq!(&hashes[i..], reader.1.read(i))); - assert!(reader.1.read(2).is_empty()); + (0..2).for_each(|i| assert_eq!(&hashes[i..], reader.read(i))); + assert!(reader.read(2).is_empty()); } #[test] @@ -1325,8 +1400,14 @@ pub mod tests { let slice = convert_to_slice2(&temp_vec); let dir_for_temp_cache_files = tempdir().unwrap(); let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf()); +<<<<<<< HEAD:runtime/src/accounts_hash.rs let (mut hashes, lamports, _) = accounts_hasher.de_dup_accounts_in_parallel(&slice, 0); assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().1.read(0)); +======= + let (mut hashes, lamports) = + accounts_hasher.de_dup_accounts_in_parallel(&slice, 0, 1, &HashStats::default()); + assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().read(0)); +>>>>>>> 4dfe62a2f0 (rework accounts hash calc dedup to avoid kernel file error (#33195)):accounts-db/src/accounts_hash.rs assert_eq!(lamports, 1); } @@ -1336,7 +1417,7 @@ pub mod tests { fn get_vec(mut hashes: AccountHashesFile) -> Vec { hashes .get_reader() - .map(|r| r.1.read(0).to_vec()) + .map(|r| r.read(0).to_vec()) .unwrap_or_default() }