This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

rework accounts hash calc dedup to avoid kernel file error #33195

Merged · 5 commits · Sep 11, 2023
Changes from all commits
136 changes: 84 additions & 52 deletions accounts-db/src/accounts_hash.rs
@@ -19,8 +19,7 @@ use {
     std::{
         borrow::Borrow,
         convert::TryInto,
-        fs::File,
-        io::{BufWriter, Write},
+        io::{Seek, SeekFrom, Write},
         path::PathBuf,
         sync::{
             atomic::{AtomicU64, AtomicUsize, Ordering},
@@ -33,81 +32,96 @@ pub const MERKLE_FANOUT: usize = 16;

 /// 1 file containing account hashes sorted by pubkey, mapped into memory
 struct MmapAccountHashesFile {
+    /// raw slice of `Hash` values. Can be a larger slice than `count`
     mmap: MmapMut,
+    /// # of valid Hash entries in `mmap`
+    count: usize,
 }

 impl MmapAccountHashesFile {
     /// return a slice of account hashes starting at 'index'
     fn read(&self, index: usize) -> &[Hash] {
         let start = std::mem::size_of::<Hash>() * index;
-        let item_slice: &[u8] = &self.mmap[start..];
+        let item_slice: &[u8] = &self.mmap[start..self.count * std::mem::size_of::<Hash>()];
         let remaining_elements = item_slice.len() / std::mem::size_of::<Hash>();
         unsafe {
             let item = item_slice.as_ptr() as *const Hash;
             std::slice::from_raw_parts(item, remaining_elements)
         }
     }
+
+    /// write a hash to the end of mmap file.
+    fn write(&mut self, hash: &Hash) {
+        let start = self.count * std::mem::size_of::<Hash>();
+        let end = start + std::mem::size_of::<Hash>();
+        self.mmap[start..end].copy_from_slice(hash.as_ref());
+        self.count += 1;
+    }
 }

 /// 1 file containing account hashes sorted by pubkey
 pub struct AccountHashesFile {
-    /// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file.
-    count_and_writer: Option<(usize, BufWriter<File>)>,
+    writer: Option<MmapAccountHashesFile>,
     /// The directory where temporary cache files are put
     dir_for_temp_cache_files: PathBuf,
+    /// # bytes allocated
+    capacity: usize,
 }

 impl AccountHashesFile {
-    /// map the file into memory and return a reader that can access it by slice
-    fn get_reader(&mut self) -> Option<(usize, MmapAccountHashesFile)> {
-        std::mem::take(&mut self.count_and_writer).map(|(count, writer)| {
-            let file = Some(writer.into_inner().unwrap());
-            (
-                count,
-                MmapAccountHashesFile {
-                    mmap: unsafe { MmapMut::map_mut(file.as_ref().unwrap()).unwrap() },
-                },
-            )
-        })
+    /// return a mmap reader that can be accessed by slice
+    fn get_reader(&mut self) -> Option<MmapAccountHashesFile> {
+        std::mem::take(&mut self.writer)
     }

     /// # hashes stored in this file
     pub fn count(&self) -> usize {
-        self.count_and_writer
+        self.writer
             .as_ref()
-            .map(|(count, _)| *count)
+            .map(|writer| writer.count)
             .unwrap_or_default()
     }

     /// write 'hash' to the file
     /// If the file isn't open, create it first.
     pub fn write(&mut self, hash: &Hash) {
-        if self.count_and_writer.is_none() {
+        if self.writer.is_none() {
             // we have hashes to write but no file yet, so create a file that will auto-delete on drop
-            self.count_and_writer = Some((
-                0,
-                BufWriter::new(
-                    tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| {
-                        panic!(
-                            "Unable to create file within {}: {err}",
-                            self.dir_for_temp_cache_files.display()
-                        )
-                    }),
-                ),
-            ));
-        }
-        let count_and_writer = self.count_and_writer.as_mut().unwrap();
-        count_and_writer
-            .1
-            .write_all(hash.as_ref())
-            .unwrap_or_else(|err| {
-                panic!(
-                    "Unable to write file within {}: {err}",
-                    self.dir_for_temp_cache_files.display()
-                )
-            });
-        count_and_writer.0 += 1;
+            let mut data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| {
+                panic!(
+                    "Unable to create file within {}: {err}",
+                    self.dir_for_temp_cache_files.display()
+                )
+            });
+
+            // Theoretical performance optimization: write a zero to the end of
+            // the file so that we won't have to resize it later, which may be
+            // expensive.
+            data.seek(SeekFrom::Start((self.capacity - 1) as u64))
+                .unwrap();
+            data.write_all(&[0]).unwrap();
+            data.rewind().unwrap();
+            data.flush().unwrap();
+
+            //UNSAFE: Required to create a Mmap
+            let map = unsafe { MmapMut::map_mut(&data) };
+            let map = map.unwrap_or_else(|e| {
+                error!(
+                    "Failed to map the data file (size: {}): {}.\n
+                     Please increase sysctl vm.max_map_count or equivalent for your platform.",
+                    self.capacity, e
+                );
+                std::process::exit(1);
+            });
Comment on lines +109 to +117
Contributor:

I know that in append_vec.rs if creating the Mmap fails we'll call exit(), but that feels strange to me. Seems like a panic or expect would work here too. Wdyt?

Contributor (Author):

This looks to be copied from append_vec, where stephen wrote this 3 years ago.
Some thoughts:
- Error messages get logged immediately.
- Panics sometimes get delayed and eaten until a thread is joined (IIRC).
Either way it is an error condition. I'm not sure what is best.
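
As a side note on the panic-delay point above: with the default hook, a panic message prints to stderr as soon as it fires, but a panic in a spawned thread only propagates to the parent when that thread is joined. A minimal sketch (not from the PR) illustrating this:

use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        panic!("mmap failed"); // the default hook prints the message immediately...
    });
    // ...but the parent only observes the failure here, at join time.
    assert!(handle.join().is_err());
}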

Contributor:

Yeah, likely won't matter much, since if we cannot create a new mmap file then it's likely the whole system is borked.

I tend to think a library function shouldn't call exit, but instead assert its invariants. We also get a backtrace with panics, but I doubt it would contain useful information. I'd lean towards doing an expect with a message like "create accounts hashes mmap file".

Not a dealbreaker for me, so I defer to you on this one.
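
For reference, the expect-based alternative being suggested would look roughly like this (a sketch, not code from the PR):

let map = unsafe { MmapMut::map_mut(&data) }
    .expect("create accounts hashes mmap file");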

Contributor (Author):

Here is the source of the copied code: #12213.
@sakridge what do you think?

Contributor (Author):

We have some inconsistency.

Here's an info I added a few years ago: #17727

Sometimes we just unwrap the mmap.

Ok(unsafe { MmapMut::map_mut(file).unwrap() })


+            self.writer = Some(MmapAccountHashesFile {
+                mmap: map,
+                count: 0,
+            });
+        }
+        self.writer.as_mut().unwrap().write(hash);
     }
 }
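
For readers following the diff: the new write path pre-sizes the temp file and then maps it, instead of streaming through a BufWriter. Below is a minimal, self-contained sketch of that pattern, assuming the tempfile and memmap2 crates; the function name and signature are illustrative, not from the PR.

use {
    memmap2::MmapMut,
    std::{
        io::{Result, Seek, SeekFrom, Write},
        path::Path,
    },
};

/// Create a temp file of exactly `capacity` bytes and map it into memory.
/// Writing the final byte first forces the kernel to extend the file once,
/// so later writes through the mmap never trigger a resize.
fn preallocated_mmap(dir: &Path, capacity: usize) -> Result<MmapMut> {
    assert!(capacity > 0, "cannot map an empty file");
    let mut file = tempfile::tempfile_in(dir)?; // deleted when dropped
    file.seek(SeekFrom::Start(capacity as u64 - 1))?;
    file.write_all(&[0])?;
    file.rewind()?;
    file.flush()?;
    // SAFETY: `file` outlives the mapping and is not mutated elsewhere.
    unsafe { MmapMut::map_mut(&file) }
}

In the PR itself, the capacity is computed up front as max_inclusive_num_pubkeys * size_of::<Hash>() (see the de_dup hunk below), so the mapping never has to grow.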

@@ -338,7 +352,8 @@ impl CumulativeHashesFromFiles {
         let mut readers = Vec::with_capacity(hashes.len());
         let cumulative = CumulativeOffsets::new(hashes.into_iter().filter_map(|mut hash_file| {
             // ignores all hashfiles that have zero entries
-            hash_file.get_reader().map(|(count, reader)| {
+            hash_file.get_reader().map(|reader| {
+                let count = reader.count;
                 readers.push(reader);
                 count
             })
@@ -985,24 +1000,40 @@ impl<'a> AccountsHasher<'a> {
         // map from index of an item in first_items[] to index of the corresponding item in sorted_data_by_pubkey[]
         // this will change as items in sorted_data_by_pubkey[] are exhausted
         let mut first_item_to_pubkey_division = Vec::with_capacity(len);
-        let mut hashes = AccountHashesFile {
-            count_and_writer: None,
-            dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
-        };

         // initialize 'first_items', which holds the current lowest item in each slot group
-        sorted_data_by_pubkey
+        let max_inclusive_num_pubkeys = sorted_data_by_pubkey
             .iter()
             .enumerate()
-            .for_each(|(i, hash_data)| {
+            .map(|(i, hash_data)| {
                 let first_pubkey_in_bin =
                     Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats);
                 if let Some(first_pubkey_in_bin) = first_pubkey_in_bin {
                     let k = hash_data[first_pubkey_in_bin].pubkey;
                     first_items.push(k);
                     first_item_to_pubkey_division.push(i);
                     indexes.push(first_pubkey_in_bin);
+                    let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1;
+                    while first_pubkey_in_next_bin < hash_data.len() {
+                        if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey)
+                            != pubkey_bin
+                        {
+                            break;
+                        }
+                        first_pubkey_in_next_bin += 1;
+                    }
+                    first_pubkey_in_next_bin - first_pubkey_in_bin
+                } else {
+                    0
                 }
-            });
+            })
+            .sum::<usize>();
+        let mut hashes = AccountHashesFile {
+            writer: None,
+            dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
+            capacity: max_inclusive_num_pubkeys * std::mem::size_of::<Hash>(),
+        };

         let mut overall_sum = 0;
         let mut duplicate_pubkey_indexes = Vec::with_capacity(len);
         let filler_accounts_enabled = self.filler_accounts_enabled();
@@ -1238,8 +1269,9 @@ pub mod tests {
     impl AccountHashesFile {
         fn new(dir_for_temp_cache_files: PathBuf) -> Self {
             Self {
-                count_and_writer: None,
+                writer: None,
                 dir_for_temp_cache_files,
+                capacity: 1024, /* default 1k for tests */
             }
         }
     }
@@ -1308,16 +1340,16 @@ pub mod tests {
         // 1 hash
         file.write(&hashes[0]);
         let reader = file.get_reader().unwrap();
-        assert_eq!(&[hashes[0]][..], reader.1.read(0));
-        assert!(reader.1.read(1).is_empty());
+        assert_eq!(&[hashes[0]][..], reader.read(0));
+        assert!(reader.read(1).is_empty());

         // multiple hashes
         let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf());
         assert!(file.get_reader().is_none());
         hashes.iter().for_each(|hash| file.write(hash));
         let reader = file.get_reader().unwrap();
-        (0..2).for_each(|i| assert_eq!(&hashes[i..], reader.1.read(i)));
-        assert!(reader.1.read(2).is_empty());
+        (0..2).for_each(|i| assert_eq!(&hashes[i..], reader.read(i)));
+        assert!(reader.read(2).is_empty());
     }

#[test]
@@ -1476,7 +1508,7 @@
         let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
         let (mut hashes, lamports) =
             accounts_hasher.de_dup_accounts_in_parallel(&slice, 0, 1, &HashStats::default());
-        assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().1.read(0));
+        assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().read(0));
         assert_eq!(lamports, 1);
     }

@@ -1486,7 +1518,7 @@
     fn get_vec(mut hashes: AccountHashesFile) -> Vec<Hash> {
         hashes
             .get_reader()
-            .map(|r| r.1.read(0).to_vec())
+            .map(|r| r.read(0).to_vec())
             .unwrap_or_default()
     }
