diff --git a/common/src/writer.rs b/common/src/writer.rs index 20f56221d7..9b8b86908d 100644 --- a/common/src/writer.rs +++ b/common/src/writer.rs @@ -62,7 +62,7 @@ impl TerminatingWrite for CountingWriter { pub struct AntiCallToken(()); /// Trait used to indicate when no more write need to be done on a writer -pub trait TerminatingWrite: Write { +pub trait TerminatingWrite: Write + Send { /// Indicate that the writer will no longer be used. Internally call terminate_ref. fn terminate(mut self) -> io::Result<()> where Self: Sized { diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 007934ba53..3c4f2a2abc 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1081,7 +1081,7 @@ impl IndexMerger { store_writer.store_bytes(&doc_bytes)?; } } else { - store_writer.stack(&store_reader)?; + store_writer.stack(store_reader)?; } } } diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 554503e668..ffb6a3dc33 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -42,7 +42,7 @@ impl SegmentSerializer { let blocksize = segment.index().settings().docstore_blocksize; Ok(SegmentSerializer { segment, - store_writer: StoreWriter::new(store_write, compressor, blocksize), + store_writer: StoreWriter::new(store_write, compressor, blocksize)?, fast_field_serializer, fieldnorms_serializer: Some(fieldnorms_serializer), postings_serializer, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 94469c5bca..308cca255e 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -382,7 +382,7 @@ fn remap_and_write( let block_size = serializer.segment().index().settings().docstore_blocksize; let old_store_writer = std::mem::replace( &mut serializer.store_writer, - StoreWriter::new(store_write, compressor, block_size), + StoreWriter::new(store_write, compressor, block_size)?, ); old_store_writer.close()?; let store_read = StoreReader::open( diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index 2401e23d6d..af572e758b 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -54,7 +54,7 @@ mod tests { fn test_skip_index_empty() -> io::Result<()> { let mut output: Vec = Vec::new(); let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); let mut skip_cursor = skip_index.checkpoints(); assert!(skip_cursor.next().is_none()); @@ -70,7 +70,7 @@ mod tests { byte_range: 0..3, }; skip_index_builder.insert(checkpoint.clone()); - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); let mut skip_cursor = skip_index.checkpoints(); assert_eq!(skip_cursor.next(), Some(checkpoint)); @@ -108,7 +108,7 @@ mod tests { for checkpoint in &checkpoints { skip_index_builder.insert(checkpoint.clone()); } - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); assert_eq!( @@ -167,7 +167,7 @@ mod tests { for checkpoint in &checkpoints { skip_index_builder.insert(checkpoint.clone()); } - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; assert_eq!(output.len(), 4035); let resulting_checkpoints: Vec = SkipIndex::open(OwnedBytes::new(output)) .checkpoints() @@ -238,7 +238,7 @@ mod tests { skip_index_builder.insert(checkpoint); } let mut buffer = Vec::new(); - skip_index_builder.write(&mut buffer).unwrap(); + skip_index_builder.serialize_into(&mut buffer).unwrap(); let skip_index = SkipIndex::open(OwnedBytes::new(buffer)); let iter_checkpoints: Vec = skip_index.checkpoints().collect(); assert_eq!(&checkpoints[..], &iter_checkpoints[..]); diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs index cbb899a219..2f34376cd4 100644 --- a/src/store/index/skip_index_builder.rs +++ b/src/store/index/skip_index_builder.rs @@ -87,7 +87,7 @@ impl SkipIndexBuilder { } } - pub fn write(mut self, output: &mut W) -> io::Result<()> { + pub fn serialize_into(mut self, output: &mut W) -> io::Result<()> { let mut last_pointer = None; for skip_layer in self.layers.iter_mut() { if let Some(checkpoint) = last_pointer { diff --git a/src/store/mod.rs b/src/store/mod.rs index 88ef9b579e..8dd035fe7f 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -88,7 +88,7 @@ pub mod tests { schema_builder.add_text_field("title", TextOptions::default().set_stored()); let schema = schema_builder.build(); { - let mut store_writer = StoreWriter::new(writer, compressor, blocksize); + let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap(); for i in 0..num_docs { let mut doc = Document::default(); doc.add_field_value(field_body, LOREM.to_string()); diff --git a/src/store/writer.rs b/src/store/writer.rs index a351d0fcbc..4bebb251fe 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,6 +1,9 @@ use std::io::{self, Write}; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; +use std::thread::{self, JoinHandle}; -use common::{BinarySerializable, CountingWriter, VInt}; +use common::{BinarySerializable, VInt}; +use ownedbytes::OwnedBytes; use super::compressors::Compressor; use super::footer::DocStoreFooter; @@ -21,12 +24,23 @@ use crate::DocId; pub struct StoreWriter { compressor: Compressor, block_size: usize, - doc: DocId, - first_doc_in_block: DocId, - offset_index_writer: SkipIndexBuilder, - writer: CountingWriter, + num_docs_in_current_block: DocId, intermediary_buffer: Vec, current_block: Vec, + + writer: WritePtr, + // the channel to communicate with the compressor thread + compressor_sender: SyncSender, + // the channel to communicate with the compressor thread + data_receiver: Receiver, + + // the handle to check for errors on the thread + compressor_thread_handle: JoinHandle>, +} + +enum BlockCompressorMessage { + AddBlock(DocumentBlock), + Stack((StoreReader, DocumentBlock)), } impl StoreWriter { @@ -34,17 +48,45 @@ impl StoreWriter { /// /// The store writer will writes blocks on disc as /// document are added. - pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter { - StoreWriter { + pub fn new( + writer: WritePtr, + compressor: Compressor, + block_size: usize, + ) -> io::Result { + let thread_builder = thread::Builder::new() + .stack_size(32 * 1024) // 2MB default seems too high + .name("docstore compressor thread".to_string()); + + // Data channel to send fs writes to write only from current thread + let (data_sender, data_receiver) = sync_channel(3); + // Channel to send uncompressed data to compressor channel + let (block_sender, block_receiver) = sync_channel(3); + let thread_join_handle = thread_builder.spawn(move || { + let mut block_compressor = BlockCompressor::new(compressor, data_sender); + while let Ok(packet) = block_receiver.recv() { + match packet { + BlockCompressorMessage::AddBlock(block) => { + block_compressor.compress_block(block)?; + } + BlockCompressorMessage::Stack((store_reader, block)) => { + block_compressor.stack(block, store_reader)?; + } + } + } + block_compressor.close() + })?; + + Ok(StoreWriter { compressor, block_size, - doc: 0, - first_doc_in_block: 0, - offset_index_writer: SkipIndexBuilder::new(), - writer: CountingWriter::wrap(writer), + num_docs_in_current_block: 0, intermediary_buffer: Vec::new(), current_block: Vec::new(), - } + writer, + compressor_sender: block_sender, + compressor_thread_handle: thread_join_handle, + data_receiver, + }) } pub(crate) fn compressor(&self) -> Compressor { @@ -56,21 +98,53 @@ impl StoreWriter { self.intermediary_buffer.capacity() + self.current_block.capacity() } - /// Store bytes of a serialized document. - /// - /// The document id is implicitely the current number - /// of documents. - pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { - let doc_num_bytes = serialized_document.len(); - VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; - self.current_block.write_all(serialized_document)?; - self.doc += 1; + fn check_flush_block(&mut self) -> io::Result<()> { if self.current_block.len() > self.block_size { - self.write_and_compress_block()?; + let block = self.get_current_block(); + self.compressor_sender + .send(BlockCompressorMessage::AddBlock(block)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + self.fetch_writes_from_channel()?; } Ok(()) } + /// Try to empty the queue to write into the file. + /// + /// This is done in order to avoid writing from multiple threads into the file. + fn fetch_writes_from_channel(&mut self) -> io::Result<()> { + loop { + match self.data_receiver.try_recv() { + Ok(data) => { + self.writer.write_all(data.as_slice())?; + } + Err(err) => match err { + TryRecvError::Empty => { + break; + } + TryRecvError::Disconnected => { + return Err(io::Error::new( + io::ErrorKind::Other, + "compressor data channel unexpected closed".to_string(), + )); + } + }, + } + } + + Ok(()) + } + + fn get_current_block(&mut self) -> DocumentBlock { + let block = DocumentBlock { + data: self.current_block.to_owned(), + num_docs_in_block: self.num_docs_in_current_block, + }; + self.current_block.clear(); + self.num_docs_in_current_block = 0; + block + } + /// Store a new document. /// /// The document id is implicitely the current number @@ -85,28 +159,145 @@ impl StoreWriter { VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; self.current_block .write_all(&self.intermediary_buffer[..])?; - self.doc += 1; - if self.current_block.len() > self.block_size { - self.write_and_compress_block()?; + self.num_docs_in_current_block += 1; + self.check_flush_block()?; + Ok(()) + } + + /// Store bytes of a serialized document. + /// + /// The document id is implicitely the current number + /// of documents. + pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { + let doc_num_bytes = serialized_document.len(); + VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; + self.current_block.write_all(serialized_document)?; + self.num_docs_in_current_block += 1; + self.check_flush_block()?; + Ok(()) + } + + /// Stacks a store reader on top of the documents written so far. + /// This method is an optimization compared to iterating over the documents + /// in the store and adding them one by one, as the store's data will + /// not be decompressed and then recompressed. + pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { + self.check_flush_block()?; + let block = self.get_current_block(); + self.compressor_sender + .send(BlockCompressorMessage::Stack((store_reader, block))) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + Ok(()) + } + + /// Finalized the store writer. + /// + /// Compress the last unfinished block if any, + /// and serializes the skip list index on disc. + pub fn close(mut self) -> io::Result<()> { + let block = self.get_current_block(); + if !block.is_empty() { + self.compressor_sender + .send(BlockCompressorMessage::AddBlock(block)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; } + drop(self.compressor_sender); + + // Wait for remaining data on the channel to write + while let Ok(data) = self.data_receiver.recv() { + self.writer.write_all(data.as_slice())?; + } + + // The compressor thread should have finished already, since data_receiver stopped + // receiving + let (docstore_footer, offset_index_writer) = self + .compressor_thread_handle + .join() + .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??; + + offset_index_writer.serialize_into(&mut self.writer)?; + docstore_footer.serialize(&mut self.writer)?; + self.writer.terminate() + } +} + +/// BlockCompressor is seperated from StoreWriter, to be run in an own thread +pub struct BlockCompressor { + compressor: Compressor, + doc: DocId, + offset_index_writer: SkipIndexBuilder, + intermediary_buffer: Vec, + written_bytes: usize, + data_sender: SyncSender, +} + +struct DocumentBlock { + data: Vec, + num_docs_in_block: DocId, +} + +impl DocumentBlock { + fn is_empty(&self) -> bool { + self.data.is_empty() + } +} + +impl BlockCompressor { + fn new(compressor: Compressor, data_sender: SyncSender) -> Self { + Self { + compressor, + doc: 0, + offset_index_writer: SkipIndexBuilder::new(), + intermediary_buffer: Vec::new(), + written_bytes: 0, + data_sender, + } + } + + fn compress_block(&mut self, block: DocumentBlock) -> io::Result<()> { + assert!(block.num_docs_in_block > 0); + self.intermediary_buffer.clear(); + self.compressor + .compress_into(&block.data[..], &mut self.intermediary_buffer)?; + + let byte_range = self.written_bytes..self.written_bytes + self.intermediary_buffer.len(); + + self.data_sender + .send(OwnedBytes::new(self.intermediary_buffer.to_owned())) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + self.written_bytes += byte_range.len(); + + self.register_checkpoint(Checkpoint { + doc_range: self.doc..self.doc + block.num_docs_in_block, + byte_range, + }); Ok(()) } + fn register_checkpoint(&mut self, checkpoint: Checkpoint) { + self.offset_index_writer.insert(checkpoint.clone()); + self.doc = checkpoint.doc_range.end; + } + /// Stacks a store reader on top of the documents written so far. /// This method is an optimization compared to iterating over the documents /// in the store and adding them one by one, as the store's data will /// not be decompressed and then recompressed. - pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> { - if !self.current_block.is_empty() { - self.write_and_compress_block()?; + fn stack(&mut self, block: DocumentBlock, store_reader: StoreReader) -> io::Result<()> { + if !block.is_empty() { + self.compress_block(block)?; } - assert_eq!(self.first_doc_in_block, self.doc); let doc_shift = self.doc; - let start_shift = self.writer.written_bytes() as usize; + let start_shift = self.written_bytes; // just bulk write all of the block of the given reader. - self.writer - .write_all(store_reader.block_data()?.as_slice())?; + self.data_sender + .send(store_reader.block_data()?) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + self.written_bytes += store_reader.block_data()?.as_slice().len(); // concatenate the index of the `store_reader`, after translating // its start doc id and its start file offset. @@ -119,42 +310,12 @@ impl StoreWriter { } Ok(()) } + fn close(self) -> io::Result<(DocStoreFooter, SkipIndexBuilder)> { + drop(self.data_sender); - fn register_checkpoint(&mut self, checkpoint: Checkpoint) { - self.offset_index_writer.insert(checkpoint.clone()); - self.first_doc_in_block = checkpoint.doc_range.end; - self.doc = checkpoint.doc_range.end; - } - - fn write_and_compress_block(&mut self) -> io::Result<()> { - assert!(self.doc > 0); - self.intermediary_buffer.clear(); - self.compressor - .compress_into(&self.current_block[..], &mut self.intermediary_buffer)?; - let start_offset = self.writer.written_bytes() as usize; - self.writer.write_all(&self.intermediary_buffer)?; - let end_offset = self.writer.written_bytes() as usize; - let end_doc = self.doc; - self.register_checkpoint(Checkpoint { - doc_range: self.first_doc_in_block..end_doc, - byte_range: start_offset..end_offset, - }); - self.current_block.clear(); - Ok(()) - } - - /// Finalized the store writer. - /// - /// Compress the last unfinished block if any, - /// and serializes the skip list index on disc. - pub fn close(mut self) -> io::Result<()> { - if !self.current_block.is_empty() { - self.write_and_compress_block()?; - } - let header_offset: u64 = self.writer.written_bytes() as u64; + let header_offset: u64 = self.written_bytes as u64; let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); - self.offset_index_writer.write(&mut self.writer)?; - footer.serialize(&mut self.writer)?; - self.writer.terminate() + + Ok((footer, self.offset_index_writer)) } }