renames and refactoring
PSeitz committed Jun 21, 2022
1 parent 3a1c0fc commit 4b6db03
Showing 1 changed file with 24 additions and 30 deletions.
src/store/writer.rs (24 additions, 30 deletions)
@@ -29,14 +29,13 @@ pub struct StoreWriter {
 
     // the channel to send data to the compressor thread.
    compressor_sender: SyncSender<BlockCompressorMessage>,
-
    // the handle to check for errors on the thread
    compressor_thread_handle: JoinHandle<io::Result<()>>,
 }
 
 enum BlockCompressorMessage {
    AddBlock(DocumentBlock),
-   Stack((StoreReader, DocumentBlock)),
+   Stack(StoreReader),
 }
 
 impl StoreWriter {
@@ -61,10 +60,10 @@ impl StoreWriter {
             while let Ok(packet) = block_receiver.recv() {
                 match packet {
                     BlockCompressorMessage::AddBlock(block) => {
-                        block_compressor.compress_block(block)?;
+                        block_compressor.compress_block_and_write(block)?;
                     }
-                    BlockCompressorMessage::Stack((store_reader, block)) => {
-                        block_compressor.stack(block, store_reader)?;
+                    BlockCompressorMessage::Stack(store_reader) => {
+                        block_compressor.stack(store_reader)?;
                     }
                 }
             }
@@ -95,22 +94,25 @@ impl StoreWriter {
     /// Checks if the current block is full, and if so, compresses and flushes it.
     fn check_flush_block(&mut self) -> io::Result<()> {
         if self.current_block.len() > self.block_size {
-            let block = self.get_current_block();
-            self.compressor_sender
-                .send(BlockCompressorMessage::AddBlock(block))
-                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+            self.send_current_block_to_compressor()?;
         }
         Ok(())
     }
 
-    fn get_current_block(&mut self) -> DocumentBlock {
+    /// Flushes current uncompressed block and sends to compressor.
+    fn send_current_block_to_compressor(&mut self) -> io::Result<()> {
         let block = DocumentBlock {
             data: self.current_block.to_owned(),
             num_docs_in_block: self.num_docs_in_current_block,
         };
         self.current_block.clear();
         self.num_docs_in_current_block = 0;
-        block
+        if !block.is_empty() {
+            self.compressor_sender
+                .send(BlockCompressorMessage::AddBlock(block))
+                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+        }
+        Ok(())
     }
 
     /// Store a new document.
@@ -150,10 +152,10 @@ impl StoreWriter {
     /// in the store and adding them one by one, as the store's data will
     /// not be decompressed and then recompressed.
     pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
-        self.check_flush_block()?;
-        let block = self.get_current_block();
+        // We flush the current block first before stacking
+        self.send_current_block_to_compressor()?;
         self.compressor_sender
-            .send(BlockCompressorMessage::Stack((store_reader, block)))
+            .send(BlockCompressorMessage::Stack(store_reader))
             .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
 
         Ok(())
@@ -164,12 +166,7 @@ impl StoreWriter {
     /// Compress the last unfinished block if any,
     /// and serializes the skip list index on disc.
     pub fn close(mut self) -> io::Result<()> {
-        let block = self.get_current_block();
-        if !block.is_empty() {
-            self.compressor_sender
-                .send(BlockCompressorMessage::AddBlock(block))
-                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
-        }
+        self.send_current_block_to_compressor()?;
        drop(self.compressor_sender);
 
        self.compressor_thread_handle
@@ -183,7 +180,7 @@ impl StoreWriter {
 /// BlockCompressor is separated from StoreWriter, to be run in an own thread
 pub struct BlockCompressor {
     compressor: Compressor,
-    doc: DocId,
+    first_doc_in_block: DocId,
     offset_index_writer: SkipIndexBuilder,
     intermediary_buffer: Vec<u8>,
     writer: CountingWriter<WritePtr>,
@@ -204,14 +201,14 @@ impl BlockCompressor {
     fn new(compressor: Compressor, writer: WritePtr) -> Self {
         Self {
             compressor,
-            doc: 0,
+            first_doc_in_block: 0,
             offset_index_writer: SkipIndexBuilder::new(),
             intermediary_buffer: Vec::new(),
             writer: CountingWriter::wrap(writer),
         }
     }
 
-    fn compress_block(&mut self, block: DocumentBlock) -> io::Result<()> {
+    fn compress_block_and_write(&mut self, block: DocumentBlock) -> io::Result<()> {
         assert!(block.num_docs_in_block > 0);
         self.intermediary_buffer.clear();
         self.compressor
@@ -222,26 +219,23 @@
         let end_offset = self.writer.written_bytes() as usize;
 
         self.register_checkpoint(Checkpoint {
-            doc_range: self.doc..self.doc + block.num_docs_in_block,
+            doc_range: self.first_doc_in_block..self.first_doc_in_block + block.num_docs_in_block,
             byte_range: start_offset..end_offset,
         });
         Ok(())
     }
 
     fn register_checkpoint(&mut self, checkpoint: Checkpoint) {
         self.offset_index_writer.insert(checkpoint.clone());
-        self.doc = checkpoint.doc_range.end;
+        self.first_doc_in_block = checkpoint.doc_range.end;
     }
 
     /// Stacks a store reader on top of the documents written so far.
     /// This method is an optimization compared to iterating over the documents
     /// in the store and adding them one by one, as the store's data will
     /// not be decompressed and then recompressed.
-    fn stack(&mut self, block: DocumentBlock, store_reader: StoreReader) -> io::Result<()> {
-        if !block.is_empty() {
-            self.compress_block(block)?;
-        }
-        let doc_shift = self.doc;
+    fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> {
+        let doc_shift = self.first_doc_in_block;
         let start_shift = self.writer.written_bytes() as usize;
 
         // just bulk write all of the block of the given reader.
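
The shape of the code after this change is easier to see outside the diff: StoreWriter keeps only the channel sender and the thread handle, all compression state lives on the compressor thread, and Stack now carries just the StoreReader because the writer flushes its own pending block before sending. Below is a condensed, self-contained sketch of that pattern; the types, channel capacity, and method bodies are simplified stand-ins rather than tantivy's actual definitions.

// Condensed sketch of the post-refactor shape. The writer half keeps only
// the channel sender and the thread handle; everything needed to compress
// and write lives on the compressor thread. Stand-in types, not tantivy's.
use std::io;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

struct DocumentBlock {
    data: Vec<u8>,
    num_docs_in_block: u32,
}

struct StoreReader; // stand-in for tantivy's StoreReader

enum BlockCompressorMessage {
    AddBlock(DocumentBlock),
    Stack(StoreReader),
}

struct StoreWriter {
    compressor_sender: SyncSender<BlockCompressorMessage>,
    compressor_thread_handle: JoinHandle<io::Result<()>>,
}

impl StoreWriter {
    fn new() -> StoreWriter {
        // Bounded channel: the writer blocks instead of buffering
        // unboundedly when the compressor falls behind.
        let (sender, receiver) = sync_channel(3);
        let handle = thread::spawn(move || {
            // The loop ends once every sender has been dropped.
            while let Ok(msg) = receiver.recv() {
                match msg {
                    BlockCompressorMessage::AddBlock(block) => {
                        let _ = block.data; // compress and write the block here
                    }
                    BlockCompressorMessage::Stack(_reader) => {
                        // bulk-copy the reader's compressed blocks here
                    }
                }
            }
            Ok(())
        });
        StoreWriter {
            compressor_sender: sender,
            compressor_thread_handle: handle,
        }
    }

    fn close(self) -> io::Result<()> {
        // Dropping the sender terminates the worker loop; joining surfaces
        // any io::Error (or panic) from the compressor thread.
        drop(self.compressor_sender);
        self.compressor_thread_handle
            .join()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "compressor thread panicked"))?
    }
}

fn main() -> io::Result<()> {
    let writer = StoreWriter::new();
    writer
        .compressor_sender
        .send(BlockCompressorMessage::AddBlock(DocumentBlock {
            data: vec![1, 2, 3],
            num_docs_in_block: 1,
        }))
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
    writer.close()
}

Dropping the sender is what ends the worker loop, which lets close() join the thread and propagate any io::Error produced while compressing.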

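The stack() path is the optimization the doc comments describe: the stacked store's blocks are appended in compressed form, so only the skip-index checkpoints have to be rewritten. A hypothetical sketch of that rebasing, assuming only the Checkpoint shape visible in this diff (doc_range and byte_range); the rebase helper is illustrative, not a function in tantivy:

// Hypothetical sketch of checkpoint rebasing during stack(), assuming the
// Checkpoint shape visible in this diff (doc_range and byte_range).
use std::ops::Range;

struct Checkpoint {
    doc_range: Range<u32>,
    byte_range: Range<usize>,
}

// doc_shift is the first unused doc id (first_doc_in_block); start_shift is
// the number of bytes already written when the stacked data starts.
fn rebase(cp: &Checkpoint, doc_shift: u32, start_shift: usize) -> Checkpoint {
    Checkpoint {
        doc_range: cp.doc_range.start + doc_shift..cp.doc_range.end + doc_shift,
        byte_range: cp.byte_range.start + start_shift..cp.byte_range.end + start_shift,
    }
}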