Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
rework accounts hash calc dedup to avoid kernel file error (#33195)
Browse files Browse the repository at this point in the history
* in hash calc, calculate max_inclusive_num_pubkeys

* in hash calc, dedup uses mmap files to avoid os panic

* as_mut_ptr

* remove unsafe code

* refactor count in hash files

---------

Co-authored-by: HaoranYi <haoran.yi@solana.com>
(cherry picked from commit 4dfe62a)

# Conflicts:
#	runtime/src/accounts_hash.rs
  • Loading branch information
jeffwashington authored and mergify[bot] committed Sep 11, 2023
1 parent 2cbbf4b commit f48cadd
Showing 1 changed file with 110 additions and 29 deletions.
139 changes: 110 additions & 29 deletions runtime/src/accounts_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use {
std::{
borrow::Borrow,
convert::TryInto,
fs::File,
io::{BufWriter, Write},
io::{Seek, SeekFrom, Write},
path::PathBuf,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Expand All @@ -35,57 +34,63 @@ pub type SortedDataByPubkey<'a> = Vec<&'a [CalculateHashIntermediate]>;

/// 1 file containing account hashes sorted by pubkey, mapped into memory
struct MmapAccountHashesFile {
/// raw slice of `Hash` values. Can be a larger slice than `count`
mmap: MmapMut,
/// # of valid Hash entries in `mmap`
count: usize,
}

impl MmapAccountHashesFile {
/// return a slice of account hashes starting at 'index'
fn read(&self, index: usize) -> &[Hash] {
let start = std::mem::size_of::<Hash>() * index;
let item_slice: &[u8] = &self.mmap[start..];
let item_slice: &[u8] = &self.mmap[start..self.count * std::mem::size_of::<Hash>()];
let remaining_elements = item_slice.len() / std::mem::size_of::<Hash>();
unsafe {
let item = item_slice.as_ptr() as *const Hash;
std::slice::from_raw_parts(item, remaining_elements)
}
}

/// write a hash to the end of mmap file.
fn write(&mut self, hash: &Hash) {
let start = self.count * std::mem::size_of::<Hash>();
let end = start + std::mem::size_of::<Hash>();
self.mmap[start..end].copy_from_slice(hash.as_ref());
self.count += 1;
}
}

/// 1 file containing account hashes sorted by pubkey
pub struct AccountHashesFile {
/// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file.
count_and_writer: Option<(usize, BufWriter<File>)>,
writer: Option<MmapAccountHashesFile>,
/// The directory where temporary cache files are put
dir_for_temp_cache_files: PathBuf,
/// # bytes allocated
capacity: usize,
}

impl AccountHashesFile {
/// map the file into memory and return a reader that can access it by slice
fn get_reader(&mut self) -> Option<(usize, MmapAccountHashesFile)> {
std::mem::take(&mut self.count_and_writer).map(|(count, writer)| {
let file = Some(writer.into_inner().unwrap());
(
count,
MmapAccountHashesFile {
mmap: unsafe { MmapMut::map_mut(file.as_ref().unwrap()).unwrap() },
},
)
})
/// return a mmap reader that can be accessed by slice
fn get_reader(&mut self) -> Option<MmapAccountHashesFile> {
std::mem::take(&mut self.writer)
}

/// # hashes stored in this file
pub fn count(&self) -> usize {
self.count_and_writer
self.writer
.as_ref()
.map(|(count, _)| *count)
.map(|writer| writer.count)
.unwrap_or_default()
}

/// write 'hash' to the file
/// If the file isn't open, create it first.
pub fn write(&mut self, hash: &Hash) {
if self.count_and_writer.is_none() {
if self.writer.is_none() {
// we have hashes to write but no file yet, so create a file that will auto-delete on drop
<<<<<<< HEAD:runtime/src/accounts_hash.rs
self.count_and_writer = Some((
0,
BufWriter::new(
Expand All @@ -112,6 +117,42 @@ impl AccountHashesFile {
})
);
count_and_writer.0 += 1;
=======

let mut data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| {
panic!(
"Unable to create file within {}: {err}",
self.dir_for_temp_cache_files.display()
)
});

// Theoretical performance optimization: write a zero to the end of
// the file so that we won't have to resize it later, which may be
// expensive.
data.seek(SeekFrom::Start((self.capacity - 1) as u64))
.unwrap();
data.write_all(&[0]).unwrap();
data.rewind().unwrap();
data.flush().unwrap();

//UNSAFE: Required to create a Mmap
let map = unsafe { MmapMut::map_mut(&data) };
let map = map.unwrap_or_else(|e| {
error!(
"Failed to map the data file (size: {}): {}.\n
Please increase sysctl vm.max_map_count or equivalent for your platform.",
self.capacity, e
);
std::process::exit(1);
});

self.writer = Some(MmapAccountHashesFile {
mmap: map,
count: 0,
});
}
self.writer.as_mut().unwrap().write(hash);
>>>>>>> 4dfe62a2f0 (rework accounts hash calc dedup to avoid kernel file error (#33195)):accounts-db/src/accounts_hash.rs
}
}

Expand Down Expand Up @@ -364,7 +405,8 @@ impl CumulativeHashesFromFiles {
let mut readers = Vec::with_capacity(hashes.len());
let cumulative = CumulativeOffsets::new(hashes.into_iter().filter_map(|mut hash_file| {
// ignores all hashfiles that have zero entries
hash_file.get_reader().map(|(count, reader)| {
hash_file.get_reader().map(|reader| {
let count = reader.count;
readers.push(reader);
count
})
Expand Down Expand Up @@ -944,11 +986,9 @@ impl AccountsHasher {
// map from index of an item in first_items[] to index of the corresponding item in pubkey_division[]
// this will change as items in pubkey_division[] are exhausted
let mut first_item_to_pubkey_division = Vec::with_capacity(len);
let mut hashes = AccountHashesFile {
count_and_writer: None,
dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
};

// initialize 'first_items', which holds the current lowest item in each slot group
<<<<<<< HEAD:runtime/src/accounts_hash.rs
pubkey_division.iter().enumerate().for_each(|(i, bins)| {
// check to make sure we can do bins[pubkey_bin]
if bins.len() > pubkey_bin {
Expand All @@ -960,6 +1000,40 @@ impl AccountsHasher {
}
}
});
=======
let max_inclusive_num_pubkeys = sorted_data_by_pubkey
.iter()
.enumerate()
.map(|(i, hash_data)| {
let first_pubkey_in_bin =
Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats);
if let Some(first_pubkey_in_bin) = first_pubkey_in_bin {
let k = hash_data[first_pubkey_in_bin].pubkey;
first_items.push(k);
first_item_to_pubkey_division.push(i);
indexes.push(first_pubkey_in_bin);
let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1;
while first_pubkey_in_next_bin < hash_data.len() {
if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey)
!= pubkey_bin
{
break;
}
first_pubkey_in_next_bin += 1;
}
first_pubkey_in_next_bin - first_pubkey_in_bin
} else {
0
}
})
.sum::<usize>();
let mut hashes = AccountHashesFile {
writer: None,
dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
capacity: max_inclusive_num_pubkeys * std::mem::size_of::<Hash>(),
};

>>>>>>> 4dfe62a2f0 (rework accounts hash calc dedup to avoid kernel file error (#33195)):accounts-db/src/accounts_hash.rs
let mut overall_sum = 0;
let mut duplicate_pubkey_indexes = Vec::with_capacity(len);
let filler_accounts_enabled = self.filler_accounts_enabled();
Expand Down Expand Up @@ -1142,8 +1216,9 @@ pub mod tests {
impl AccountHashesFile {
fn new(dir_for_temp_cache_files: PathBuf) -> Self {
Self {
count_and_writer: None,
writer: None,
dir_for_temp_cache_files,
capacity: 1024, /* default 1k for tests */
}
}
}
Expand All @@ -1159,16 +1234,16 @@ pub mod tests {
// 1 hash
file.write(&hashes[0]);
let reader = file.get_reader().unwrap();
assert_eq!(&[hashes[0]][..], reader.1.read(0));
assert!(reader.1.read(1).is_empty());
assert_eq!(&[hashes[0]][..], reader.read(0));
assert!(reader.read(1).is_empty());

// multiple hashes
let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf());
assert!(file.get_reader().is_none());
hashes.iter().for_each(|hash| file.write(hash));
let reader = file.get_reader().unwrap();
(0..2).for_each(|i| assert_eq!(&hashes[i..], reader.1.read(i)));
assert!(reader.1.read(2).is_empty());
(0..2).for_each(|i| assert_eq!(&hashes[i..], reader.read(i)));
assert!(reader.read(2).is_empty());
}

#[test]
Expand Down Expand Up @@ -1325,8 +1400,14 @@ pub mod tests {
let slice = convert_to_slice2(&temp_vec);
let dir_for_temp_cache_files = tempdir().unwrap();
let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
<<<<<<< HEAD:runtime/src/accounts_hash.rs
let (mut hashes, lamports, _) = accounts_hasher.de_dup_accounts_in_parallel(&slice, 0);
assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().1.read(0));
=======
let (mut hashes, lamports) =
accounts_hasher.de_dup_accounts_in_parallel(&slice, 0, 1, &HashStats::default());
assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().read(0));
>>>>>>> 4dfe62a2f0 (rework accounts hash calc dedup to avoid kernel file error (#33195)):accounts-db/src/accounts_hash.rs
assert_eq!(lamports, 1);
}

Expand All @@ -1336,7 +1417,7 @@ pub mod tests {
fn get_vec(mut hashes: AccountHashesFile) -> Vec<Hash> {
hashes
.get_reader()
.map(|r| r.1.read(0).to_vec())
.map(|r| r.read(0).to_vec())
.unwrap_or_default()
}

Expand Down

0 comments on commit f48cadd

Please sign in to comment.