use separate thread to compress block store
Use a separate thread to compress the block store for increased indexing performance. This allows using slower compressors with a higher compression ratio, with little or no performance impact (given enough cores).

A separate thread is spawned to compress the doc store; it handles both single blocks and stacking from other doc stores.
The spawned compressor thread does not write to the file itself; instead it sends back the compressed data. This avoids multithreaded writes to the same file.
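As an illustration of that design, here is a minimal channel-based sketch in Rust. The names (BlockMsg, spawn_compressor, compress) are hypothetical, not the commit's actual types; the point is that the compressor thread receives raw blocks (or already-compressed blocks stacked from another doc store) and sends bytes back, so only one thread ever touches the file.

use std::io;
use std::sync::mpsc;
use std::thread;

// Hypothetical message type: either a raw block to compress, or a
// pre-compressed block stacked from another doc store.
enum BlockMsg {
    Uncompressed(Vec<u8>),
    Stacked(Vec<u8>),
}

// Stand-in for the configured compressor (e.g. a high zstd level).
fn compress(block: &[u8]) -> Vec<u8> {
    block.to_vec()
}

fn spawn_compressor(
    block_rx: mpsc::Receiver<BlockMsg>,
    compressed_tx: mpsc::Sender<Vec<u8>>,
) -> thread::JoinHandle<io::Result<()>> {
    thread::spawn(move || {
        for msg in block_rx {
            let compressed = match msg {
                BlockMsg::Uncompressed(block) => compress(&block),
                // Stacked blocks are already compressed; forward as-is.
                BlockMsg::Stacked(block) => block,
            };
            // Never write to the file here; send the bytes back so the
            // single writer thread performs all file I/O.
            compressed_tx
                .send(compressed)
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "writer hung up"))?;
        }
        Ok(())
    })
}

Sending the compressed bytes back, rather than handing the compressor a file handle, keeps all writes on one thread and sidesteps any write-side locking.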
PSeitz committed Jun 16, 2022
1 parent 83d0c13 commit b4d43c2
Showing 8 changed files with 239 additions and 78 deletions.
2 changes: 1 addition & 1 deletion common/src/writer.rs
@@ -62,7 +62,7 @@ impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
pub struct AntiCallToken(());

/// Trait used to indicate when no more write need to be done on a writer
-pub trait TerminatingWrite: Write {
+pub trait TerminatingWrite: Write + Send {
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
fn terminate(mut self) -> io::Result<()>
where Self: Sized {
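The new Send bound is what lets a TerminatingWrite writer be handed to the spawned compressor machinery. A sketch of why the bound is needed (the function here is illustrative, not from the commit):

use std::io::Write;
use std::thread;

// `thread::spawn` requires everything the closure captures to be `Send`,
// so a writer can only cross the thread boundary if `W: Send`.
fn write_in_background<W: Write + Send + 'static>(mut writer: W, data: Vec<u8>) {
    thread::spawn(move || {
        writer.write_all(&data).expect("background write failed");
    });
}

Without the added `+ Send`, a `Box<dyn TerminatingWrite>` could not be moved into such a closure.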
2 changes: 1 addition & 1 deletion src/indexer/merger.rs
@@ -1081,7 +1081,7 @@ impl IndexMerger {
store_writer.store_bytes(&doc_bytes)?;
}
} else {
-store_writer.stack(&store_reader)?;
+store_writer.stack(store_reader)?;
}
}
}
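Note that stack now takes the StoreReader by value instead of by reference. A plausible reading, sketched with illustrative types: an owned reader can be moved into the long-lived compressor thread, whereas a borrow could not satisfy the 'static bound that thread::spawn imposes on its captures.

use std::thread;

struct StoreReader {
    bytes: Vec<u8>,
}

// Owning the reader means no lifetime ties it to the caller, so it can
// be moved into a spawned thread; a `&StoreReader` could not be.
fn stack(reader: StoreReader) {
    thread::spawn(move || {
        let _already_compressed = reader.bytes;
    });
}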
2 changes: 1 addition & 1 deletion src/indexer/segment_serializer.rs
@@ -42,7 +42,7 @@ impl SegmentSerializer {
let blocksize = segment.index().settings().docstore_blocksize;
Ok(SegmentSerializer {
segment,
-store_writer: StoreWriter::new(store_write, compressor, blocksize),
+store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,
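StoreWriter::new is now fallible, hence the added ?. One plausible cause, assuming the compressor thread is spawned inside the constructor: thread::Builder::spawn returns an io::Result, and that error has to surface somewhere. A sketch with simplified fields:

use std::io;
use std::sync::mpsc;
use std::thread;

struct StoreWriter {
    block_tx: mpsc::Sender<Vec<u8>>,
    compressor: thread::JoinHandle<()>,
}

impl StoreWriter {
    fn new() -> io::Result<StoreWriter> {
        let (block_tx, block_rx) = mpsc::channel::<Vec<u8>>();
        // Unlike `thread::spawn`, `thread::Builder::spawn` returns an
        // io::Result, which is what makes the constructor fallible.
        let compressor = thread::Builder::new()
            .name("docstore-compressor".to_string())
            .spawn(move || {
                for _block in block_rx {
                    // compress and send back (omitted in this sketch)
                }
            })?;
        Ok(StoreWriter { block_tx, compressor })
    }
}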
2 changes: 1 addition & 1 deletion src/indexer/segment_writer.rs
@@ -382,7 +382,7 @@ fn remap_and_write(
let block_size = serializer.segment().index().settings().docstore_blocksize;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
-StoreWriter::new(store_write, compressor, block_size),
+StoreWriter::new(store_write, compressor, block_size)?,
);
old_store_writer.close()?;
let store_read = StoreReader::open(
10 changes: 5 additions & 5 deletions src/store/index/mod.rs
@@ -54,7 +54,7 @@ mod tests {
fn test_skip_index_empty() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
-skip_index_builder.write(&mut output)?;
+skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
@@ -70,7 +70,7 @@
byte_range: 0..3,
};
skip_index_builder.insert(checkpoint.clone());
-skip_index_builder.write(&mut output)?;
+skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
@@ -108,7 +108,7 @@ mod tests {
for checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint.clone());
}
-skip_index_builder.write(&mut output)?;
+skip_index_builder.serialize_into(&mut output)?;

let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
assert_eq!(
@@ -167,7 +167,7 @@ mod tests {
for checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint.clone());
}
-skip_index_builder.write(&mut output)?;
+skip_index_builder.serialize_into(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
.checkpoints()
@@ -238,7 +238,7 @@ mod tests {
skip_index_builder.insert(checkpoint);
}
let mut buffer = Vec::new();
-skip_index_builder.write(&mut buffer).unwrap();
+skip_index_builder.serialize_into(&mut buffer).unwrap();
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
2 changes: 1 addition & 1 deletion src/store/index/skip_index_builder.rs
@@ -87,7 +87,7 @@ impl SkipIndexBuilder {
}
}

-pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
+pub fn serialize_into<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {
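The rename from write to serialize_into presumably avoids confusion with io::Write::write; call sites change only in the method name, mirroring the test updates above. A small usage sketch inside the crate:

use std::io;

fn build_skip_index() -> io::Result<Vec<u8>> {
    let mut output: Vec<u8> = Vec::new();
    let skip_index_builder = SkipIndexBuilder::new();
    // Consumes the builder and serializes the skip index into any
    // `io::Write` sink, exactly as `write` did before the rename.
    skip_index_builder.serialize_into(&mut output)?;
    Ok(output)
}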
2 changes: 1 addition & 1 deletion src/store/mod.rs
@@ -88,7 +88,7 @@ pub mod tests {
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
-let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
+let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
for i in 0..num_docs {
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());