From 19bc196e05df36bdceca8ee5ef8db03996118872 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Sat, 8 Oct 2022 13:17:47 +0800
Subject: [PATCH] avoid prepare_doc allocation

avoid prepare_doc allocation, ~10% more throughput in the best case
---
 src/indexer/segment_writer.rs | 56 ++++++++++++++---------------------
 src/schema/document.rs        | 28 ++++++++++++++++++
 src/store/mod.rs              |  2 +-
 src/store/writer.rs           | 14 +++------
 4 files changed, 56 insertions(+), 44 deletions(-)

diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index 415378752c..3031e6de44 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -12,7 +12,7 @@ use crate::postings::{
     compute_table_size, serialize_postings, IndexingContext, IndexingPosition,
     PerFieldPostingsWriter, PostingsWriter,
 };
-use crate::schema::{FieldEntry, FieldType, FieldValue, Schema, Term, Value};
+use crate::schema::{FieldEntry, FieldType, Schema, Term, Value};
 use crate::store::{StoreReader, StoreWriter};
 use crate::tokenizer::{
     BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer,
@@ -307,9 +307,8 @@ impl SegmentWriter {
         self.doc_opstamps.push(add_operation.opstamp);
         self.fast_field_writers.add_document(&doc);
         self.index_document(&doc)?;
-        let prepared_doc = prepare_doc_for_store(doc, &self.schema);
         let doc_writer = self.segment_serializer.get_store_writer();
-        doc_writer.store(&prepared_doc)?;
+        doc_writer.store(&doc, &self.schema)?;
         self.max_doc += 1;
         Ok(())
     }
@@ -406,40 +405,24 @@ fn remap_and_write(
     Ok(())
 }
 
-/// Prepares Document for being stored in the document store
-///
-/// Method transforms PreTokenizedString values into String
-/// values.
-pub fn prepare_doc_for_store(doc: Document, schema: &Schema) -> Document {
-    Document::from(
-        doc.into_iter()
-            .filter(|field_value| schema.get_field_entry(field_value.field()).is_stored())
-            .map(|field_value| match field_value {
-                FieldValue {
-                    field,
-                    value: Value::PreTokStr(pre_tokenized_text),
-                } => FieldValue {
-                    field,
-                    value: Value::Str(pre_tokenized_text.text),
-                },
-                field_value => field_value,
-            })
-            .collect::<Vec<_>>(),
-    )
-}
-
 #[cfg(test)]
 mod tests {
+    use std::path::Path;
+
     use super::compute_initial_table_size;
     use crate::collector::Count;
+    use crate::directory::RamDirectory;
     use crate::indexer::json_term_writer::JsonTermWriter;
     use crate::postings::TermInfo;
     use crate::query::PhraseQuery;
     use crate::schema::{IndexRecordOption, Schema, Type, STORED, STRING, TEXT};
+    use crate::store::{Compressor, StoreReader, StoreWriter};
     use crate::time::format_description::well_known::Rfc3339;
     use crate::time::OffsetDateTime;
     use crate::tokenizer::{PreTokenizedString, Token};
-    use crate::{DateTime, DocAddress, DocSet, Document, Index, Postings, Term, TERMINATED};
+    use crate::{
+        DateTime, Directory, DocAddress, DocSet, Document, Index, Postings, Term, TERMINATED,
+    };
 
     #[test]
     fn test_hashmap_size() {
@@ -469,14 +452,21 @@ mod tests {
 
         doc.add_pre_tokenized_text(text_field, pre_tokenized_text);
         doc.add_text(text_field, "title");
 
-        let prepared_doc = super::prepare_doc_for_store(doc, &schema);
-        assert_eq!(prepared_doc.field_values().len(), 2);
-        assert_eq!(prepared_doc.field_values()[0].value().as_text(), Some("A"));
-        assert_eq!(
-            prepared_doc.field_values()[1].value().as_text(),
-            Some("title")
-        );
+        let path = Path::new("store");
+        let directory = RamDirectory::create();
+        let store_wrt = directory.open_write(path).unwrap();
+
+        let mut store_writer = StoreWriter::new(store_wrt, Compressor::None, 0, false).unwrap();
+        store_writer.store(&doc, &schema).unwrap();
+        store_writer.close().unwrap();
+
+        let reader = StoreReader::open(directory.open_read(path).unwrap(), 0).unwrap();
+        let doc = reader.get(0).unwrap();
+
+        assert_eq!(doc.field_values().len(), 2);
+        assert_eq!(doc.field_values()[0].value().as_text(), Some("A"));
+        assert_eq!(doc.field_values()[1].value().as_text(), Some("title"));
     }
 
     #[test]
diff --git a/src/schema/document.rs b/src/schema/document.rs
index 3bde526b1a..42960931fd 100644
--- a/src/schema/document.rs
+++ b/src/schema/document.rs
@@ -191,6 +191,34 @@ impl Document {
     pub fn get_first(&self, field: Field) -> Option<&Value> {
         self.get_all(field).next()
     }
+
+    /// Serializes stored field values.
+    pub fn serialize_stored<W: Write>(&self, schema: &Schema, writer: &mut W) -> io::Result<()> {
+        let stored_field_values = || {
+            self.field_values()
+                .iter()
+                .filter(|field_value| schema.get_field_entry(field_value.field()).is_stored())
+        };
+        let num_field_values = stored_field_values().count();
+
+        VInt(num_field_values as u64).serialize(writer)?;
+        for field_value in stored_field_values() {
+            match field_value {
+                FieldValue {
+                    field,
+                    value: Value::PreTokStr(pre_tokenized_text),
+                } => {
+                    let field_value = FieldValue {
+                        field: *field,
+                        value: Value::Str(pre_tokenized_text.text.to_string()),
+                    };
+                    field_value.serialize(writer)?;
+                }
+                field_value => field_value.serialize(writer)?,
+            };
+        }
+        Ok(())
+    }
 }
 
 impl BinarySerializable for Document {
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 73ecf92f9d..ecd2f30b14 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -96,7 +96,7 @@ pub mod tests {
                 let mut doc = Document::default();
                 doc.add_field_value(field_body, LOREM.to_string());
                 doc.add_field_value(field_title, format!("Doc {i}"));
-                store_writer.store(&doc).unwrap();
+                store_writer.store(&doc, &schema).unwrap();
             }
             store_writer.close().unwrap();
         }
diff --git a/src/store/writer.rs b/src/store/writer.rs
index c034548f35..1c70df766c 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -1,11 +1,11 @@
-use std::io::{self, Write};
+use std::io;
 
 use common::BinarySerializable;
 
 use super::compressors::Compressor;
 use super::StoreReader;
 use crate::directory::WritePtr;
-use crate::schema::Document;
+use crate::schema::{Document, Schema};
 use crate::store::store_compressor::BlockCompressor;
 use crate::DocId;
 
@@ -99,15 +99,9 @@ impl StoreWriter {
     ///
     /// The document id is implicitly the current number
     /// of documents.
-    pub fn store(&mut self, stored_document: &Document) -> io::Result<()> {
-        self.intermediary_buffer.clear();
-        stored_document.serialize(&mut self.intermediary_buffer)?;
-        // calling store bytes would be preferable for code reuse, but then we can't use
-        // intermediary_buffer due to the borrow checker
-        // a new buffer costs ~1% indexing performance
+    pub fn store(&mut self, document: &Document, schema: &Schema) -> io::Result<()> {
         self.doc_pos.push(self.current_block.len() as u32);
-        self.current_block
-            .write_all(&self.intermediary_buffer[..])?;
+        document.serialize_stored(schema, &mut self.current_block)?;
         self.num_docs_in_current_block += 1;
         self.check_flush_block()?;
         Ok(())
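
For context: the patch serializes the schema-filtered stored field values straight into the store writer's current block, instead of first cloning them into a temporary Document and then copying that copy through an intermediary buffer. Below is a minimal, self-contained Rust sketch of that pattern. The types (Field, Value, Schema, Document) are simplified stand-ins and the u32 length prefix replaces tantivy's VInt, so this is an illustration of the idea rather than the crate's actual API.

use std::io::{self, Write};

// Simplified stand-in types; tantivy's real Field/Value/Schema/Document are richer.
#[derive(Clone, Copy)]
struct Field(u32);

enum Value {
    Str(String),
    PreTokStr { text: String },
}

struct Schema {
    stored: Vec<bool>, // indexed by field id: is the field stored?
}

impl Schema {
    fn is_stored(&self, field: Field) -> bool {
        self.stored[field.0 as usize]
    }
}

struct Document {
    field_values: Vec<(Field, Value)>,
}

impl Document {
    /// Serialize only the stored fields directly into `writer`,
    /// flattening pre-tokenized text to plain text on the fly.
    /// No temporary Document and no intermediary buffer are allocated.
    fn serialize_stored<W: Write>(&self, schema: &Schema, writer: &mut W) -> io::Result<()> {
        let stored = || {
            self.field_values
                .iter()
                .filter(|(field, _)| schema.is_stored(*field))
        };
        // Length prefix (the real code uses a VInt; u32 keeps the sketch simple).
        writer.write_all(&(stored().count() as u32).to_le_bytes())?;
        for (field, value) in stored() {
            writer.write_all(&field.0.to_le_bytes())?;
            let text = match value {
                Value::Str(s) => s.as_str(),
                // The stored form keeps only the text, not the tokens.
                Value::PreTokStr { text } => text.as_str(),
            };
            writer.write_all(&(text.len() as u32).to_le_bytes())?;
            writer.write_all(text.as_bytes())?;
        }
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let schema = Schema { stored: vec![true, false] };
    let doc = Document {
        field_values: vec![
            (Field(0), Value::PreTokStr { text: "A".to_string() }),
            (Field(0), Value::Str("title".to_string())),
            (Field(1), Value::Str("not stored".to_string())),
        ],
    };
    // The store writer appends straight into its current block buffer.
    let mut current_block: Vec<u8> = Vec::new();
    doc.serialize_stored(&schema, &mut current_block)?;
    println!("stored fields serialized into {} bytes", current_block.len());
    Ok(())
}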