feat: enable bloom filter index #6639

Merged
Changes from all commits
Commits
54 commits
0f346c0
enable bloom filter index
dantengsky Jul 14, 2022
952e381
tweak ut
dantengsky Jul 15, 2022
86d7ccf
rm unused code
dantengsky Jul 19, 2022
d2a7cd2
Merge remote-tracking branch 'origin/main' into feat-bloom-switch-to-…
dantengsky Jul 19, 2022
3020d84
fix bloom filter compile err
dantengsky Jul 19, 2022
f98df53
fix: replace deprecated `parquet_source_builder`
dantengsky Jul 19, 2022
aa97e9f
minor refactor
dantengsky Jul 19, 2022
e41d62a
Merge remote-tracking branch 'origin/main' into feat-bloom-switch-to-…
dantengsky Jul 25, 2022
25f2e36
add instrument
dantengsky Jul 25, 2022
14f0051
Merge remote-tracking branch 'origin/main' into feat-bloom-switch-to-…
dantengsky Jul 25, 2022
482b56e
Merge remote-tracking branch 'origin/main' into feat-bloom-switch-to-…
dantengsky Jul 25, 2022
3182db5
Merge remote-tracking branch 'origin/main' into feat-bloom-switch-to-…
dantengsky Jul 25, 2022
3257e7b
wip: block prunner shortcut
dantengsky Jul 25, 2022
72a43d7
wip: refactoring pruner
dantengsky Jul 25, 2022
60fefd6
Merge remote-tracking branch 'origin/main' into feat-bloom-switch-to-…
dantengsky Jul 25, 2022
c5a134c
Merge remote-tracking branch 'origin/main' into feat-bloom-switch-to-…
dantengsky Jul 26, 2022
3952dc9
remove needless tokio spawn
dantengsky Jul 26, 2022
28e5b96
refactor: block prunner shortcuts
dantengsky Jul 26, 2022
fe18bc0
WIP: lifetime seems ok
dantengsky Jul 27, 2022
16f4685
Separated filters
dantengsky Jul 27, 2022
8d8f507
move predicate cstors to individual mods
dantengsky Jul 27, 2022
02d1724
tidy up
dantengsky Jul 28, 2022
11fa723
add setting for toggling bloom filter
dantengsky Jul 28, 2022
ead1fa0
refacor
dantengsky Jul 28, 2022
5436f79
index cache
dantengsky Jul 29, 2022
e51ecd4
fix ut of settings
dantengsky Jul 29, 2022
4ec7657
use storage runtime
dantengsky Aug 1, 2022
83d7f07
clean up
dantengsky Aug 1, 2022
5b9d1b8
disable compression for bloom filter
dantengsky Aug 1, 2022
9b9d27b
Merge remote-tracking branch 'origin/main' into feat-bloom-switch-to-…
dantengsky Aug 1, 2022
f7b4494
versioned index reader
dantengsky Aug 1, 2022
83569a2
Place index in separate key prefix & populate index size
dantengsky Aug 1, 2022
ef749ab
show index in system.tables
dantengsky Aug 1, 2022
af2085c
fix unit tests
dantengsky Aug 2, 2022
1f60fde
fix stateless test
dantengsky Aug 2, 2022
f864cd2
rebuild index during mutations
dantengsky Aug 2, 2022
4ee34d1
tweak unit tests
dantengsky Aug 2, 2022
38a11eb
refactor accumulator
dantengsky Aug 2, 2022
9225ae4
tweak cache
dantengsky Aug 2, 2022
0e6f61e
try fix sqllogic test
dantengsky Aug 2, 2022
b30a9d3
refine limiter
dantengsky Aug 2, 2022
052a640
refine unit test
dantengsky Aug 2, 2022
b4cbd9a
refine ut
dantengsky Aug 2, 2022
2a80550
count bloom filter index cache by bytes
dantengsky Aug 3, 2022
94e1696
add sqlogictest for bloom filter index
dantengsky Aug 3, 2022
aaf4be8
tidy up
dantengsky Aug 3, 2022
f4c06c5
remove setting "enable_bloom_filter_index"
dantengsky Aug 3, 2022
c3415d7
adjust test cases
dantengsky Aug 3, 2022
5f5c445
Update query/src/storages/fuse/io/read/snapshot_history_reader.rs
dantengsky Aug 3, 2022
84d053f
runtime arrangement tweaks
dantengsky Aug 3, 2022
0edf55f
make lint
dantengsky Aug 3, 2022
f2e0ecb
Merge remote-tracking branch 'origin/main' into feat-bloom-switch-to-…
dantengsky Aug 3, 2022
89ad8ed
bring back sqlogic test
dantengsky Aug 3, 2022
8477721
fix logictest cases
dantengsky Aug 3, 2022
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions common/cache/src/meter/bytes_meter.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::borrow::Borrow;
use std::sync::Arc;

use super::Meter;
pub struct BytesMeter;
@@ -24,3 +25,11 @@ impl<K> Meter<K, Vec<u8>> for BytesMeter {
v.len()
}
}

impl<K> Meter<K, Arc<Vec<u8>>> for BytesMeter {
type Measure = usize;
fn measure<Q: ?Sized>(&self, _: &Q, v: &Arc<Vec<u8>>) -> usize
where K: Borrow<Q> {
v.len()
}
}
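The new `Meter` impl above lets the LRU cache charge an `Arc<Vec<u8>>` entry by its byte length instead of counting it as one item. A minimal self-contained sketch of the same idea — `Meter`, `CountMeter`, and the trait shape here are illustrative stand-ins, not the actual `common-cache` API:

```rust
use std::sync::Arc;

// Illustrative metering trait: how much capacity does a value consume?
trait Meter<V> {
    fn measure(&self, v: &V) -> usize;
}

struct CountMeter;
impl<V> Meter<V> for CountMeter {
    // Every entry costs 1, so cache capacity means "number of items".
    fn measure(&self, _: &V) -> usize {
        1
    }
}

struct BytesMeter;
impl Meter<Arc<Vec<u8>>> for BytesMeter {
    // A shared byte buffer costs its length, so capacity means "bytes".
    fn measure(&self, v: &Arc<Vec<u8>>) -> usize {
        v.len()
    }
}

fn main() {
    let buf = Arc::new(vec![0u8; 4096]);
    assert_eq!(CountMeter.measure(&buf), 1);
    assert_eq!(BytesMeter.measure(&buf), 4096);
}
```

Byte metering is what makes a fixed memory budget (e.g. 1 GiB of bloom filter columns) meaningful when cached entries vary widely in size.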
2 changes: 1 addition & 1 deletion common/catalog/src/table.rs
@@ -211,5 +211,5 @@ pub struct TableStatistics {
pub num_rows: Option<u64>,
pub data_size: Option<u64>,
pub data_size_compressed: Option<u64>,
pub index_length: Option<u64>,
pub index_size: Option<u64>,
}
19 changes: 14 additions & 5 deletions common/datablocks/src/utils.rs
@@ -22,22 +22,23 @@ use common_arrow::parquet::encoding::Encoding;
use common_arrow::parquet::metadata::ThriftFileMetaData;
use common_arrow::parquet::write::Version;
use common_arrow::write_parquet_file;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataSchema;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::DataBlock;

pub fn serialize_data_blocks(
pub fn serialize_data_blocks_with_compression(
blocks: Vec<DataBlock>,
schema: &DataSchemaRef,
schema: impl AsRef<DataSchema>,
buf: &mut Vec<u8>,
compression: CompressionOptions,
) -> Result<(u64, ThriftFileMetaData)> {
let arrow_schema = schema.to_arrow();
let arrow_schema = schema.as_ref().to_arrow();

let row_group_write_options = WriteOptions {
write_statistics: false,
compression: CompressionOptions::Lz4Raw,
compression,
version: Version::V2,
};
let batches = blocks
@@ -75,6 +76,14 @@
}
}

pub fn serialize_data_blocks(
blocks: Vec<DataBlock>,
schema: impl AsRef<DataSchema>,
buf: &mut Vec<u8>,
) -> Result<(u64, ThriftFileMetaData)> {
serialize_data_blocks_with_compression(blocks, schema, buf, CompressionOptions::Lz4Raw)
}
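Switching the parameter from `&DataSchemaRef` to `impl AsRef<DataSchema>` lets callers pass either an `Arc`-wrapped schema (`DataSchemaRef` is an `Arc<DataSchema>`) or any other type viewable as a `&DataSchema`, because `std` provides `Arc<T>: AsRef<T>` and `Box<T>: AsRef<T>`. A self-contained sketch of the pattern — `Schema` and `field_count` are illustrative names, not the databend API:

```rust
use std::sync::Arc;

// Stand-in for DataSchema.
struct Schema {
    fields: Vec<String>,
}

// Accepts anything that can be viewed as a &Schema, so callers are not
// forced into one specific pointer type.
fn field_count(schema: impl AsRef<Schema>) -> usize {
    schema.as_ref().fields.len()
}

fn main() {
    let arc: Arc<Schema> = Arc::new(Schema {
        fields: vec!["a".into(), "b".into()],
    });
    // Arc<T> implements AsRef<T>, so the Arc is passed directly.
    assert_eq!(field_count(arc), 2);
    // Box<T> implements AsRef<T> as well.
    assert_eq!(field_count(Box::new(Schema { fields: vec!["x".into()] })), 1);
}
```

This is why the body now calls `schema.as_ref().to_arrow()` rather than dereferencing a concrete `Arc`.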

fn col_encoding(_data_type: &ArrowDataType) -> Encoding {
// Although encoding does work, parquet2 has not implemented decoding of DeltaLengthByteArray yet, we fallback to Plain
// From parquet2: Decoding "DeltaLengthByteArray"-encoded required V2 pages is not yet implemented for Binary.
1 change: 1 addition & 0 deletions common/fuse-meta/Cargo.toml
@@ -9,6 +9,7 @@ edition = "2021"
[features]

[dependencies]
common-arrow = { path = "../arrow" }
common-base = { path = "../base" }
common-cache = { path = "../cache" }
common-config = { path = "../config" }
46 changes: 40 additions & 6 deletions common/fuse-meta/src/caches/cache.rs
@@ -14,15 +14,26 @@

use common_config::QueryConfig;

use crate::caches::new_memory_cache;
use crate::caches::MemoryCache;
use crate::caches::memory_cache::new_bytes_cache;
use crate::caches::memory_cache::BloomIndexCache;
use crate::caches::memory_cache::BloomIndexMetaCache;
use crate::caches::memory_cache::BytesCache;
use crate::caches::new_item_cache;
use crate::caches::ItemCache;
use crate::caches::SegmentInfoCache;
use crate::caches::TableSnapshotCache;

// default number of bloom filter index metas cached: 3000 items
static DEFAULT_BLOOM_INDEX_META_CACHE_ITEMS: u64 = 3000;
// default size of cached bloom filter index data (in bytes): 1 GiB
static DEFAULT_BLOOM_INDEX_COLUMN_CACHE_SIZE: u64 = 1024 * 1024 * 1024;

/// Where all the caches reside
pub struct CacheManager {
table_snapshot_cache: Option<TableSnapshotCache>,
segment_info_cache: Option<SegmentInfoCache>,
bloom_index_cache: Option<BloomIndexCache>,
bloom_index_meta_cache: Option<BloomIndexMetaCache>,
cluster_id: String,
tenant_id: String,
}
@@ -36,15 +47,22 @@ impl CacheManager {
Self {
table_snapshot_cache: None,
segment_info_cache: None,
bloom_index_cache: None,
bloom_index_meta_cache: None,
cluster_id: config.cluster_id.clone(),
tenant_id: config.tenant_id.clone(),
}
} else {
let table_snapshot_cache = Self::with_capacity(config.table_cache_snapshot_count);
let segment_info_cache = Self::with_capacity(config.table_cache_segment_count);
let table_snapshot_cache = Self::new_item_cache(config.table_cache_snapshot_count);
let segment_info_cache = Self::new_item_cache(config.table_cache_segment_count);
let bloom_index_cache = Self::new_bytes_cache(DEFAULT_BLOOM_INDEX_COLUMN_CACHE_SIZE);
let bloom_index_meta_cache =
Self::new_item_cache(DEFAULT_BLOOM_INDEX_META_CACHE_ITEMS);
Self {
table_snapshot_cache,
segment_info_cache,
bloom_index_cache,
bloom_index_meta_cache,
cluster_id: config.cluster_id.clone(),
tenant_id: config.tenant_id.clone(),
}
@@ -59,6 +77,14 @@
self.segment_info_cache.clone()
}

pub fn get_bloom_index_cache(&self) -> Option<BloomIndexCache> {
self.bloom_index_cache.clone()
}

pub fn get_bloom_index_meta_cache(&self) -> Option<BloomIndexMetaCache> {
self.bloom_index_meta_cache.clone()
}

pub fn get_tenant_id(&self) -> &str {
self.tenant_id.as_str()
}
@@ -67,9 +93,17 @@
self.cluster_id.as_str()
}

fn with_capacity<T>(capacity: u64) -> Option<MemoryCache<T>> {
fn new_item_cache<T>(capacity: u64) -> Option<ItemCache<T>> {
if capacity > 0 {
Some(new_item_cache(capacity))
} else {
None
}
}

fn new_bytes_cache(capacity: u64) -> Option<BytesCache> {
if capacity > 0 {
Some(new_memory_cache(capacity))
Some(new_bytes_cache(capacity))
} else {
None
}
26 changes: 22 additions & 4 deletions common/fuse-meta/src/caches/memory_cache.rs
@@ -14,19 +14,37 @@

use std::sync::Arc;

use common_arrow::parquet::metadata::FileMetaData;
use common_base::base::tokio::sync::RwLock;
use common_cache::BytesMeter;
use common_cache::Cache;
use common_cache::Count;
use common_cache::DefaultHashBuilder;
use common_cache::LruCache;

use crate::meta::SegmentInfo;
use crate::meta::TableSnapshot;

pub type MemoryCache<V> = Arc<RwLock<LruCache<String, Arc<V>, DefaultHashBuilder, Count>>>;
// cache metered by number of items
pub type ItemCache<V> = Arc<RwLock<LruCache<String, Arc<V>, DefaultHashBuilder, Count>>>;

pub fn new_memory_cache<V>(capacity: u64) -> MemoryCache<V> {
// cache metered by total bytes
pub type BytesCache = Arc<RwLock<LruCache<String, Arc<Vec<u8>>, DefaultHashBuilder, BytesMeter>>>;

pub fn new_item_cache<V>(capacity: u64) -> ItemCache<V> {
Arc::new(RwLock::new(LruCache::new(capacity)))
}

pub type SegmentInfoCache = MemoryCache<SegmentInfo>;
pub type TableSnapshotCache = MemoryCache<TableSnapshot>;
pub fn new_bytes_cache(capacity: u64) -> BytesCache {
let c = LruCache::with_meter_and_hasher(capacity, BytesMeter, DefaultHashBuilder::new());
Arc::new(RwLock::new(c))
}

pub type SegmentInfoCache = ItemCache<SegmentInfo>;
pub type TableSnapshotCache = ItemCache<TableSnapshot>;
/// Cache of bloom filter index data.
/// For each index block, columns are cached individually.
pub type BloomIndexCache = BytesCache;
/// Cache of parquet FileMetaData of bloom filter index data.
/// One cache item per block.
pub type BloomIndexMetaCache = ItemCache<FileMetaData>;
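The `ItemCache`/`BytesCache` split matters because bloom filter columns vary widely in size, so a byte budget, not an item count, is what bounds memory. A toy byte-metered LRU showing why eviction must track total bytes — illustrative only, not the `common-cache` `LruCache` used here:

```rust
use std::collections::VecDeque;

// Toy byte-metered LRU: evicts from the cold end until total bytes fit.
struct BytesLru {
    capacity: usize,
    used: usize,
    entries: VecDeque<(String, Vec<u8>)>, // front = oldest
}

impl BytesLru {
    fn new(capacity: usize) -> Self {
        Self { capacity, used: 0, entries: VecDeque::new() }
    }

    fn put(&mut self, key: String, value: Vec<u8>) {
        self.used += value.len();
        self.entries.push_back((key, value));
        // Evict oldest entries until we are back within the byte budget.
        while self.used > self.capacity {
            match self.entries.pop_front() {
                Some((_, v)) => self.used -= v.len(),
                None => break,
            }
        }
    }

    fn len(&self) -> usize {
        self.entries.len()
    }

    fn used(&self) -> usize {
        self.used
    }
}

fn main() {
    let mut cache = BytesLru::new(100);
    cache.put("a".into(), vec![0; 60]);
    cache.put("b".into(), vec![0; 60]); // 120 bytes > 100: "a" is evicted
    assert_eq!(cache.len(), 1);
    assert_eq!(cache.used(), 60);
}
```

With item counting, two 60-byte entries and two 60-megabyte entries would cost the same; metering by bytes keeps the 1 GiB default meaningful.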
4 changes: 2 additions & 2 deletions common/fuse-meta/src/caches/mod.rs
@@ -17,8 +17,8 @@ mod memory_cache;
mod metrics;

pub use cache::CacheManager;
pub use memory_cache::new_memory_cache;
pub use memory_cache::MemoryCache;
pub use memory_cache::new_item_cache;
pub use memory_cache::ItemCache;
pub use memory_cache::SegmentInfoCache;
pub use memory_cache::TableSnapshotCache;

2 changes: 2 additions & 0 deletions common/fuse-meta/src/meta/common.rs
@@ -58,6 +58,8 @@ pub struct Statistics {

pub uncompressed_byte_size: u64,
pub compressed_byte_size: u64,
#[serde(default)]
pub index_size: u64,

pub col_stats: HashMap<ColumnId, ColumnStatistics>,
}
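The `#[serde(default)]` on `index_size` is what keeps old snapshots readable: metadata written before this PR has no such field, and the attribute makes deserialization fall back to `u64::default()` (0) instead of failing. A hand-rolled illustration of that fallback semantics — `read_index_size` and the map layout are made up for the sketch, not the serde machinery itself:

```rust
use std::collections::HashMap;

// What #[serde(default)] buys: a missing "index_size" field in old
// snapshot data yields the type's default (0 for u64) rather than an error.
fn read_index_size(fields: &HashMap<&str, u64>) -> u64 {
    fields.get("index_size").copied().unwrap_or_default()
}

fn main() {
    // Snapshot written before this PR: no index_size field at all.
    let old: HashMap<&str, u64> =
        [("uncompressed_byte_size", 10), ("compressed_byte_size", 4)].into();
    assert_eq!(read_index_size(&old), 0);

    // Snapshot written after this PR carries the real value.
    let new: HashMap<&str, u64> = [("index_size", 7)].into();
    assert_eq!(read_index_size(&new), 7);
}
```

Without the attribute, upgrading a deployment would make every pre-existing snapshot fail to load.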
1 change: 1 addition & 0 deletions common/fuse-meta/src/meta/current/mod.rs
@@ -13,6 +13,7 @@
// limitations under the License.

pub use v0::ColumnMeta;
pub use v1::BlockBloomFilterIndex;
pub use v1::BlockMeta;
pub use v1::SegmentInfo;
pub use v1::TableSnapshot;
1 change: 1 addition & 0 deletions common/fuse-meta/src/meta/mod.rs
@@ -31,5 +31,6 @@ pub use common::Statistics;
pub use common::StatisticsOfColumns;
pub use common::Versioned;
pub use current::*;
pub use versions::BlockBloomFilterIndexVersion;
pub use versions::SegmentInfoVersion;
pub use versions::SnapshotVersion;
33 changes: 33 additions & 0 deletions common/fuse-meta/src/meta/v1/index.rs
@@ -0,0 +1,33 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_datablocks::DataBlock;

// Index data of a block, which itself is also a DataBlock.
//
// Depending on the query conditions, columns of the index data will be loaded on demand.
pub struct BlockBloomFilterIndex {
// Until the index mod is extracted from databend-query, we just keep the DataBlock here
data: DataBlock,
}

impl BlockBloomFilterIndex {
pub fn new(data: DataBlock) -> Self {
Self { data }
}

pub fn into_data(self) -> DataBlock {
self.data
}
}
2 changes: 2 additions & 0 deletions common/fuse-meta/src/meta/v1/mod.rs
@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod index;
mod segment;
mod snapshot;

pub use index::BlockBloomFilterIndex;
pub use segment::BlockMeta;
pub use segment::SegmentInfo;
pub use snapshot::TableSnapshot;
13 changes: 13 additions & 0 deletions common/fuse-meta/src/meta/v1/segment.rs
@@ -49,7 +49,13 @@ pub struct BlockMeta {
pub col_stats: HashMap<ColumnId, ColumnStatistics>,
pub col_metas: HashMap<ColumnId, ColumnMeta>,
pub cluster_stats: Option<ClusterStatistics>,
/// location of data block
pub location: Location,
/// location of bloom filter index
pub bloom_filter_index_location: Option<Location>,

#[serde(default)]
pub bloom_filter_index_size: u64,

/// Compression algo used to compress the columns of blocks
///
@@ -61,6 +67,7 @@ pub struct BlockMeta {
}

impl BlockMeta {
#[allow(clippy::too_many_arguments)]
pub fn new(
row_count: u64,
block_size: u64,
@@ -69,6 +76,8 @@
col_metas: HashMap<ColumnId, ColumnMeta>,
cluster_stats: Option<ClusterStatistics>,
location: Location,
bloom_filter_index_location: Option<Location>,
bloom_filter_index_size: u64,
) -> Self {
Self {
row_count,
@@ -78,6 +87,8 @@
col_metas,
cluster_stats,
location,
bloom_filter_index_location,
bloom_filter_index_size,
compression: Compression::Lz4Raw,
}
}
@@ -123,6 +134,8 @@ impl From<v0::BlockMeta> for BlockMeta {
col_metas: s.col_metas,
cluster_stats: None,
location: (s.location.path, DataBlock::VERSION),
bloom_filter_index_location: None,
bloom_filter_index_size: 0,
compression: Compression::Lz4,
}
}