avoid prepare_doc allocation
avoid prepare_doc allocation, ~10% more throughput in the best case
PSeitz committed Oct 9, 2022
1 parent 5f565e7 commit 19bc196
Showing 4 changed files with 56 additions and 44 deletions.
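
What the commit does, in short: `SegmentWriter::add_document` used to call `prepare_doc_for_store`, which built a brand-new `Document` per stored document — filtering out unstored fields and converting `PreTokStr` values to `Str` — only for that copy to be serialized and dropped immediately. The filtering and conversion now happen inside a new `Document::serialize_stored`, which writes directly into the store writer's current block, so the intermediate `Document` (and its `Vec` of field values) is never allocated. A minimal sketch of the before/after pattern, with simplified stand-in types rather than tantivy's real ones:

// Sketch only: `Field` stands in for tantivy's FieldValue, and the u64 length
// prefix for the VInt header; the real code lives in Document::serialize_stored.

struct Field {
    stored: bool,
    bytes: Vec<u8>,
}

// Old shape: materialize a filtered copy of the document, then serialize it.
// The Vec here is the per-document allocation the commit removes.
fn store_old(doc: &[Field], out: &mut Vec<u8>) {
    let prepared: Vec<&Field> = doc.iter().filter(|f| f.stored).collect();
    out.extend_from_slice(&(prepared.len() as u64).to_le_bytes());
    for field in prepared {
        out.extend_from_slice(&field.bytes);
    }
}

// New shape: filter lazily, walking the fields twice (once to count, once to
// write) and serializing straight into the output buffer.
fn store_new(doc: &[Field], out: &mut Vec<u8>) {
    let stored = || doc.iter().filter(|f| f.stored);
    out.extend_from_slice(&(stored().count() as u64).to_le_bytes());
    for field in stored() {
        out.extend_from_slice(&field.bytes);
    }
}
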
56 changes: 23 additions & 33 deletions src/indexer/segment_writer.rs
@@ -12,7 +12,7 @@ use crate::postings::{
     compute_table_size, serialize_postings, IndexingContext, IndexingPosition,
     PerFieldPostingsWriter, PostingsWriter,
 };
-use crate::schema::{FieldEntry, FieldType, FieldValue, Schema, Term, Value};
+use crate::schema::{FieldEntry, FieldType, Schema, Term, Value};
 use crate::store::{StoreReader, StoreWriter};
 use crate::tokenizer::{
     BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer,
@@ -307,9 +307,8 @@ impl SegmentWriter {
         self.doc_opstamps.push(add_operation.opstamp);
         self.fast_field_writers.add_document(&doc);
         self.index_document(&doc)?;
-        let prepared_doc = prepare_doc_for_store(doc, &self.schema);
         let doc_writer = self.segment_serializer.get_store_writer();
-        doc_writer.store(&prepared_doc)?;
+        doc_writer.store(&doc, &self.schema)?;
         self.max_doc += 1;
         Ok(())
     }
@@ -406,40 +405,24 @@ fn remap_and_write(
     Ok(())
 }
 
-/// Prepares Document for being stored in the document store
-///
-/// Method transforms PreTokenizedString values into String
-/// values.
-pub fn prepare_doc_for_store(doc: Document, schema: &Schema) -> Document {
-    Document::from(
-        doc.into_iter()
-            .filter(|field_value| schema.get_field_entry(field_value.field()).is_stored())
-            .map(|field_value| match field_value {
-                FieldValue {
-                    field,
-                    value: Value::PreTokStr(pre_tokenized_text),
-                } => FieldValue {
-                    field,
-                    value: Value::Str(pre_tokenized_text.text),
-                },
-                field_value => field_value,
-            })
-            .collect::<Vec<_>>(),
-    )
-}
-
 #[cfg(test)]
 mod tests {
+    use std::path::Path;
+
     use super::compute_initial_table_size;
     use crate::collector::Count;
+    use crate::directory::RamDirectory;
     use crate::indexer::json_term_writer::JsonTermWriter;
     use crate::postings::TermInfo;
     use crate::query::PhraseQuery;
     use crate::schema::{IndexRecordOption, Schema, Type, STORED, STRING, TEXT};
+    use crate::store::{Compressor, StoreReader, StoreWriter};
     use crate::time::format_description::well_known::Rfc3339;
     use crate::time::OffsetDateTime;
     use crate::tokenizer::{PreTokenizedString, Token};
-    use crate::{DateTime, DocAddress, DocSet, Document, Index, Postings, Term, TERMINATED};
+    use crate::{
+        DateTime, Directory, DocAddress, DocSet, Document, Index, Postings, Term, TERMINATED,
+    };
 
     #[test]
     fn test_hashmap_size() {
@@ -469,14 +452,21 @@ mod tests {
 
         doc.add_pre_tokenized_text(text_field, pre_tokenized_text);
         doc.add_text(text_field, "title");
-        let prepared_doc = super::prepare_doc_for_store(doc, &schema);
 
-        assert_eq!(prepared_doc.field_values().len(), 2);
-        assert_eq!(prepared_doc.field_values()[0].value().as_text(), Some("A"));
-        assert_eq!(
-            prepared_doc.field_values()[1].value().as_text(),
-            Some("title")
-        );
+        let path = Path::new("store");
+        let directory = RamDirectory::create();
+        let store_wrt = directory.open_write(path).unwrap();
+
+        let mut store_writer = StoreWriter::new(store_wrt, Compressor::None, 0, false).unwrap();
+        store_writer.store(&doc, &schema).unwrap();
+        store_writer.close().unwrap();
+
+        let reader = StoreReader::open(directory.open_read(path).unwrap(), 0).unwrap();
+        let doc = reader.get(0).unwrap();
+
+        assert_eq!(doc.field_values().len(), 2);
+        assert_eq!(doc.field_values()[0].value().as_text(), Some("A"));
+        assert_eq!(doc.field_values()[1].value().as_text(), Some("title"));
     }
 
     #[test]
28 changes: 28 additions & 0 deletions src/schema/document.rs
@@ -191,6 +191,34 @@ impl Document {
     pub fn get_first(&self, field: Field) -> Option<&Value> {
         self.get_all(field).next()
     }
+
+    /// Serializes stored field values.
+    pub fn serialize_stored<W: Write>(&self, schema: &Schema, writer: &mut W) -> io::Result<()> {
+        let stored_field_values = || {
+            self.field_values()
+                .iter()
+                .filter(|field_value| schema.get_field_entry(field_value.field()).is_stored())
+        };
+        let num_field_values = stored_field_values().count();
+
+        VInt(num_field_values as u64).serialize(writer)?;
+        for field_value in stored_field_values() {
+            match field_value {
+                FieldValue {
+                    field,
+                    value: Value::PreTokStr(pre_tokenized_text),
+                } => {
+                    let field_value = FieldValue {
+                        field: *field,
+                        value: Value::Str(pre_tokenized_text.text.to_string()),
+                    };
+                    field_value.serialize(writer)?;
+                }
+                field_value => field_value.serialize(writer)?,
+            };
+        }
+        Ok(())
+    }
 }
 
 impl BinarySerializable for Document {
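
To make the new API concrete, here is a hedged usage sketch of `serialize_stored` (the schema, field names, and document contents are invented for illustration): only fields marked STORED in the schema end up in the buffer, and no intermediate Document is built along the way.

use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::Document;

fn main() -> std::io::Result<()> {
    // One stored field and one unstored field.
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT | STORED);
    let body = schema_builder.add_text_field("body", TEXT); // indexed, but not stored
    let schema = schema_builder.build();

    let mut doc = Document::default();
    doc.add_text(title, "Of Mice and Men");
    doc.add_text(body, "A few miles south of Soledad...");

    // Only "title" is written; "body" is filtered out during serialization.
    let mut buffer: Vec<u8> = Vec::new();
    doc.serialize_stored(&schema, &mut buffer)?;
    assert!(!buffer.is_empty());
    Ok(())
}
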
2 changes: 1 addition & 1 deletion src/store/mod.rs
@@ -96,7 +96,7 @@ pub mod tests {
             let mut doc = Document::default();
             doc.add_field_value(field_body, LOREM.to_string());
             doc.add_field_value(field_title, format!("Doc {i}"));
-            store_writer.store(&doc).unwrap();
+            store_writer.store(&doc, &schema).unwrap();
         }
         store_writer.close().unwrap();
     }
14 changes: 4 additions & 10 deletions src/store/writer.rs
@@ -1,11 +1,11 @@
-use std::io::{self, Write};
+use std::io;
 
 use common::BinarySerializable;
 
 use super::compressors::Compressor;
 use super::StoreReader;
 use crate::directory::WritePtr;
-use crate::schema::Document;
+use crate::schema::{Document, Schema};
 use crate::store::store_compressor::BlockCompressor;
 use crate::DocId;
 
@@ -99,15 +99,9 @@ impl StoreWriter {
     ///
     /// The document id is implicitly the current number
     /// of documents.
-    pub fn store(&mut self, stored_document: &Document) -> io::Result<()> {
-        self.intermediary_buffer.clear();
-        stored_document.serialize(&mut self.intermediary_buffer)?;
-        // calling store bytes would be preferable for code reuse, but then we can't use
-        // intermediary_buffer due to the borrow checker
-        // a new buffer costs ~1% indexing performance
+    pub fn store(&mut self, document: &Document, schema: &Schema) -> io::Result<()> {
        self.doc_pos.push(self.current_block.len() as u32);
-        self.current_block
-            .write_all(&self.intermediary_buffer[..])?;
+        document.serialize_stored(schema, &mut self.current_block)?;
         self.num_docs_in_current_block += 1;
         self.check_flush_block()?;
         Ok(())
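
The deleted comment above records why the scratch buffer existed: reusing `store_bytes` would have been nicer for code reuse, but the borrow checker ruled out serializing into `intermediary_buffer` while also borrowing it, and allocating a fresh buffer cost about 1% of indexing performance. Serializing straight into `current_block` sidesteps the trade-off entirely: one write instead of two, and no scratch buffer at all. A simplified sketch of the two paths (stand-in functions, not the real `StoreWriter` internals):

use std::io::{self, Write};

// Stand-in for Document::serialize_stored: writes a document into any Write sink.
fn serialize_doc<W: Write>(fields: &[&[u8]], writer: &mut W) -> io::Result<()> {
    for field in fields {
        writer.write_all(field)?;
    }
    Ok(())
}

// Old path: serialize into a scratch buffer, then copy the buffer into the block.
fn store_via_scratch(
    block: &mut Vec<u8>,
    scratch: &mut Vec<u8>,
    fields: &[&[u8]],
) -> io::Result<()> {
    scratch.clear();
    serialize_doc(fields, scratch)?;
    block.write_all(&scratch[..])
}

// New path: serialize straight into the block; no scratch buffer, no extra copy.
fn store_direct(block: &mut Vec<u8>, fields: &[&[u8]]) -> io::Result<()> {
    serialize_doc(fields, block)
}
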
