Skip to content

Commit

Permalink
Merge #5124
Browse files Browse the repository at this point in the history
5124: Optimize Prefixes and Merges r=ManyTheFish a=Kerollmops

In this PR, we plan to optimize the read of LMDB to use read the entries in lexicographic order and better use the memory-mapping OS cache:

 - Optimize the prefix generation for word position docids (`@manythefish)`
 - Optimize the parallel merging of the caches to sort entries before merging the caches (`@kerollmops)`
 
## Benchmarks on 1cpu 2gb gpo3 (5k IOps)
 
Before on the tag meilisearch-v1.12.0-rc.3.

```
word_position_docids:merge_and_send_docids: 988s
compute_word_fst: 23.3s
word_pair_proximity_docids:merge_and_send_docids: 428s
compute_word_prefix_fid_docids:recompute_modified_prefixes: 76.3s
compute_word_prefix_position_docids:recompute_modified_prefixes:from_prefixes: 429s
```

After sorting the whole `HashMap`s in a `Vec` on this branch.

```
word_position_docids:merge_and_send_docids: 202s
compute_word_fst: 20.4s
word_pair_proximity_docids:merge_and_send_docids: 427s
compute_word_prefix_fid_docids:recompute_modified_prefixes: 65.5s
compute_word_prefix_position_docids:recompute_modified_prefixes:from_prefixes: 62.5s
```

Co-authored-by: ManyTheFish <many@meilisearch.com>
Co-authored-by: Kerollmops <clement@meilisearch.com>
  • Loading branch information
3 people authored Dec 5, 2024
2 parents 6298db5 + 5284312 commit cac355b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 36 deletions.
15 changes: 8 additions & 7 deletions crates/milli/src/update/new/extract/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,12 +466,13 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>(
Ok(bucket_caches)
}

/// Merges the caches that must be all associated to the same bucket.
/// Merges the caches that must be all associated to the same bucket
/// but make sure to sort the different buckets before performing the merges.
///
/// # Panics
///
/// - If the bucket IDs in these frozen caches are not exactly the same.
pub fn merge_caches<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()>
pub fn merge_caches_sorted<F>(frozen: Vec<FrozenCache>, mut f: F) -> Result<()>
where
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
{
Expand Down Expand Up @@ -543,12 +544,12 @@ where

// Then manage the content on the HashMap entries that weren't taken (mem::take).
while let Some(mut map) = maps.pop() {
for (key, bbbul) in map.iter_mut() {
// Make sure we don't try to work with entries already managed by the spilled
if bbbul.is_empty() {
continue;
}
// Make sure we don't try to work with entries already managed by the spilled
let mut ordered_entries: Vec<_> =
map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect();
ordered_entries.sort_unstable_by_key(|(key, _)| *key);

for (key, bbbul) in ordered_entries {
let mut output = DelAddRoaringBitmap::empty();
output.union_and_clear_bbbul(bbbul);

Expand Down
4 changes: 3 additions & 1 deletion crates/milli/src/update/new/extract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ mod searchable;
mod vectors;

use bumpalo::Bump;
pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap};
pub use cache::{
merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
};
pub use documents::*;
pub use faceted::*;
pub use geo::*;
Expand Down
8 changes: 4 additions & 4 deletions crates/milli/src/update/new/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use roaring::RoaringBitmap;

use super::channel::*;
use super::extract::{
merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
GeoExtractorData,
merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
FacetKind, GeoExtractorData,
};
use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};

Expand Down Expand Up @@ -78,7 +78,7 @@ where
if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into());
}
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
Expand Down Expand Up @@ -107,7 +107,7 @@ pub fn merge_and_send_facet_docids<'extractor>(
.map(|frozen| {
let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
let rtxn = index.read_txn()?;
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
Expand Down
12 changes: 6 additions & 6 deletions crates/milli/src/update/new/word_fst_builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::BTreeSet;
use std::io::BufWriter;

use fst::{Set, SetBuilder, Streamer};
Expand Down Expand Up @@ -75,8 +75,8 @@ pub struct PrefixData {

#[derive(Debug)]
pub struct PrefixDelta {
pub modified: HashSet<Prefix>,
pub deleted: HashSet<Prefix>,
pub modified: BTreeSet<Prefix>,
pub deleted: BTreeSet<Prefix>,
}

struct PrefixFstBuilder {
Expand All @@ -86,7 +86,7 @@ struct PrefixFstBuilder {
prefix_fst_builders: Vec<SetBuilder<Vec<u8>>>,
current_prefix: Vec<Prefix>,
current_prefix_count: Vec<usize>,
modified_prefixes: HashSet<Prefix>,
modified_prefixes: BTreeSet<Prefix>,
current_prefix_is_modified: Vec<bool>,
}

Expand All @@ -110,7 +110,7 @@ impl PrefixFstBuilder {
prefix_fst_builders,
current_prefix: vec![Prefix::new(); max_prefix_length],
current_prefix_count: vec![0; max_prefix_length],
modified_prefixes: HashSet::new(),
modified_prefixes: BTreeSet::new(),
current_prefix_is_modified: vec![false; max_prefix_length],
})
}
Expand Down Expand Up @@ -180,7 +180,7 @@ impl PrefixFstBuilder {
let prefix_fst_mmap = unsafe { Mmap::map(&prefix_fst_file)? };
let new_prefix_fst = Set::new(&prefix_fst_mmap)?;
let old_prefix_fst = index.words_prefixes_fst(rtxn)?;
let mut deleted_prefixes = HashSet::new();
let mut deleted_prefixes = BTreeSet::new();
{
let mut deleted_prefixes_stream = old_prefix_fst.op().add(&new_prefix_fst).difference();
while let Some(prefix) = deleted_prefixes_stream.next() {
Expand Down
36 changes: 18 additions & 18 deletions crates/milli/src/update/new/words_prefix_docids.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::cell::RefCell;
use std::collections::HashSet;
use std::collections::BTreeSet;
use std::io::{BufReader, BufWriter, Read, Seek, Write};

use hashbrown::HashMap;
Expand Down Expand Up @@ -37,8 +37,8 @@ impl WordPrefixDocids {
fn execute(
self,
wtxn: &mut heed::RwTxn,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
prefix_to_compute: &BTreeSet<Prefix>,
prefix_to_delete: &BTreeSet<Prefix>,
) -> Result<()> {
delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
Expand All @@ -48,7 +48,7 @@ impl WordPrefixDocids {
fn recompute_modified_prefixes(
&self,
wtxn: &mut RwTxn,
prefixes: &HashSet<Prefix>,
prefixes: &BTreeSet<Prefix>,
) -> Result<()> {
// We fetch the docids associated to the newly added word prefix fst only.
// And collect the CboRoaringBitmaps pointers in an HashMap.
Expand Down Expand Up @@ -127,7 +127,7 @@ impl<'a, 'rtxn> FrozenPrefixBitmaps<'a, 'rtxn> {
pub fn from_prefixes(
database: Database<Bytes, CboRoaringBitmapCodec>,
rtxn: &'rtxn RoTxn,
prefixes: &'a HashSet<Prefix>,
prefixes: &'a BTreeSet<Prefix>,
) -> heed::Result<Self> {
let database = database.remap_data_type::<Bytes>();

Expand Down Expand Up @@ -173,8 +173,8 @@ impl WordPrefixIntegerDocids {
fn execute(
self,
wtxn: &mut heed::RwTxn,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
prefix_to_compute: &BTreeSet<Prefix>,
prefix_to_delete: &BTreeSet<Prefix>,
) -> Result<()> {
delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
Expand All @@ -184,7 +184,7 @@ impl WordPrefixIntegerDocids {
fn recompute_modified_prefixes(
&self,
wtxn: &mut RwTxn,
prefixes: &HashSet<Prefix>,
prefixes: &BTreeSet<Prefix>,
) -> Result<()> {
// We fetch the docids associated to the newly added word prefix fst only.
// And collect the CboRoaringBitmaps pointers in an HashMap.
Expand Down Expand Up @@ -262,7 +262,7 @@ impl<'a, 'rtxn> FrozenPrefixIntegerBitmaps<'a, 'rtxn> {
pub fn from_prefixes(
database: Database<Bytes, CboRoaringBitmapCodec>,
rtxn: &'rtxn RoTxn,
prefixes: &'a HashSet<Prefix>,
prefixes: &'a BTreeSet<Prefix>,
) -> heed::Result<Self> {
let database = database.remap_data_type::<Bytes>();

Expand Down Expand Up @@ -291,7 +291,7 @@ unsafe impl<'a, 'rtxn> Sync for FrozenPrefixIntegerBitmaps<'a, 'rtxn> {}
fn delete_prefixes(
wtxn: &mut RwTxn,
prefix_database: &Database<Bytes, CboRoaringBitmapCodec>,
prefixes: &HashSet<Prefix>,
prefixes: &BTreeSet<Prefix>,
) -> Result<()> {
// We remove all the entries that are no more required in this word prefix docids database.
for prefix in prefixes {
Expand All @@ -309,8 +309,8 @@ fn delete_prefixes(
pub fn compute_word_prefix_docids(
wtxn: &mut RwTxn,
index: &Index,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
prefix_to_compute: &BTreeSet<Prefix>,
prefix_to_delete: &BTreeSet<Prefix>,
grenad_parameters: GrenadParameters,
) -> Result<()> {
WordPrefixDocids::new(
Expand All @@ -325,8 +325,8 @@ pub fn compute_word_prefix_docids(
pub fn compute_exact_word_prefix_docids(
wtxn: &mut RwTxn,
index: &Index,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
prefix_to_compute: &BTreeSet<Prefix>,
prefix_to_delete: &BTreeSet<Prefix>,
grenad_parameters: GrenadParameters,
) -> Result<()> {
WordPrefixDocids::new(
Expand All @@ -341,8 +341,8 @@ pub fn compute_exact_word_prefix_docids(
pub fn compute_word_prefix_fid_docids(
wtxn: &mut RwTxn,
index: &Index,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
prefix_to_compute: &BTreeSet<Prefix>,
prefix_to_delete: &BTreeSet<Prefix>,
grenad_parameters: GrenadParameters,
) -> Result<()> {
WordPrefixIntegerDocids::new(
Expand All @@ -357,8 +357,8 @@ pub fn compute_word_prefix_fid_docids(
pub fn compute_word_prefix_position_docids(
wtxn: &mut RwTxn,
index: &Index,
prefix_to_compute: &HashSet<Prefix>,
prefix_to_delete: &HashSet<Prefix>,
prefix_to_compute: &BTreeSet<Prefix>,
prefix_to_delete: &BTreeSet<Prefix>,
grenad_parameters: GrenadParameters,
) -> Result<()> {
WordPrefixIntegerDocids::new(
Expand Down

0 comments on commit cac355b

Please sign in to comment.