
Commit

Merge pull request #1374 from kryesh/main
Add Zstd compression support, Make block size configurable via IndexSettings
PSeitz authored May 25, 2022
2 parents 1a6a139 + c95013b commit 89e19f1
Showing 10 changed files with 135 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -33,7 +33,7 @@ jobs:
components: rustfmt, clippy

- name: Run tests
run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,zstd-compression,failpoints --verbose --workspace

- name: Run tests quickwit feature
run: cargo +stable test --features mmap,quickwit,failpoints --verbose --workspace
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -23,6 +23,7 @@ tantivy-fst = "0.3.0"
memmap2 = { version = "0.5.3", optional = true }
lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
brotli = { version = "3.3.4", optional = true }
zstd = { version = "0.11", optional = true }
snap = { version = "1.0.5", optional = true }
tempfile = { version = "3.3.0", optional = true }
log = "0.4.16"
@@ -93,6 +94,7 @@ mmap = ["fs2", "tempfile", "memmap2"]
brotli-compression = ["brotli"]
lz4-compression = ["lz4_flex"]
snappy-compression = ["snap"]
zstd-compression = ["zstd"]

failpoints = ["fail/failpoints"]
unstable = [] # useful for benches.
23 changes: 21 additions & 2 deletions src/core/index_meta.rs
@@ -239,7 +239,7 @@ impl InnerSegmentMeta {
///
/// Contains settings that apply to the whole
/// index, such as presorting documents.
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexSettings {
/// Sorts the documents by information
/// provided in `IndexSortByField`
@@ -248,7 +248,26 @@ pub struct IndexSettings {
/// The `Compressor` used to compress the doc store.
#[serde(default)]
pub docstore_compression: Compressor,
#[serde(default = "default_docstore_blocksize")]
/// The size of each block that will be compressed and written to disk
pub docstore_blocksize: usize,
}

/// Must be a function to be compatible with serde defaults
fn default_docstore_blocksize() -> usize {
16_384
}

impl Default for IndexSettings {
fn default() -> Self {
Self {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_blocksize: default_docstore_blocksize(),
}
}
}

/// Settings to presort the documents in an index
///
/// Presorting documents can greatly improve performance
@@ -401,7 +420,7 @@ mod tests {
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4"},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
);

let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();
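
Taken together, the new field and its Default impl let callers opt into zstd and tune the doc-store block size at index-creation time. Below is a minimal sketch (not part of this commit), assuming the public Index::builder() API of this tantivy version and the zstd-compression feature being enabled; the schema and sizes are illustrative.

use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::store::Compressor;
use tantivy::{Index, IndexSettings};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let body = schema_builder.add_text_field("body", TEXT | STORED);
    let schema = schema_builder.build();

    // Compress doc-store blocks with zstd and double the 16 KiB default block size.
    let settings = IndexSettings {
        docstore_compression: Compressor::Zstd,
        docstore_blocksize: 32_768,
        ..IndexSettings::default()
    };

    let index = Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()?;

    // Stored fields written through this index now land in zstd-compressed blocks.
    let mut writer = index.writer(50_000_000)?;
    let _ = writer.add_document(tantivy::doc!(body => "some stored text"));
    writer.commit()?;
    Ok(())
}
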
3 changes: 2 additions & 1 deletion src/indexer/segment_serializer.rs
@@ -39,9 +39,10 @@ impl SegmentSerializer {

let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
let compressor = segment.index().settings().docstore_compression;
let blocksize = segment.index().settings().docstore_blocksize;
Ok(SegmentSerializer {
segment,
store_writer: StoreWriter::new(store_write, compressor),
store_writer: StoreWriter::new(store_write, compressor, blocksize),
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,
3 changes: 2 additions & 1 deletion src/indexer/segment_writer.rs
@@ -372,9 +372,10 @@ fn remap_and_write(
.segment_mut()
.open_write(SegmentComponent::Store)?;
let compressor = serializer.segment().index().settings().docstore_compression;
let block_size = serializer.segment().index().settings().docstore_blocksize;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
StoreWriter::new(store_write, compressor),
StoreWriter::new(store_write, compressor, block_size),
);
old_store_writer.close()?;
let store_read = StoreReader::open(
50 changes: 50 additions & 0 deletions src/store/compression_zstd_block.rs
@@ -0,0 +1,50 @@
use std::io;

use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
use zstd::DEFAULT_COMPRESSION_LEVEL;

#[inline]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
let count_size = std::mem::size_of::<u32>();
let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;

compressed.clear();
compressed.resize(max_size, 0);

let compressed_size = compress_to_buffer(
uncompressed,
&mut compressed[count_size..],
DEFAULT_COMPRESSION_LEVEL,
)?;

compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
compressed.resize(compressed_size + count_size, 0);

Ok(())
}

#[inline]
pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
let count_size = std::mem::size_of::<u32>();
let uncompressed_size = u32::from_le_bytes(
compressed
.get(..count_size)
.ok_or(io::ErrorKind::InvalidData)?
.try_into()
.unwrap(),
) as usize;

decompressed.clear();
decompressed.resize(uncompressed_size, 0);

let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)?;

if decompressed_size != uncompressed_size {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"doc store block not completely decompressed, data corruption".to_string(),
));
}

Ok(())
}
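
Each block written by this module is a 4-byte little-endian length prefix (the uncompressed size) followed by the zstd frame, which lets decompress size its output buffer before decoding. A hypothetical round-trip test for the module (not part of this commit, assuming the zstd-compression feature is enabled):

#[cfg(test)]
mod tests {
    use super::{compress, decompress};

    #[test]
    fn zstd_block_roundtrip() -> std::io::Result<()> {
        // A mildly repetitive payload so zstd actually shrinks it.
        let original: Vec<u8> = (0..20_000u32).map(|i| (i % 251) as u8).collect();

        let mut compressed = Vec::new();
        compress(&original, &mut compressed)?;

        // The first four bytes hold the uncompressed length, little-endian.
        let header = u32::from_le_bytes(compressed[..4].try_into().unwrap()) as usize;
        assert_eq!(header, original.len());

        let mut decompressed = Vec::new();
        decompress(&compressed, &mut decompressed)?;
        assert_eq!(decompressed, original);
        Ok(())
    }
}
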
27 changes: 27 additions & 0 deletions src/store/compressors.rs
@@ -26,6 +26,9 @@ pub enum Compressor {
#[serde(rename = "snappy")]
/// Use the snap compressor
Snappy,
#[serde(rename = "zstd")]
/// Use the zstd compressor
Zstd,
}

impl Default for Compressor {
@@ -36,6 +39,8 @@ impl Default for Compressor {
Compressor::Brotli
} else if cfg!(feature = "snappy-compression") {
Compressor::Snappy
} else if cfg!(feature = "zstd-compression") {
Compressor::Zstd
} else {
Compressor::None
}
@@ -49,6 +54,7 @@ impl Compressor {
1 => Compressor::Lz4,
2 => Compressor::Brotli,
3 => Compressor::Snappy,
4 => Compressor::Zstd,
_ => panic!("unknown compressor id {:?}", id),
}
}
@@ -58,6 +64,7 @@ impl Compressor {
Self::Lz4 => 1,
Self::Brotli => 2,
Self::Snappy => 3,
Self::Zstd => 4,
}
}
#[inline]
@@ -98,6 +105,16 @@ impl Compressor {
panic!("snappy-compression feature flag not activated");
}
}
Self::Zstd => {
#[cfg(feature = "zstd-compression")]
{
super::compression_zstd_block::compress(uncompressed, compressed)
}
#[cfg(not(feature = "zstd-compression"))]
{
panic!("zstd-compression feature flag not activated");
}
}
}
}

@@ -143,6 +160,16 @@ impl Compressor {
panic!("snappy-compression feature flag not activated");
}
}
Self::Zstd => {
#[cfg(feature = "zstd-compression")]
{
super::compression_zstd_block::decompress(compressed, decompressed)
}
#[cfg(not(feature = "zstd-compression"))]
{
panic!("zstd-compression feature flag not activated");
}
}
}
}
}
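
The new variant is wired into both encodings of the compressor: the numeric id written to the doc-store footer and the serde string used in IndexSettings. A hypothetical unit test alongside this module (not in this commit; serde_json is already a crate dependency):

#[cfg(test)]
mod zstd_variant_tests {
    use super::Compressor;

    #[test]
    fn zstd_compressor_roundtrip() {
        // Footer encoding: id 4 maps back to Zstd.
        assert_eq!(Compressor::from_id(Compressor::Zstd.to_id()), Compressor::Zstd);

        // Settings encoding: serialized as the string "zstd".
        let json = serde_json::to_string(&Compressor::Zstd).unwrap();
        assert_eq!(json, r#""zstd""#);
        let back: Compressor = serde_json::from_str(&json).unwrap();
        assert_eq!(back, Compressor::Zstd);
    }
}
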
30 changes: 22 additions & 8 deletions src/store/mod.rs
@@ -50,6 +50,9 @@ mod compression_brotli;
#[cfg(feature = "snappy-compression")]
mod compression_snap;

#[cfg(feature = "zstd-compression")]
mod compression_zstd_block;

#[cfg(test)]
pub mod tests {

@@ -69,18 +72,21 @@ pub mod tests {
sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
mollit anim id est laborum.";

const BLOCK_SIZE: usize = 16_384;

pub fn write_lorem_ipsum_store(
writer: WritePtr,
num_docs: usize,
compressor: Compressor,
blocksize: usize,
) -> Schema {
let mut schema_builder = Schema::builder();
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
let field_title =
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
let mut store_writer = StoreWriter::new(writer, compressor);
let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
for i in 0..num_docs {
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());
@@ -103,7 +109,7 @@ pub mod tests {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4);
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
@@ -139,11 +145,11 @@ pub mod tests {
Ok(())
}

fn test_store(compressor: Compressor) -> crate::Result<()> {
fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor);
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
@@ -169,22 +175,28 @@ pub mod tests {

#[test]
fn test_store_noop() -> crate::Result<()> {
test_store(Compressor::None)
test_store(Compressor::None, BLOCK_SIZE)
}
#[cfg(feature = "lz4-compression")]
#[test]
fn test_store_lz4_block() -> crate::Result<()> {
test_store(Compressor::Lz4)
test_store(Compressor::Lz4, BLOCK_SIZE)
}
#[cfg(feature = "snappy-compression")]
#[test]
fn test_store_snap() -> crate::Result<()> {
test_store(Compressor::Snappy)
test_store(Compressor::Snappy, BLOCK_SIZE)
}
#[cfg(feature = "brotli-compression")]
#[test]
fn test_store_brotli() -> crate::Result<()> {
test_store(Compressor::Brotli)
test_store(Compressor::Brotli, BLOCK_SIZE)
}

#[cfg(feature = "zstd-compression")]
#[test]
fn test_store_zstd() -> crate::Result<()> {
test_store(Compressor::Zstd, BLOCK_SIZE)
}

#[test]
@@ -348,6 +360,7 @@ mod bench {
directory.open_write(path).unwrap(),
1_000,
Compressor::default(),
16_384,
);
directory.delete(path).unwrap();
});
@@ -361,6 +374,7 @@ mod bench {
directory.open_write(path).unwrap(),
1_000,
Compressor::default(),
16_384,
);
let store_file = directory.open_read(path).unwrap();
let store = StoreReader::open(store_file).unwrap();
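
Since the test helpers now take the block size explicitly, cases for non-default sizes are easy to add; a hypothetical extra test (not in this commit) that forces very small blocks:

#[cfg(feature = "zstd-compression")]
#[test]
fn test_store_zstd_small_block() -> crate::Result<()> {
    // 1 KiB blocks: a compressed block is flushed every couple of documents.
    test_store(Compressor::Zstd, 1_024)
}
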
4 changes: 3 additions & 1 deletion src/store/reader.rs
@@ -304,6 +304,8 @@ mod tests {
use crate::store::tests::write_lorem_ipsum_store;
use crate::Directory;

const BLOCK_SIZE: usize = 16_384;

fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
doc.get_first(*field).and_then(|f| f.as_text())
}
@@ -313,7 +315,7 @@
let directory = RamDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default());
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
10 changes: 5 additions & 5 deletions src/store/writer.rs
@@ -11,8 +11,6 @@ use crate::schema::Document;
use crate::store::index::Checkpoint;
use crate::DocId;

const BLOCK_SIZE: usize = 16_384;

/// Write tantivy's [`Store`](./index.html)
///
/// Contrary to the other components of `tantivy`,
@@ -22,6 +20,7 @@ const BLOCK_SIZE: usize = 16_384;
/// The skip list index on the other hand, is built in memory.
pub struct StoreWriter {
compressor: Compressor,
block_size: usize,
doc: DocId,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
@@ -35,9 +34,10 @@ impl StoreWriter {
///
/// The store writer will write blocks to disk as
/// documents are added.
pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
StoreWriter {
compressor,
block_size,
doc: 0,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
@@ -65,7 +65,7 @@ impl StoreWriter {
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
self.current_block.write_all(serialized_document)?;
self.doc += 1;
if self.current_block.len() > BLOCK_SIZE {
if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
}
Ok(())
@@ -86,7 +86,7 @@ impl StoreWriter {
self.current_block
.write_all(&self.intermediary_buffer[..])?;
self.doc += 1;
if self.current_block.len() > BLOCK_SIZE {
if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
}
Ok(())
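
The flush condition now compares the in-memory block buffer against the configurable block_size rather than the removed module-level BLOCK_SIZE constant. A standalone back-of-the-envelope sketch (not tantivy code) of how the block size governs flush frequency, assuming roughly 1.5 KiB per serialized document:

// Documents are appended to an in-memory buffer and a compressed block is
// emitted once the buffer grows past `block_size`.
fn blocks_needed(doc_sizes: &[usize], block_size: usize) -> usize {
    let mut blocks = 0;
    let mut current = 0;
    for &len in doc_sizes {
        current += len; // the VInt length header is omitted for simplicity
        if current > block_size {
            blocks += 1; // corresponds to write_and_compress_block()
            current = 0;
        }
    }
    if current > 0 {
        blocks += 1; // close() flushes the final partial block
    }
    blocks
}

fn main() {
    let docs = vec![1_500; 100]; // ~1.5 KiB per serialized document
    assert_eq!(blocks_needed(&docs, 16_384), 10); // default 16 KiB blocks
    assert_eq!(blocks_needed(&docs, 1_024), 100); // tiny blocks: one document each
}
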
