Allow for a same-thread doc compressor. #1510

Merged 1 commit on Sep 13, 2022.
61 changes: 60 additions & 1 deletion src/core/index_meta.rs
@@ -235,6 +235,14 @@ impl InnerSegmentMeta {
}
}

fn return_true() -> bool {
    true
}

Review comment (Contributor), suggested change:
-fn return_true() -> bool {
+fn docstore_compress_dedicated_thread() -> bool {

fn is_true(val: &bool) -> bool {
    *val
}
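
These two helpers exist because serde's `default` and `skip_serializing_if` attributes take paths to functions rather than literal values. A self-contained sketch of the same pattern (the `Settings` struct here is illustrative, not tantivy's; assumes the `serde` and `serde_json` crates):

```rust
use serde::{Deserialize, Serialize};

fn return_true() -> bool {
    true
}

fn is_true(val: &bool) -> bool {
    *val
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Settings {
    #[serde(default = "return_true", skip_serializing_if = "is_true")]
    dedicated_thread: bool,
}

fn main() {
    // A missing field deserializes to `true`, so existing meta.json files keep working...
    let s: Settings = serde_json::from_str("{}").unwrap();
    assert!(s.dedicated_thread);
    // ...and the default value is skipped on serialization, so the field only
    // appears in the JSON when it deviates from the default.
    assert_eq!(serde_json::to_string(&s).unwrap(), "{}");
}
```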

/// Search Index Settings.
///
/// Contains settings which are applied on the whole
@@ -248,6 +256,12 @@ pub struct IndexSettings {
/// The `Compressor` used to compress the doc store.
#[serde(default)]
pub docstore_compression: Compressor,
/// If set to true, docstore compression will happen on a dedicated thread.
/// (default: true)
#[doc(hidden)]
#[serde(default = "return_true")]
Review comment (Contributor), suggested change:
-#[serde(default = "return_true")]
+#[serde(default = "docstore_compress_dedicated_thread")]

#[serde(skip_serializing_if = "is_true")]
pub docstore_compress_dedicated_thread: bool,
#[serde(default = "default_docstore_blocksize")]
/// The size of each block that will be compressed and written to disk
pub docstore_blocksize: usize,
@@ -264,6 +278,7 @@ impl Default for IndexSettings {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_blocksize: default_docstore_blocksize(),
docstore_compress_dedicated_thread: true,
}
}
}
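
For users, opting out of the dedicated compression thread would look roughly like the sketch below. It constructs `IndexSettings` with struct-update syntax (the field is public but `#[doc(hidden)]`) and assumes tantivy's `Index::builder()` API:

```rust
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::{Index, IndexSettings};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT | STORED);
    let schema = schema_builder.build();

    // Compress the doc store on the indexing thread itself instead of
    // spawning a dedicated compressor thread (the default stays `true`).
    let settings = IndexSettings {
        docstore_compress_dedicated_thread: false,
        ..IndexSettings::default()
    };

    let _index = Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()?;
    Ok(())
}
```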
@@ -395,7 +410,7 @@ mod tests {
use super::IndexMeta;
use crate::core::index_meta::UntrackedIndexMeta;
use crate::schema::{Schema, TEXT};
-use crate::store::ZstdCompressor;
+use crate::store::{Compressor, ZstdCompressor};
use crate::{IndexSettings, IndexSortByField, Order};

#[test]
@@ -447,6 +462,7 @@
compression_level: Some(4),
}),
docstore_blocksize: 1_000_000,
docstore_compress_dedicated_thread: true,
},
segments: Vec::new(),
schema,
@@ -485,4 +501,47 @@ mod tests {
"unknown zstd option \"bla\" at line 1 column 103".to_string()
);
}

#[test]
#[cfg(feature = "lz4-compression")]
fn test_index_settings_default() {
let mut index_settings = IndexSettings::default();
assert_eq!(
index_settings,
IndexSettings {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_compress_dedicated_thread: true,
docstore_blocksize: 16_384
}
);
{
let index_settings_json = serde_json::to_value(&index_settings).unwrap();
assert_eq!(
index_settings_json,
serde_json::json!({
"docstore_compression": "lz4",
"docstore_blocksize": 16384
})
);
let index_settings_deser: IndexSettings =
serde_json::from_value(index_settings_json).unwrap();
assert_eq!(index_settings_deser, index_settings);
}
{
index_settings.docstore_compress_dedicated_thread = false;
let index_settings_json = serde_json::to_value(&index_settings).unwrap();
assert_eq!(
index_settings_json,
serde_json::json!({
"docstore_compression": "lz4",
"docstore_blocksize": 16384,
"docstore_compress_dedicated_thread": false,
})
);
let index_settings_deser: IndexSettings =
serde_json::from_value(index_settings_json).unwrap();
assert_eq!(index_settings_deser, index_settings);
}
}
}
11 changes: 8 additions & 3 deletions src/indexer/segment_serializer.rs
@@ -38,11 +38,16 @@ impl SegmentSerializer {
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;

let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
-let compressor = segment.index().settings().docstore_compression;
-let blocksize = segment.index().settings().docstore_blocksize;
+let settings = segment.index().settings();
+let store_writer = StoreWriter::new(
+    store_write,
+    settings.docstore_compression,
+    settings.docstore_blocksize,
+    settings.docstore_compress_dedicated_thread,
+)?;
Ok(SegmentSerializer {
segment,
-store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
+store_writer,
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,
14 changes: 8 additions & 6 deletions src/indexer/segment_writer.rs
@@ -380,12 +380,14 @@ fn remap_and_write(
let store_write = serializer
.segment_mut()
.open_write(SegmentComponent::Store)?;
-let compressor = serializer.segment().index().settings().docstore_compression;
-let block_size = serializer.segment().index().settings().docstore_blocksize;
-let old_store_writer = std::mem::replace(
-    &mut serializer.store_writer,
-    StoreWriter::new(store_write, compressor, block_size)?,
-);
+let settings = serializer.segment().index().settings();
+let store_writer = StoreWriter::new(
+    store_write,
+    settings.docstore_compression,
+    settings.docstore_blocksize,
+    settings.docstore_compress_dedicated_thread,
+)?;
+let old_store_writer = std::mem::replace(&mut serializer.store_writer, store_writer);
old_store_writer.close()?;
let store_read = StoreReader::open(
serializer
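
The rewritten block above keeps the same `std::mem::replace` trick as before: a fresh `StoreWriter` is moved into the serializer and the displaced writer is then closed. A minimal illustration of the pattern with a hypothetical `Writer` type:

```rust
use std::mem;

struct Writer {
    name: &'static str,
}

impl Writer {
    // Consumes the writer, finalizing its output.
    fn close(self) {
        println!("closed {}", self.name);
    }
}

fn main() {
    let mut current = Writer { name: "old" };
    // Swap a fresh writer into place and get the old one back by value,
    // which is required because `close` consumes `self`.
    let old = mem::replace(&mut current, Writer { name: "new" });
    old.close();
}
```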
41 changes: 31 additions & 10 deletions src/store/mod.rs
@@ -43,6 +43,7 @@ pub use self::decompressors::Decompressor;
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
pub use self::reader::{CacheStats, StoreReader};
pub use self::writer::StoreWriter;
mod store_compressor;

#[cfg(feature = "lz4-compression")]
mod compression_lz4_block;
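
The new `store_compressor` module itself is not shown in this diff. Judging from the `separate_thread` flag threaded through `StoreWriter::new`, a plausible sketch of the two modes, inline compression versus a dedicated worker fed over a channel, could look like the following (all names are hypothetical; this is not tantivy's actual implementation):

```rust
use std::io;
use std::sync::mpsc;
use std::thread;

// Stand-in for the real per-block compression routine.
fn compress_block(block: &[u8]) -> Vec<u8> {
    block.to_vec()
}

enum BlockCompressor {
    // separate_thread == false: compress inline on the indexing thread.
    SameThread { out: Vec<u8> },
    // separate_thread == true: ship blocks to a dedicated worker thread.
    Dedicated {
        tx: mpsc::Sender<Vec<u8>>,
        handle: thread::JoinHandle<Vec<u8>>,
    },
}

impl BlockCompressor {
    fn new(separate_thread: bool) -> io::Result<Self> {
        if !separate_thread {
            return Ok(BlockCompressor::SameThread { out: Vec::new() });
        }
        let (tx, rx) = mpsc::channel::<Vec<u8>>();
        let handle = thread::Builder::new()
            .name("docstore-compressor".to_string())
            .spawn(move || {
                let mut out = Vec::new();
                // Drain blocks until the sender hangs up, compressing each one.
                for block in rx {
                    out.extend_from_slice(&compress_block(&block));
                }
                out
            })?;
        Ok(BlockCompressor::Dedicated { tx, handle })
    }

    fn add_block(&mut self, block: Vec<u8>) {
        match self {
            BlockCompressor::SameThread { out } => out.extend_from_slice(&compress_block(&block)),
            BlockCompressor::Dedicated { tx, .. } => {
                tx.send(block).expect("compressor thread is alive")
            }
        }
    }

    fn close(self) -> Vec<u8> {
        match self {
            BlockCompressor::SameThread { out } => out,
            BlockCompressor::Dedicated { tx, handle } => {
                drop(tx); // hang up so the worker drains the channel and exits
                handle.join().expect("compressor thread panicked")
            }
        }
    }
}

fn main() -> io::Result<()> {
    let mut compressor = BlockCompressor::new(true)?;
    compressor.add_block(b"hello".to_vec());
    assert_eq!(compressor.close(), b"hello".to_vec());
    Ok(())
}
```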
@@ -82,14 +83,16 @@ pub mod tests {
num_docs: usize,
compressor: Compressor,
blocksize: usize,
separate_thread: bool,
) -> Schema {
let mut schema_builder = Schema::builder();
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
let field_title =
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
-let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
+let mut store_writer =
+    StoreWriter::new(writer, compressor, blocksize, separate_thread).unwrap();
for i in 0..num_docs {
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());
@@ -112,7 +115,8 @@
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
-let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
+let schema =
+    write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file, 10)?;
@@ -148,11 +152,16 @@
Ok(())
}

-fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
+fn test_store(
+    compressor: Compressor,
+    blocksize: usize,
+    separate_thread: bool,
+) -> crate::Result<()> {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
-let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
+let schema =
+    write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file, 10)?;
@@ -177,29 +186,39 @@
}

#[test]
-fn test_store_noop() -> crate::Result<()> {
-    test_store(Compressor::None, BLOCK_SIZE)
+fn test_store_no_compression_same_thread() -> crate::Result<()> {
+    test_store(Compressor::None, BLOCK_SIZE, false)
}

+#[test]
+fn test_store_no_compression() -> crate::Result<()> {
+    test_store(Compressor::None, BLOCK_SIZE, true)
+}

#[cfg(feature = "lz4-compression")]
#[test]
fn test_store_lz4_block() -> crate::Result<()> {
-test_store(Compressor::Lz4, BLOCK_SIZE)
+test_store(Compressor::Lz4, BLOCK_SIZE, true)
}
#[cfg(feature = "snappy-compression")]
#[test]
fn test_store_snap() -> crate::Result<()> {
-test_store(Compressor::Snappy, BLOCK_SIZE)
+test_store(Compressor::Snappy, BLOCK_SIZE, true)
}
#[cfg(feature = "brotli-compression")]
#[test]
fn test_store_brotli() -> crate::Result<()> {
-test_store(Compressor::Brotli, BLOCK_SIZE)
+test_store(Compressor::Brotli, BLOCK_SIZE, true)
}

#[cfg(feature = "zstd-compression")]
#[test]
fn test_store_zstd() -> crate::Result<()> {
-test_store(Compressor::Zstd(ZstdCompressor::default()), BLOCK_SIZE)
+test_store(
+    Compressor::Zstd(ZstdCompressor::default()),
+    BLOCK_SIZE,
+    true,
+)
}

#[test]
@@ -364,6 +383,7 @@ mod bench {
1_000,
Compressor::default(),
16_384,
true,
);
directory.delete(path).unwrap();
});
@@ -378,6 +398,7 @@
1_000,
Compressor::default(),
16_384,
true,
);
let store_file = directory.open_read(path).unwrap();
let store = StoreReader::open(store_file, 10).unwrap();
2 changes: 1 addition & 1 deletion src/store/reader.rs
@@ -393,7 +393,7 @@ mod tests {
let directory = RamDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
-let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
+let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE, true);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?;