From b4d43c2a6fd53ac8ee1accb33977445488a6ea78 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Thu, 16 Jun 2022 13:06:52 +0800
Subject: [PATCH] use separate thread to compress block store

Use a separate thread to compress the block store for increased indexing
performance. This allows using slower compressors with a higher compression
ratio, with little or no performance impact (given enough cores).

A separate thread is spawned to compress the docstore; it handles single
blocks as well as stacking from other docstores. The spawned compressor
thread does not write to the file itself; instead, it sends the compressed
data back. This is done to avoid writing to the same file from multiple
threads.
---
 common/src/writer.rs                  |   2 +-
 src/indexer/merger.rs                 |   2 +-
 src/indexer/segment_serializer.rs     |   2 +-
 src/indexer/segment_writer.rs         |   2 +-
 src/store/index/mod.rs                |  10 +-
 src/store/index/skip_index_builder.rs |   2 +-
 src/store/mod.rs                      |   2 +-
 src/store/writer.rs                   | 295 ++++++++++++++++++++------
 8 files changed, 239 insertions(+), 78 deletions(-)

diff --git a/common/src/writer.rs b/common/src/writer.rs
index 20f56221d7..9b8b86908d 100644
--- a/common/src/writer.rs
+++ b/common/src/writer.rs
@@ -62,7 +62,7 @@ impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
 pub struct AntiCallToken(());
 
 /// Trait used to indicate when no more write need to be done on a writer
-pub trait TerminatingWrite: Write {
+pub trait TerminatingWrite: Write + Send {
     /// Indicate that the writer will no longer be used. Internally call terminate_ref.
     fn terminate(mut self) -> io::Result<()>
     where Self: Sized {
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 007934ba53..3c4f2a2abc 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1081,7 +1081,7 @@ impl IndexMerger {
                     store_writer.store_bytes(&doc_bytes)?;
                 }
             } else {
-                store_writer.stack(&store_reader)?;
+                store_writer.stack(store_reader)?;
             }
         }
     }
diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs
index 554503e668..ffb6a3dc33 100644
--- a/src/indexer/segment_serializer.rs
+++ b/src/indexer/segment_serializer.rs
@@ -42,7 +42,7 @@ impl SegmentSerializer {
         let blocksize = segment.index().settings().docstore_blocksize;
         Ok(SegmentSerializer {
             segment,
-            store_writer: StoreWriter::new(store_write, compressor, blocksize),
+            store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
             fast_field_serializer,
             fieldnorms_serializer: Some(fieldnorms_serializer),
             postings_serializer,
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index 94469c5bca..308cca255e 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -382,7 +382,7 @@ fn remap_and_write(
     let block_size = serializer.segment().index().settings().docstore_blocksize;
     let old_store_writer = std::mem::replace(
         &mut serializer.store_writer,
-        StoreWriter::new(store_write, compressor, block_size),
+        StoreWriter::new(store_write, compressor, block_size)?,
     );
     old_store_writer.close()?;
     let store_read = StoreReader::open(
diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs
index 2401e23d6d..af572e758b 100644
--- a/src/store/index/mod.rs
+++ b/src/store/index/mod.rs
@@ -54,7 +54,7 @@ mod tests {
     fn test_skip_index_empty() -> io::Result<()> {
         let mut output: Vec<u8> = Vec::new();
         let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
-        skip_index_builder.write(&mut output)?;
+        skip_index_builder.serialize_into(&mut output)?;
         let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
         let mut skip_cursor = skip_index.checkpoints();
         assert!(skip_cursor.next().is_none());
@@ -70,7 +70,7 @@ mod tests {
             byte_range: 0..3,
         };
         skip_index_builder.insert(checkpoint.clone());
-        skip_index_builder.write(&mut output)?;
+        skip_index_builder.serialize_into(&mut output)?;
         let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
         let mut skip_cursor = skip_index.checkpoints();
         assert_eq!(skip_cursor.next(), Some(checkpoint));
@@ -108,7 +108,7 @@ mod tests {
         for checkpoint in &checkpoints {
             skip_index_builder.insert(checkpoint.clone());
         }
-        skip_index_builder.write(&mut output)?;
+        skip_index_builder.serialize_into(&mut output)?;
         let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
 
         assert_eq!(
@@ -167,7 +167,7 @@ mod tests {
         for checkpoint in &checkpoints {
             skip_index_builder.insert(checkpoint.clone());
         }
-        skip_index_builder.write(&mut output)?;
+        skip_index_builder.serialize_into(&mut output)?;
         assert_eq!(output.len(), 4035);
         let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
             .checkpoints()
@@ -238,7 +238,7 @@ mod tests {
             skip_index_builder.insert(checkpoint);
         }
         let mut buffer = Vec::new();
-        skip_index_builder.write(&mut buffer).unwrap();
+        skip_index_builder.serialize_into(&mut buffer).unwrap();
         let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
         let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
         assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs
index cbb899a219..2f34376cd4 100644
--- a/src/store/index/skip_index_builder.rs
+++ b/src/store/index/skip_index_builder.rs
@@ -87,7 +87,7 @@ impl SkipIndexBuilder {
         }
     }
 
-    pub fn write<W: io::Write>(mut self, output: &mut W) -> io::Result<()> {
+    pub fn serialize_into<W: io::Write>(mut self, output: &mut W) -> io::Result<()> {
         let mut last_pointer = None;
         for skip_layer in self.layers.iter_mut() {
             if let Some(checkpoint) = last_pointer {
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 88ef9b579e..8dd035fe7f 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -88,7 +88,7 @@ pub mod tests {
         schema_builder.add_text_field("title", TextOptions::default().set_stored());
         let schema = schema_builder.build();
         {
-            let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
+            let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
             for i in 0..num_docs {
                 let mut doc = Document::default();
                 doc.add_field_value(field_body, LOREM.to_string());
diff --git a/src/store/writer.rs b/src/store/writer.rs
index a351d0fcbc..4bebb251fe 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -1,6 +1,9 @@
 use std::io::{self, Write};
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError};
+use std::thread::{self, JoinHandle};
 
-use common::{BinarySerializable, CountingWriter, VInt};
+use common::{BinarySerializable, VInt};
+use ownedbytes::OwnedBytes;
 
 use super::compressors::Compressor;
 use super::footer::DocStoreFooter;
@@ -21,12 +24,23 @@ use crate::DocId;
 pub struct StoreWriter {
     compressor: Compressor,
     block_size: usize,
-    doc: DocId,
-    first_doc_in_block: DocId,
-    offset_index_writer: SkipIndexBuilder,
-    writer: CountingWriter<WritePtr>,
+    num_docs_in_current_block: DocId,
     intermediary_buffer: Vec<u8>,
     current_block: Vec<u8>,
+
+    writer: WritePtr,
+    // the channel to send uncompressed blocks to the compressor thread
+    compressor_sender: SyncSender<BlockCompressorMessage>,
+    // the channel to receive compressed data back from the compressor thread
+    data_receiver: Receiver<OwnedBytes>,
+
+    // the handle to check for errors on the thread
+    compressor_thread_handle: JoinHandle<io::Result<(DocStoreFooter, SkipIndexBuilder)>>,
+}
+
+enum BlockCompressorMessage {
+    AddBlock(DocumentBlock),
+    Stack((StoreReader, DocumentBlock)),
 }
 
 impl StoreWriter {
@@ -34,17 +48,45 @@
     /// Create a store writer.
     ///
     /// The store writer will writes blocks on disc as
     /// document are added.
-    pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
-        StoreWriter {
+    pub fn new(
+        writer: WritePtr,
+        compressor: Compressor,
+        block_size: usize,
+    ) -> io::Result<StoreWriter> {
+        let thread_builder = thread::Builder::new()
+            .stack_size(32 * 1024) // 2MB default seems too high
+            .name("docstore compressor thread".to_string());
+
+        // Data channel to send the compressed data back, so that file writes happen only on the current thread
+        let (data_sender, data_receiver) = sync_channel(3);
+        // Channel to send uncompressed blocks to the compressor thread
+        let (block_sender, block_receiver) = sync_channel(3);
+        let thread_join_handle = thread_builder.spawn(move || {
+            let mut block_compressor = BlockCompressor::new(compressor, data_sender);
+            while let Ok(packet) = block_receiver.recv() {
+                match packet {
+                    BlockCompressorMessage::AddBlock(block) => {
+                        block_compressor.compress_block(block)?;
+                    }
+                    BlockCompressorMessage::Stack((store_reader, block)) => {
+                        block_compressor.stack(block, store_reader)?;
+                    }
+                }
+            }
+            block_compressor.close()
+        })?;
+
+        Ok(StoreWriter {
             compressor,
             block_size,
-            doc: 0,
-            first_doc_in_block: 0,
-            offset_index_writer: SkipIndexBuilder::new(),
-            writer: CountingWriter::wrap(writer),
+            num_docs_in_current_block: 0,
             intermediary_buffer: Vec::new(),
             current_block: Vec::new(),
-        }
+            writer,
+            compressor_sender: block_sender,
+            compressor_thread_handle: thread_join_handle,
+            data_receiver,
+        })
     }
 
     pub(crate) fn compressor(&self) -> Compressor {
@@ -56,21 +98,53 @@
         self.intermediary_buffer.capacity() + self.current_block.capacity()
     }
 
-    /// Store bytes of a serialized document.
-    ///
-    /// The document id is implicitely the current number
-    /// of documents.
-    pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
-        let doc_num_bytes = serialized_document.len();
-        VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
-        self.current_block.write_all(serialized_document)?;
-        self.doc += 1;
+    fn check_flush_block(&mut self) -> io::Result<()> {
         if self.current_block.len() > self.block_size {
-            self.write_and_compress_block()?;
+            let block = self.get_current_block();
+            self.compressor_sender
+                .send(BlockCompressorMessage::AddBlock(block))
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+            self.fetch_writes_from_channel()?;
         }
         Ok(())
     }
 
+    /// Try to empty the queue and write the pending data into the file.
+    ///
+    /// This is done in order to avoid writing to the same file from multiple threads.
+    fn fetch_writes_from_channel(&mut self) -> io::Result<()> {
+        loop {
+            match self.data_receiver.try_recv() {
+                Ok(data) => {
+                    self.writer.write_all(data.as_slice())?;
+                }
+                Err(err) => match err {
+                    TryRecvError::Empty => {
+                        break;
+                    }
+                    TryRecvError::Disconnected => {
+                        return Err(io::Error::new(
+                            io::ErrorKind::Other,
+                            "compressor data channel unexpectedly closed".to_string(),
+                        ));
+                    }
+                },
+            }
+        }
+
+        Ok(())
+    }
+
+    fn get_current_block(&mut self) -> DocumentBlock {
+        let block = DocumentBlock {
+            data: self.current_block.to_owned(),
+            num_docs_in_block: self.num_docs_in_current_block,
+        };
+        self.current_block.clear();
+        self.num_docs_in_current_block = 0;
+        block
+    }
+
     /// Store a new document.
     ///
     /// The document id is implicitely the current number
@@ -85,28 +159,145 @@
         VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
         self.current_block
             .write_all(&self.intermediary_buffer[..])?;
-        self.doc += 1;
-        if self.current_block.len() > self.block_size {
-            self.write_and_compress_block()?;
+        self.num_docs_in_current_block += 1;
+        self.check_flush_block()?;
+        Ok(())
+    }
+
+    /// Store bytes of a serialized document.
+    ///
+    /// The document id is implicitly the current number
+    /// of documents.
+    pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
+        let doc_num_bytes = serialized_document.len();
+        VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
+        self.current_block.write_all(serialized_document)?;
+        self.num_docs_in_current_block += 1;
+        self.check_flush_block()?;
+        Ok(())
+    }
+
+    /// Stacks a store reader on top of the documents written so far.
+    /// This method is an optimization compared to iterating over the documents
+    /// in the store and adding them one by one, as the store's data will
+    /// not be decompressed and then recompressed.
+    pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
+        self.check_flush_block()?;
+        let block = self.get_current_block();
+        self.compressor_sender
+            .send(BlockCompressorMessage::Stack((store_reader, block)))
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+
+        Ok(())
+    }
+
+    /// Finalizes the store writer.
+    ///
+    /// Compresses the last unfinished block if any,
+    /// and serializes the skip list index on disc.
+    pub fn close(mut self) -> io::Result<()> {
+        let block = self.get_current_block();
+        if !block.is_empty() {
+            self.compressor_sender
+                .send(BlockCompressorMessage::AddBlock(block))
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+        }
+        drop(self.compressor_sender);
+
+        // Wait for the remaining data on the channel and write it
+        while let Ok(data) = self.data_receiver.recv() {
+            self.writer.write_all(data.as_slice())?;
+        }
+
+        // The compressor thread should have finished already, since data_receiver
+        // stopped receiving data
+        let (docstore_footer, offset_index_writer) = self
+            .compressor_thread_handle
+            .join()
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??;
+
+        offset_index_writer.serialize_into(&mut self.writer)?;
+        docstore_footer.serialize(&mut self.writer)?;
+        self.writer.terminate()
+    }
+}
+
+/// BlockCompressor is separated from StoreWriter, so that it can run in its own thread
+pub struct BlockCompressor {
+    compressor: Compressor,
+    doc: DocId,
+    offset_index_writer: SkipIndexBuilder,
+    intermediary_buffer: Vec<u8>,
+    written_bytes: usize,
+    data_sender: SyncSender<OwnedBytes>,
+}
+
+struct DocumentBlock {
+    data: Vec<u8>,
+    num_docs_in_block: DocId,
+}
+
+impl DocumentBlock {
+    fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+}
+
+impl BlockCompressor {
+    fn new(compressor: Compressor, data_sender: SyncSender<OwnedBytes>) -> Self {
+        Self {
+            compressor,
+            doc: 0,
+            offset_index_writer: SkipIndexBuilder::new(),
+            intermediary_buffer: Vec::new(),
+            written_bytes: 0,
+            data_sender,
+        }
+    }
+
+    fn compress_block(&mut self, block: DocumentBlock) -> io::Result<()> {
+        assert!(block.num_docs_in_block > 0);
+        self.intermediary_buffer.clear();
+        self.compressor
+            .compress_into(&block.data[..], &mut self.intermediary_buffer)?;
+
+        let byte_range = self.written_bytes..self.written_bytes + self.intermediary_buffer.len();
+
+        self.data_sender
+            .send(OwnedBytes::new(self.intermediary_buffer.to_owned()))
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+
+        self.written_bytes += byte_range.len();
+
+        self.register_checkpoint(Checkpoint {
+            doc_range: self.doc..self.doc + block.num_docs_in_block,
+            byte_range,
+        });
         Ok(())
     }
 
+    fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
+        self.offset_index_writer.insert(checkpoint.clone());
+        self.doc = checkpoint.doc_range.end;
+    }
+
     /// Stacks a store reader on top of the documents written so far.
     /// This method is an optimization compared to iterating over the documents
     /// in the store and adding them one by one, as the store's data will
     /// not be decompressed and then recompressed.
-    pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
-        if !self.current_block.is_empty() {
-            self.write_and_compress_block()?;
+    fn stack(&mut self, block: DocumentBlock, store_reader: StoreReader) -> io::Result<()> {
+        if !block.is_empty() {
+            self.compress_block(block)?;
         }
-        assert_eq!(self.first_doc_in_block, self.doc);
         let doc_shift = self.doc;
-        let start_shift = self.writer.written_bytes() as usize;
+        let start_shift = self.written_bytes;
 
         // just bulk write all of the block of the given reader.
-        self.writer
-            .write_all(store_reader.block_data()?.as_slice())?;
+        self.data_sender
+            .send(store_reader.block_data()?)
+            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+
+        self.written_bytes += store_reader.block_data()?.as_slice().len();
 
         // concatenate the index of the `store_reader`, after translating
         // its start doc id and its start file offset.
@@ -119,42 +310,12 @@ impl StoreWriter {
         }
         Ok(())
     }
+
+    fn close(self) -> io::Result<(DocStoreFooter, SkipIndexBuilder)> {
+        drop(self.data_sender);
 
-    fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
-        self.offset_index_writer.insert(checkpoint.clone());
-        self.first_doc_in_block = checkpoint.doc_range.end;
-        self.doc = checkpoint.doc_range.end;
-    }
-
-    fn write_and_compress_block(&mut self) -> io::Result<()> {
-        assert!(self.doc > 0);
-        self.intermediary_buffer.clear();
-        self.compressor
-            .compress_into(&self.current_block[..], &mut self.intermediary_buffer)?;
-        let start_offset = self.writer.written_bytes() as usize;
-        self.writer.write_all(&self.intermediary_buffer)?;
-        let end_offset = self.writer.written_bytes() as usize;
-        let end_doc = self.doc;
-        self.register_checkpoint(Checkpoint {
-            doc_range: self.first_doc_in_block..end_doc,
-            byte_range: start_offset..end_offset,
-        });
-        self.current_block.clear();
-        Ok(())
-    }
-
-    /// Finalized the store writer.
-    ///
-    /// Compress the last unfinished block if any,
-    /// and serializes the skip list index on disc.
-    pub fn close(mut self) -> io::Result<()> {
-        if !self.current_block.is_empty() {
-            self.write_and_compress_block()?;
-        }
-        let header_offset: u64 = self.writer.written_bytes() as u64;
+        let header_offset: u64 = self.written_bytes as u64;
         let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
-        self.offset_index_writer.write(&mut self.writer)?;
-        footer.serialize(&mut self.writer)?;
-        self.writer.terminate()
+
+        Ok((footer, self.offset_index_writer))
     }
 }
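
Note on the design described in the commit message: reduced to its essentials,
the pattern is one owner thread that keeps the file handle and a helper thread
that only compresses, with two bounded channels between them. The sketch below
is a minimal, dependency-free illustration of that pattern, not tantivy code;
the names (ThreadedWriter, write_block) and the length-prefixing compress()
stand-in are hypothetical.

    use std::io::{self, Write};
    use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
    use std::thread::{self, JoinHandle};

    // Stand-in for a real compressor (lz4, brotli, zstd, ...): it only
    // length-prefixes the block so that the sketch stays dependency-free.
    fn compress(block: &[u8]) -> Vec<u8> {
        let mut out = (block.len() as u64).to_le_bytes().to_vec();
        out.extend_from_slice(block);
        out
    }

    struct ThreadedWriter {
        // uncompressed blocks: owner thread -> compressor thread
        block_sender: SyncSender<Vec<u8>>,
        // compressed blocks: compressor thread -> owner thread
        data_receiver: Receiver<Vec<u8>>,
        handle: JoinHandle<()>,
        out: Box<dyn Write>,
    }

    impl ThreadedWriter {
        fn new(out: Box<dyn Write>) -> Self {
            // Bounded channels (capacity 3, as in the patch) give backpressure.
            let (block_sender, block_receiver) = sync_channel::<Vec<u8>>(3);
            let (data_sender, data_receiver) = sync_channel::<Vec<u8>>(3);
            // The spawned thread only compresses; it never touches the file.
            let handle = thread::spawn(move || {
                while let Ok(block) = block_receiver.recv() {
                    if data_sender.send(compress(&block)).is_err() {
                        break; // the writing side hung up
                    }
                }
            });
            ThreadedWriter { block_sender, data_receiver, handle, out }
        }

        fn write_block(&mut self, block: Vec<u8>) -> io::Result<()> {
            self.block_sender
                .send(block)
                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
            // Drain whatever is ready; all file writes stay on this thread.
            while let Ok(data) = self.data_receiver.try_recv() {
                self.out.write_all(&data)?;
            }
            Ok(())
        }

        fn close(mut self) -> io::Result<()> {
            // Closing the block channel lets the compressor thread exit.
            drop(self.block_sender);
            // Drain everything still in flight, then surface thread panics.
            while let Ok(data) = self.data_receiver.recv() {
                self.out.write_all(&data)?;
            }
            self.handle
                .join()
                .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?;
            self.out.flush()
        }
    }

    fn main() -> io::Result<()> {
        let mut writer = ThreadedWriter::new(Box::new(io::sink()));
        for i in 0..10u8 {
            writer.write_block(vec![i; 1024])?;
        }
        writer.close()
    }

The bounded channels are the key design choice: if the compressor falls behind,
the indexing thread blocks on send() instead of buffering an unbounded amount
of uncompressed data, which is what makes slower, higher-ratio compressors safe
to use here.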