diff --git a/ydb/core/kqp/ut/olap/indexes_ut.cpp b/ydb/core/kqp/ut/olap/indexes_ut.cpp index ec03987adfff..75728f06f975 100644 --- a/ydb/core/kqp/ut/olap/indexes_ut.cpp +++ b/ydb/core/kqp/ut/olap/indexes_ut.cpp @@ -302,9 +302,8 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { } AFL_VERIFY(updatesCount + 6 == - (ui64)csController->GetActualizationRefreshSchemeCount().Val())( - "updates", updatesCount)("count", - csController->GetActualizationRefreshSchemeCount().Val()); + (ui64)csController->GetActualizationRefreshSchemeCount().Val())("updates", updatesCount)( + "count", csController->GetActualizationRefreshSchemeCount().Val()); } class TTestIndexesScenario { @@ -340,6 +339,17 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); } + { + auto alterQuery = + TStringBuilder() << Sprintf( + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_uid, TYPE=BLOOM_NGRAMM_FILTER, + FEATURES=`{"column_name" : "resource_id", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 64024}`); + )", + StorageId.data()); + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } { auto alterQuery = TStringBuilder() << Sprintf( @@ -416,6 +426,27 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { // important checker for control compactions (<=21) and control indexes constructed (>=21) AFL_VERIFY(csController->GetCompactionStartedCounter().Val() == 21)("count", csController->GetCompactionStartedCounter().Val()); + { + auto it = tableClient + .StreamExecuteScanQuery(R"( + --!syntax_v1 + + SELECT + COUNT(*) + FROM `/Root/olapStore/olapTable` + WHERE resource_id LIKE '%110a151%' + )") + .GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("result", result); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("skip", csController->GetIndexesSkippingOnSelect().Val())( + "check", csController->GetIndexesApprovedOnSelect().Val()); + CompareYson(result, R"([[0u;]])"); + AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val()); + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val()); + } { auto it = tableClient .StreamExecuteScanQuery(R"( @@ -441,30 +472,56 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { } AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val()); } - ui32 requestsCount = 100; - for (ui32 i = 0; i < requestsCount; ++i) { - const ui32 idx = RandomNumber(uids.size()); - const auto query = [](const TString& res, const TString& uid, const ui32 level) { - TStringBuilder sb; - sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl; - sb << "WHERE(" << Endl; - sb << "resource_id = '" << res << "' AND" << Endl; - sb << "uid= '" << uid << "' AND" << Endl; - sb << "level= " << level << Endl; - sb << ")"; - return sb; - }; - auto it = tableClient.StreamExecuteScanQuery(query(resourceIds[idx], uids[idx], levels[idx])).GetValueSync(); - - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - TString result = StreamResultToYson(it); - Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << " / " - << csController->GetIndexesSkippedNoData().Val() << Endl; - CompareYson(result, R"([[1u;]])"); + { + ui32 requestsCount = 100; + for (ui32 i = 0; i < requestsCount; ++i) { + const ui32 idx = RandomNumber(uids.size()); + const auto query = [](const TString& res, const TString& uid, const ui32 level) { + TStringBuilder sb; + sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl; + sb << "WHERE(" << Endl; + sb << "resource_id = '" << res << "' AND" << Endl; + sb << "uid= '" << uid << "' AND" << Endl; + sb << "level= " << level << Endl; + sb << ")"; + return sb; + }; + auto it = tableClient.StreamExecuteScanQuery(query(resourceIds[idx], uids[idx], levels[idx])).GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() + << " / " << csController->GetIndexesSkippedNoData().Val() << Endl; + CompareYson(result, R"([[1u;]])"); + } + AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() * 5 < csController->GetIndexesSkippingOnSelect().Val()) + ("approved", csController->GetIndexesApprovedOnSelect().Val())("skipped", csController->GetIndexesSkippingOnSelect().Val()); + } + { + const ui64 skipStart = csController->GetIndexesSkippingOnSelect().Val(); + const ui64 approveStart = csController->GetIndexesApprovedOnSelect().Val(); + ui32 requestsCount = 100; + for (ui32 i = 0; i < requestsCount; ++i) { + const ui32 idx = RandomNumber(uids.size()); + const auto query = [](const TString& res, const TString& uid, const ui32 level) { + TStringBuilder sb; + sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl; + sb << "WHERE" << Endl; + sb << "resource_id LIKE '%" << res << "%'" << Endl; + return sb; + }; + auto it = tableClient.StreamExecuteScanQuery(query(resourceIds[idx], uids[idx], levels[idx])).GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + Cout << resourceIds[idx] << "/" << csController->GetIndexesSkippingOnSelect().Val() - skipStart << " / " + << csController->GetIndexesApprovedOnSelect().Val() - approveStart << " / " + << csController->GetIndexesSkippedNoData().Val() << Endl; + CompareYson(result, R"([[1u;]])"); + } + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - skipStart > 1)("approved", csController->GetIndexesApprovedOnSelect().Val() - approveStart)( + "skipped", csController->GetIndexesSkippingOnSelect().Val() - skipStart); } - - AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() * 5 < csController->GetIndexesSkippingOnSelect().Val()) - ("approved", csController->GetIndexesApprovedOnSelect().Val())("skipped", csController->GetIndexesSkippingOnSelect().Val()); } }; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 72151d9d6d6b..4c52ff262e8d 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -391,6 +391,13 @@ message TRequestedBloomFilter { repeated string ColumnNames = 3; } +message TRequestedBloomNGrammFilter { + optional uint32 NGrammSize = 1; + optional uint32 FilterSizeBytes = 2; + optional uint32 HashesCount = 3; + optional string ColumnName = 4; +} + message TRequestedMaxIndex { optional string ColumnName = 1; } @@ -410,6 +417,7 @@ message TOlapIndexRequested { TRequestedBloomFilter BloomFilter = 40; TRequestedMaxIndex MaxIndex = 41; TRequestedCountMinSketch CountMinSketch = 42; + TRequestedBloomNGrammFilter BloomNGrammFilter = 43; } } @@ -419,6 +427,13 @@ message TBloomFilter { repeated uint32 ColumnIds = 3; } +message TBloomNGrammFilter { + optional uint32 NGrammSize = 1; + optional uint32 FilterSizeBytes = 2; + optional uint32 HashesCount = 3; + optional uint32 ColumnId = 4; +} + message TMaxIndex { optional uint32 ColumnId = 1; } @@ -441,6 +456,7 @@ message TOlapIndexDescription { TBloomFilter BloomFilter = 41; TMaxIndex MaxIndex = 42; TCountMinSketch CountMinSketch = 43; + TBloomNGrammFilter BloomNGrammFilter = 44; } } diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto deleted file mode 100644 index 5ffbf067b33d..000000000000 --- a/ydb/core/protos/ssa.proto +++ /dev/null @@ -1,209 +0,0 @@ -package NKikimrSSA; -option java_package = "ru.yandex.kikimr.proto"; - -// Program to pushdown to ColumnShard -// -// > 'SELECT y, z WHERE x > 10' -// PROJECTION x, y, z -// ASSIGN tmp = x > 10 -// FILTER BY tmp -// PROJECTION y, z -// -// > 'SELECT min(x), sum(y) GROUP BY z' -// PROJECTION x, y, z -// ASSIGN agg1 = min(x) -// ASSIGN agg2 = sum(y) -// GROUP BY z -// PROJECTION agg1, agg2 -// -message TProgram { - message TColumn { - optional uint64 Id = 1; - optional string Name = 2; - } - - message TConstant { - oneof value { - bool Bool = 1; - int32 Int32 = 2; - uint32 Uint32 = 3; - int64 Int64 = 4; - uint64 Uint64 = 5; - float Float = 6; - double Double = 7; - bytes Bytes = 8; - string Text = 9; - int32 Int8 = 10; - uint32 Uint8 = 11; - int32 Int16 = 12; - uint32 Uint16 = 13; - uint64 Timestamp = 14; - } - } - - message TBloomFilterChecker { - repeated uint64 HashValues = 1; - } - - message TOlapIndexChecker { - optional uint32 IndexId = 1; - optional string ClassName = 2; - - message TCompositeChecker { - repeated TOlapIndexChecker ChildrenCheckers = 1; - } - - oneof Implementation { - TBloomFilterChecker BloomFilter = 40; - TCompositeChecker Composite = 41; - } - } - - message TParameter { - optional string Name = 1; - } - - enum EFunctionType { - SIMPLE_ARROW = 1; - YQL_KERNEL = 2; - } - - message TAssignment { - enum EFunction { - FUNC_UNSPECIFIED = 0; - FUNC_CMP_EQUAL = 1; - FUNC_CMP_NOT_EQUAL = 2; - FUNC_CMP_LESS = 3; - FUNC_CMP_LESS_EQUAL = 4; - FUNC_CMP_GREATER = 5; - FUNC_CMP_GREATER_EQUAL = 6; - FUNC_IS_NULL = 7; - FUNC_STR_LENGTH = 8; - FUNC_STR_MATCH = 9; - FUNC_BINARY_NOT = 10; - FUNC_BINARY_AND = 11; - FUNC_BINARY_OR = 12; - FUNC_BINARY_XOR = 13; - FUNC_MATH_ADD = 14; - FUNC_MATH_SUBTRACT = 15; - FUNC_MATH_MULTIPLY = 16; - FUNC_MATH_DIVIDE = 17; - FUNC_CAST_TO_BOOLEAN = 18; - FUNC_CAST_TO_INT8 = 19; - FUNC_CAST_TO_INT16 = 20; - FUNC_CAST_TO_INT32 = 21; - FUNC_CAST_TO_INT64 = 22; - FUNC_CAST_TO_UINT8 = 23; - FUNC_CAST_TO_UINT16 = 24; - FUNC_CAST_TO_UINT32 = 25; - FUNC_CAST_TO_UINT64 = 26; - FUNC_CAST_TO_FLOAT = 27; - FUNC_CAST_TO_DOUBLE = 28; - FUNC_CAST_TO_BINARY = 29; - FUNC_CAST_TO_FIXED_SIZE_BINARY = 30; - FUNC_CAST_TO_TIMESTAMP = 31; - FUNC_STR_MATCH_LIKE = 32; - FUNC_STR_STARTS_WITH = 33; - FUNC_STR_ENDS_WITH = 34; - FUNC_STR_MATCH_IGNORE_CASE = 35; - FUNC_STR_STARTS_WITH_IGNORE_CASE = 36; - FUNC_STR_ENDS_WITH_IGNORE_CASE = 37; - } - - message TFunction { - optional uint32 Id = 1; // EFunction - repeated TColumn Arguments = 2; - optional EFunctionType FunctionType = 3 [ default = SIMPLE_ARROW ]; - optional uint32 KernelIdx = 4; - optional uint32 YqlOperationId = 5; // TKernelRequestBuilder::EBinaryOp - } - - message TExternalFunction { - optional string Name = 1; - repeated TColumn Arguments = 2; - } - - optional TColumn Column = 1; - oneof expression { - TFunction Function = 2; - TExternalFunction ExternalFunction = 3; - TConstant Constant = 4; - bool Null = 5; - TParameter Parameter = 6; - } - } - - message TAggregateAssignment { - enum EAggregateFunction { - AGG_UNSPECIFIED = 0; - AGG_SOME = 1; - AGG_COUNT = 2; - AGG_MIN = 3; - AGG_MAX = 4; - AGG_SUM = 5; - //AGG_AVG = 6; - //AGG_VAR = 7; - //AGG_COVAR = 8; - //AGG_STDDEV = 9; - //AGG_CORR = 10; - //AGG_ARG_MIN = 11; - //AGG_ARG_MAX = 12; - //AGG_COUNT_DISTINCT = 13; - //AGG_QUANTILES = 14; - //AGG_TOP_COUNT = 15; - //AGG_TOP_SUM = 16; - } - - message TAggregateFunction { - optional uint32 Id = 1; // EAggregateFunction - repeated TColumn Arguments = 2; - optional string Variant = 3; // i.e. POP/SAMP for AGG_VAR, AGG_COVAR, AGG_STDDEV - optional EFunctionType FunctionType = 4 [ default = SIMPLE_ARROW ]; - optional uint32 KernelIdx = 5; - // TODO: Parameters, i.e. N for topK(N)(arg) - } - - optional TColumn Column = 1; - optional TAggregateFunction Function = 2; - } - - message TProjection { - repeated TColumn Columns = 1; - } - - message TFilter { - // Predicate should be a bool column: - // true - keep the row - // false - remove the row - optional TColumn Predicate = 1; - } - - message TGroupBy { - repeated TAggregateAssignment Aggregates = 1; - repeated TColumn KeyColumns = 2; - } - - message TCommand { - oneof line { - TAssignment Assign = 1; - TProjection Projection = 2; - TFilter Filter = 3; - TGroupBy GroupBy = 4; - // TODO: ORDER BY, LIMIT - } - } - - repeated TCommand Command = 1; - optional uint32 Version = 2; - optional bytes Kernels = 3; -} - -message TOlapProgram { - // Store OLAP program in serialized format in case we do not need to deserialize it in TScanTaskMeta - // Note: when this message exists the program must be present. - optional bytes Program = 1; - // RecordBatch deserialization require arrow::Schema, thus store it here - optional bytes ParametersSchema = 2; - optional bytes Parameters = 3; - optional TProgram.TOlapIndexChecker IndexChecker = 4; -} diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp index 8b62f7890c32..2ae1a996ee04 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp @@ -235,8 +235,10 @@ class TOriginalColumn: public IRequestNode { class TPackAnd: public IRequestNode { private: using TBase = IRequestNode; - THashMap> Conditions; + THashMap> Equals; + THashMap Likes; bool IsEmptyFlag = false; + protected: virtual bool DoCollapse() override { return false; @@ -247,10 +249,19 @@ class TPackAnd: public IRequestNode { if (IsEmptyFlag) { result.InsertValue("empty", true); } - auto& arrJson = result.InsertValue("conditions", NJson::JSON_ARRAY); - for (auto&& i : Conditions) { - auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP); - jsonCondition.InsertValue(i.first, i.second->ToString()); + { + auto& arrJson = result.InsertValue("equals", NJson::JSON_ARRAY); + for (auto&& i : Equals) { + auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP); + jsonCondition.InsertValue(i.first, i.second->ToString()); + } + } + { + auto& arrJson = result.InsertValue("likes", NJson::JSON_ARRAY); + for (auto&& i : Likes) { + auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP); + jsonCondition.InsertValue(i.first, i.second.ToString()); + } } return result; } @@ -259,32 +270,53 @@ class TPackAnd: public IRequestNode { } public: TPackAnd(const TPackAnd&) = default; + TPackAnd(const TString& cName, const std::shared_ptr& value) : TBase(GetNextId("PackAnd")) { - AddCondition(cName, value); + AddEqual(cName, value); + } + + TPackAnd(const TString& cName, const TString& likeMap) + : TBase(GetNextId("PackAnd")) { + AddLike(cName, TLikeDescription(likeMap)); } const THashMap>& GetEquals() const { - return Conditions; + return Equals; + } + + const THashMap& GetLikes() const { + return Likes; } bool IsEmpty() const { return IsEmptyFlag; } - void AddCondition(const TString& cName, const std::shared_ptr& value) { + void AddEqual(const TString& cName, const std::shared_ptr& value) { AFL_VERIFY(value); - auto it = Conditions.find(cName); - if (it == Conditions.end()) { - Conditions.emplace(cName, value); + auto it = Equals.find(cName); + if (it == Equals.end()) { + Equals.emplace(cName, value); } else if (it->second->Equals(*value)) { return; } else { IsEmptyFlag = true; } } + void AddLike(const TString& cName, const TLikeDescription& value) { + auto it = Likes.find(cName); + if (it == Likes.end()) { + Likes.emplace(cName, value); + } else { + it->second.Merge(value); + } + } void Merge(const TPackAnd& add) { - for (auto&& i : add.Conditions) { - AddCondition(i.first, i.second); + for (auto&& i : add.Equals) { + AddEqual(i.first, i.second); + } + for (auto&& i : add.Likes) { + AddLike(i.first, i.second); } } }; @@ -313,6 +345,15 @@ class TOperationNode: public IRequestNode { Parent->Exchange(GetNodeName(), std::make_shared(Children[0]->As()->GetColumnName(), Children[1]->As()->GetConstant())); return true; } + if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::StringContains && Children.size() == 2 && Children[1]->Is() && + Children[0]->Is()) { + auto scalar = Children[1]->As()->GetConstant(); + AFL_VERIFY(scalar->type->id() == arrow::binary()->id()); + auto scalarString = static_pointer_cast(scalar); + TString likeMap((const char*)scalarString->value->data(), scalarString->value->size()); + Parent->Exchange(GetNodeName(), std::make_shared(Children[0]->As()->GetColumnName(), likeMap)); + return true; + } if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) { if (Parent->Is() && Parent->As()->Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) { Parent->Attach(Children); @@ -470,14 +511,14 @@ std::shared_ptr TDataForIndexesCheckers::Build(const TP if (orNode->GetOperation() == NYql::TKernelRequestBuilder::EBinaryOp::Or) { for (auto&& i : orNode->GetChildren()) { if (auto* andPackNode = i->As()) { - result->AddBranch(andPackNode->GetEquals()); + result->AddBranch(andPackNode->GetEquals(), andPackNode->GetLikes()); } else if (auto* operationNode = i->As()) { if (operationNode->GetOperation() == NYql::TKernelRequestBuilder::EBinaryOp::And) { TPackAnd* pack = operationNode->FindFirst(); if (!pack) { return nullptr; } - result->AddBranch(pack->GetEquals()); + result->AddBranch(pack->GetEquals(), pack->GetLikes()); } } else { return nullptr; @@ -485,7 +526,7 @@ std::shared_ptr TDataForIndexesCheckers::Build(const TP } } } else if (auto* andPackNode = rootNode->GetChildren().front()->As()) { - result->AddBranch(andPackNode->GetEquals()); + result->AddBranch(andPackNode->GetEquals(), andPackNode->GetLikes()); } else { return nullptr; } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h index 898c4210b035..4fabff3bede8 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h @@ -3,13 +3,50 @@ namespace NKikimr::NOlap::NIndexes::NRequest { +class TLikeDescription { +private: + THashMap> LikeSequences; + +public: + TLikeDescription(const TString& likeEquation) { + LikeSequences.emplace(likeEquation, StringSplitter(likeEquation).Split('%').ToList()); + + } + + const THashMap>& GetLikeSequences() const { + return LikeSequences; + } + + void Merge(const TLikeDescription& d) { + for (auto&& i : d.LikeSequences) { + LikeSequences.emplace(i.first, i.second); + } + } + + TString ToString() const { + TStringBuilder sb; + for (auto&& i : LikeSequences) { + sb << "{" << i.first << ":["; + for (auto&& s: i.second) { + sb << s << ","; + } + sb << "];}"; + } + return sb; + } +}; + class TBranchCoverage { private: THashMap> Equals; + THashMap Likes; YDB_ACCESSOR_DEF(std::vector>, Indexes); + public: - TBranchCoverage(const THashMap>& equals) + TBranchCoverage( + const THashMap>& equals, const THashMap& likes) : Equals(equals) + , Likes(likes) { } @@ -18,6 +55,10 @@ class TBranchCoverage { return Equals; } + const THashMap& GetLikes() const { + return Likes; + } + std::shared_ptr GetAndChecker() const; }; @@ -25,8 +66,8 @@ class TDataForIndexesCheckers { private: YDB_READONLY_DEF(std::vector>, Branches); public: - void AddBranch(const THashMap>& equalsData) { - Branches.emplace_back(std::make_shared(equalsData)); + void AddBranch(const THashMap>& equalsData, const THashMap& likesData) { + Branches.emplace_back(std::make_shared(equalsData, likesData)); } static std::shared_ptr Build(const TProgramContainer& program); diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp index a2d84cb10f6d..e3e3cf7d4281 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp @@ -55,13 +55,11 @@ void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptrMutableIndexes().emplace_back(std::make_shared(GetIndexId(), std::move(hashes))); } } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp new file mode 100644 index 000000000000..7243a19ab996 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp @@ -0,0 +1,50 @@ +#include "checker.h" + +#include + +#include + +#include +#include + +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +void TFilterChecker::DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const { + for (auto&& i : HashValues) { + proto.MutableBloomNGrammFilter()->AddHashValues(i); + } +} + +bool TFilterChecker::DoCheckImpl(const std::vector& blobs) const { + AFL_VERIFY(blobs.size() == 1); + for (auto&& blob : blobs) { + TFixStringBitsStorage bits(blob); + bool found = true; + for (auto&& i : HashValues) { + if (!bits.Get(i % bits.GetSizeBits())) { + found = false; + break; + } + } + if (found) { + // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("size", bArray.length())("data", bArray.ToString())("index_id", GetIndexId()); + return true; + } + } + return false; +} + +bool TFilterChecker::DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) { + if (!proto.HasBloomNGrammFilter()) { + return false; + } + for (auto&& i : proto.GetBloomNGrammFilter().GetHashValues()) { + HashValues.emplace(i); + } + if (HashValues.empty()) { + return false; + } + return true; +} + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h new file mode 100644 index 000000000000..4c1f3e5d9f4f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h @@ -0,0 +1,69 @@ +#pragma once +#include +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +class TFixStringBitsStorage { +private: + YDB_READONLY_DEF(TString, Data); + +public: + TFixStringBitsStorage(const TString& data) + : Data(data) { + } + + ui32 GetSizeBits() const { + return Data.size() * 8; + } + + TFixStringBitsStorage(const ui32 sizeBits) + : Data(sizeBits / 8 + ((sizeBits % 8) ? 1 : 0), '\0') { + } + + void Set(const bool val, const ui32 idx) { + AFL_VERIFY(idx < GetSizeBits()); + auto* start = &Data[idx / 8]; + ui8 word = (*(ui8*)start); + if (val) { + word |= 1 << (idx % 8); + } else { + word &= (Max() - (1 << (idx % 8))); + } + memcpy(start, &word, sizeof(ui8)); + } + + bool Get(const ui32 idx) const { + AFL_VERIFY(idx < GetSizeBits()); + const ui8 start = (*(ui8*)&Data[idx / 8]); + return start & (1 << (idx % 8)); + } +}; + +class TFilterChecker: public TSimpleIndexChecker { +public: + static TString GetClassNameStatic() { + return "BLOOM_NGRAMM_FILTER"; + } + +private: + using TBase = TSimpleIndexChecker; + std::set HashValues; + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); + +protected: + virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override; + virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override; + + virtual bool DoCheckImpl(const std::vector& blobs) const override; + +public: + TFilterChecker() = default; + TFilterChecker(const ui32 indexId, std::set&& hashes) + : TBase(indexId) + , HashValues(std::move(hashes)) { + } + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp new file mode 100644 index 000000000000..e0068eeb21f5 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp @@ -0,0 +1,91 @@ +#include "constructor.h" +#include "meta.h" + +#include + +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +std::shared_ptr TIndexConstructor::DoCreateIndexMeta( + const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { + auto* columnInfo = currentSchema.GetColumns().GetByName(ColumnName); + if (!columnInfo) { + errors.AddError("no column with name " + ColumnName); + return nullptr; + } + const ui32 columnId = columnInfo->GetId(); + return std::make_shared(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnId, + HashesCount, FilterSizeBytes, NGrammSize); +} + +NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + if (!jsonInfo.Has("column_name")) { + return TConclusionStatus::Fail("column_name have to be in bloom ngramm filter features"); + } + if (!jsonInfo["column_name"].GetString(&ColumnName)) { + return TConclusionStatus::Fail("column_name have to be string in bloom ngramm filter features"); + } + if (!ColumnName) { + return TConclusionStatus::Fail("empty column_name in bloom ngramm filter features"); + } + + if (!jsonInfo["ngramm_size"].IsUInteger()) { + return TConclusionStatus::Fail("ngramm_size have to be in bloom filter features as uint field"); + } + NGrammSize = jsonInfo["ngramm_size"].GetUInteger(); + if (NGrammSize < 3 || NGrammSize > 10) { + return TConclusionStatus::Fail("ngramm_size have to be in bloom ngramm filter in interval [3, 10]"); + } + + if (!jsonInfo["filter_size_bytes"].IsUInteger()) { + return TConclusionStatus::Fail("filter_size_bytes have to be in bloom filter features as uint field"); + } + FilterSizeBytes = jsonInfo["filter_size_bytes"].GetUInteger(); + if (FilterSizeBytes < 128 || FilterSizeBytes > (1 << 20)) { + return TConclusionStatus::Fail("filter_size_bytes have to be in bloom ngramm filter in interval [128, 1Mb]"); + } + + if (!jsonInfo["hashes_count"].IsUInteger()) { + return TConclusionStatus::Fail("hashes_count have to be in bloom filter features as uint field"); + } + HashesCount = jsonInfo["hashes_count"].GetUInteger(); + if (HashesCount < 1 || HashesCount > 10) { + return TConclusionStatus::Fail("hashes_count have to be in bloom ngramm filter in interval [1, 10]"); + } + return TConclusionStatus::Success(); +} + +NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) { + if (!proto.HasBloomNGrammFilter()) { + const TString errorMessage = "not found BloomNGrammFilter section in proto: \"" + proto.DebugString() + "\""; + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage); + return TConclusionStatus::Fail(errorMessage); + } + auto& bFilter = proto.GetBloomNGrammFilter(); + NGrammSize = bFilter.GetNGrammSize(); + if (NGrammSize < 3 || NGrammSize > 10) { + return TConclusionStatus::Fail("NGrammSize have to be in [3, 10]"); + } + FilterSizeBytes = bFilter.GetFilterSizeBytes(); + if (FilterSizeBytes < 128 || FilterSizeBytes > (1 << 20)) { + return TConclusionStatus::Fail("FilterSizeBytes have to be in [128, 1Mb]"); + } + HashesCount = bFilter.GetHashesCount(); + if (HashesCount < 1 || HashesCount > 10) { + return TConclusionStatus::Fail("HashesCount size have to be in [3, 10]"); + } + ColumnName = bFilter.GetColumnName(); + if (!ColumnName) { + return TConclusionStatus::Fail("empty column name"); + } + return TConclusionStatus::Success(); +} + +void TIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const { + auto* filterProto = proto.MutableBloomNGrammFilter(); + filterProto->SetColumnName(ColumnName); + filterProto->SetNGrammSize(NGrammSize); + filterProto->SetFilterSizeBytes(FilterSizeBytes); + filterProto->SetHashesCount(HashesCount); +} + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h new file mode 100644 index 000000000000..bf666370393d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h @@ -0,0 +1,35 @@ +#pragma once +#include +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +class TIndexConstructor: public IIndexMetaConstructor { +public: + static TString GetClassNameStatic() { + return "BLOOM_NGRAMM_FILTER"; + } + +private: + TString ColumnName; + ui32 NGrammSize = 3; + ui32 FilterSizeBytes = 512; + ui32 HashesCount = 2; + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); + +protected: + virtual std::shared_ptr DoCreateIndexMeta(const ui32 indexId, const TString& indexName, + const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override; + + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; + + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override; + +public: + TIndexConstructor() = default; + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp new file mode 100644 index 000000000000..b66afa52f4ec --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp @@ -0,0 +1,150 @@ +#include "checker.h" +#include "meta.h" + +#include +#include +#include + +#include + +#include +#include + +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +class TNGrammBuilder { +private: + NArrow::NHash::NXX64::TStreamStringHashCalcer HashCalcer; + + template + void BuildNGramms(const char* data, const ui32 dataSize, const ui32 nGrammSize, const TAction& pred) const { + for (ui32 c = 1; c <= nGrammSize; ++c) { + TString fakeStart('\0', nGrammSize - c); + fakeStart += TString(data, std::min(c, dataSize)); + if (fakeStart.size() < nGrammSize) { + fakeStart += TString('\0', nGrammSize - fakeStart.size()); + } + pred(fakeStart.data()); + } + for (ui32 c = 0; c < dataSize; ++c) { + if (c + nGrammSize <= dataSize) { + pred(data + c); + } else { + TString fakeStart = TString(data + c, dataSize - c); + fakeStart += TString('\0', nGrammSize - fakeStart.size()); + pred(fakeStart.data()); + } + } + } + +public: + TNGrammBuilder() + : HashCalcer(0) { + } + + template + void FillNGrammHashes(const ui32 nGrammSize, const std::shared_ptr& array, const TFiller& fillData) { + AFL_VERIFY(array->type_id() == arrow::utf8()->id())("id", array->type()->ToString()); + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t; + using T = typename TWrap::T; + using TArray = typename arrow::TypeTraits::ArrayType; + auto& typedArray = static_cast(*array); + + for (ui32 row = 0; row < array->length(); ++row) { + if (array->IsNull(row)) { + continue; + } + if constexpr (arrow::has_string_view()) { + auto value = typedArray.GetView(row); + if (value.size() < nGrammSize) { + continue; + } + const auto pred = [&](const char* data) { + HashCalcer.Start(); + HashCalcer.Update((const ui8*)data, nGrammSize); + fillData(HashCalcer.Finish()); + }; + BuildNGramms(value.data(), value.size(), nGrammSize, pred); + } else { + AFL_VERIFY(false); + } + } + return true; + }); + } + + template + void FillNGrammHashes(const ui32 nGrammSize, const TString& userReq, const TFiller& fillData) { + const auto pred = [&](const char* value) { + HashCalcer.Start(); + HashCalcer.Update((const ui8*)value, nGrammSize); + fillData(HashCalcer.Finish()); + }; + BuildNGramms(userReq.data(), userReq.size(), nGrammSize, pred); + } +}; + +TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const { + AFL_VERIFY(reader.GetColumnsCount() == 1)("count", reader.GetColumnsCount()); + TNGrammBuilder builder; + + TFixStringBitsStorage bits(FilterSizeBytes * 8); + + const auto pred = [&](const ui64 hash) { + const auto predSet = [&](const ui64 hashSecondary) { + bits.Set(true, hashSecondary % bits.GetSizeBits()); + }; + BuildHashesSet(hash, predSet); + }; + for (reader.Start(); reader.IsCorrect();) { + builder.FillNGrammHashes(NGrammSize, reader.begin()->GetCurrentChunk(), pred); + reader.ReadNext(reader.begin()->GetCurrentChunk()->length()); + } + + return bits.GetData(); +} + +void TIndexMeta::DoFillIndexCheckers( + const std::shared_ptr& info, const NSchemeShard::TOlapSchema& schema) const { + for (auto&& branch : info->GetBranches()) { + std::map foundColumns; + for (auto&& cId : ColumnIds) { + auto c = schema.GetColumns().GetById(cId); + if (!c) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "incorrect index column")("id", cId); + return; + } + auto it = branch->GetLikes().find(c->GetName()); + if (it == branch->GetLikes().end()) { + break; + } + foundColumns.emplace(cId, it->second); + } + if (foundColumns.size() != ColumnIds.size()) { + continue; + } + + std::set hashes; + const auto pred = [&](const ui64 hash) { + const auto predSet = [&](const ui64 hashSecondary) { + hashes.emplace(hashSecondary); + }; + BuildHashesSet(hash, predSet); + }; + TNGrammBuilder builder; + for (auto&& c : foundColumns) { + for (auto&& ls : c.second.GetLikeSequences()) { + for (auto&& s : ls.second) { + if (!s) { + continue; + } + builder.FillNGrammHashes(NGrammSize, s, pred); + } + } + } + branch->MutableIndexes().emplace_back(std::make_shared(GetIndexId(), std::move(hashes))); + } +} + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h new file mode 100644 index 000000000000..98af4556a5a5 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h @@ -0,0 +1,106 @@ +#pragma once +#include +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +class TIndexMeta: public TIndexByColumns { +public: + static TString GetClassNameStatic() { + return "BLOOM_NGRAMM_FILTER"; + } +private: + using TBase = TIndexByColumns; + std::shared_ptr ResultSchema; + ui32 NGrammSize = 3; + ui32 FilterSizeBytes = 512; + ui32 HashesCount = 2; + static inline auto Registrator = TFactory::TRegistrator(GetClassNameStatic()); + void Initialize() { + AFL_VERIFY(!ResultSchema); + std::vector> fields = {std::make_shared("", arrow::boolean())}; + ResultSchema = std::make_shared(fields); + AFL_VERIFY(HashesCount > 0); + AFL_VERIFY(FilterSizeBytes > 0); + AFL_VERIFY(NGrammSize > 2); + } + + static const ui64 HashesConstructorP = ((ui64)2 << 31) - 1; + static const ui64 HashesConstructorA = (ui64)2 << 16; + + template + void BuildHashesSet(const ui64 originalHash, const TActor& actor) const { + AFL_VERIFY(HashesCount < HashesConstructorP); + for (ui32 b = 1; b <= HashesCount; ++b) { + const ui64 hash = (HashesConstructorA * originalHash + b) % HashesConstructorP; + actor(hash); + } + } + + template + void BuildHashesSet(const TContainer& originalHashes, const TActor& actor) const { + AFL_VERIFY(HashesCount < HashesConstructorP); + for (auto&& hOriginal : originalHashes) { + BuildHashesSet(hOriginal, actor); + } + } + +protected: + virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& /*newMeta*/) const override { + return TConclusionStatus::Fail("not supported"); + } + virtual void DoFillIndexCheckers(const std::shared_ptr& info, const NSchemeShard::TOlapSchema& schema) const override; + + virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader) const override; + + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override { + AFL_VERIFY(TBase::DoDeserializeFromProto(proto)); + AFL_VERIFY(proto.HasBloomNGrammFilter()); + auto& bFilter = proto.GetBloomNGrammFilter(); + HashesCount = bFilter.GetHashesCount(); + if (HashesCount < 1 || 10 < HashesCount) { + return false; + } + NGrammSize = bFilter.GetNGrammSize(); + if (NGrammSize < 3) { + return false; + } + FilterSizeBytes = bFilter.GetFilterSizeBytes(); + if (FilterSizeBytes < 128) { + return false; + } + if (!bFilter.HasColumnId() || !bFilter.GetColumnId()) { + return false; + } + ColumnIds.emplace(bFilter.GetColumnId()); + Initialize(); + return true; + } + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override { + auto* filterProto = proto.MutableBloomNGrammFilter(); + AFL_VERIFY(NGrammSize >= 3); + AFL_VERIFY(FilterSizeBytes >= 128); + AFL_VERIFY(HashesCount >= 1); + AFL_VERIFY(ColumnIds.size() == 1); + filterProto->SetNGrammSize(NGrammSize); + filterProto->SetFilterSizeBytes(FilterSizeBytes); + filterProto->SetHashesCount(HashesCount); + filterProto->SetColumnId(*ColumnIds.begin()); + } + +public: + TIndexMeta() = default; + TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, const ui32 hashesCount, + const ui32 filterSizeBytes, const ui32 nGrammSize) + : TBase(indexId, indexName, { columnId }, storageId) + , NGrammSize(nGrammSize) + , FilterSizeBytes(filterSizeBytes) + , HashesCount(hashesCount) + { + Initialize(); + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make new file mode 100644 index 000000000000..bcba53e477ae --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + GLOBAL constructor.cpp + GLOBAL meta.cpp + GLOBAL checker.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/storage/indexes/portions +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/ya.make index 0459c906d836..b12360d2627d 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/indexes/ya.make @@ -3,6 +3,7 @@ LIBRARY() PEERDIR( ydb/core/tx/columnshard/engines/storage/indexes/portions ydb/core/tx/columnshard/engines/storage/indexes/bloom + ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm ydb/core/tx/columnshard/engines/storage/indexes/max ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch ) diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index 98dc024c2a3f..35063d0ae808 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -148,6 +148,16 @@ class TChunkedBatchReader { } } + bool ReadNext(const ui32 count) { + for (ui32 i = 0; i < count; ++i) { + if (!ReadNext()) { + AFL_VERIFY(i + 1 == count); + return false; + } + } + return true; + } + bool ReadNext() { std::optional result; for (auto&& i : Columns) { diff --git a/ydb/library/formats/arrow/protos/ssa.proto b/ydb/library/formats/arrow/protos/ssa.proto index 193c759a3a80..38a0bb14805b 100644 --- a/ydb/library/formats/arrow/protos/ssa.proto +++ b/ydb/library/formats/arrow/protos/ssa.proto @@ -45,6 +45,10 @@ message TProgram { repeated uint64 HashValues = 1; } + message TBloomNGrammFilterChecker { + repeated uint64 HashValues = 1; + } + message TCountMinSketchChecker { } @@ -60,6 +64,7 @@ message TProgram { TBloomFilterChecker BloomFilter = 40; TCompositeChecker Composite = 41; TCountMinSketchChecker CountMinSketch = 42; + TBloomNGrammFilterChecker BloomNGrammFilter = 43; } }