Skip to content

Commit

Permalink
Topic autopartitioning for CDC (#8996)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Oct 2, 2024
1 parent 8030e8b commit 2355ac3
Show file tree
Hide file tree
Showing 35 changed files with 491 additions and 148 deletions.
31 changes: 31 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1880,6 +1880,37 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
const auto duration = TDuration::FromValue(value);
auto& retention = *add_changefeed->mutable_retention_period();
retention.set_seconds(duration.Seconds());
} else if (name == "topic_auto_partitioning") {
auto* settings = add_changefeed->mutable_topic_partitioning_settings()->mutable_auto_partitioning_settings();

auto val = to_lower(TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
));
if (val == "enabled") {
settings->set_strategy(::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN);
} else if (val == "disabled") {
settings->set_strategy(::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
} else {
ctx.AddError(
TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << "Unknown changefeed topic autopartitioning '" << val << "'"
)
);
return SyncError();
}
} else if (name == "topic_max_active_partitions") {
auto value = TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);

i64 maxActivePartitions;
if (!TryFromString(value, maxActivePartitions) || maxActivePartitions <= 0) {
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << name << " must be greater than 0"));
return SyncError();
}

add_changefeed->mutable_topic_partitioning_settings()->set_max_active_partitions(maxActivePartitions);
} else if (name == "topic_min_active_partitions") {
auto value = TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
AppConfig.MutableColumnShardConfig()->SetDisabledOnSchemeShard(false);
FeatureFlags.SetEnableSparsedColumns(true);
FeatureFlags.SetEnableParameterizedDecimal(true);
FeatureFlags.SetEnableTopicAutopartitioningForCDC(true);
}

// Stores a copy of `value` as the AppConfig and returns *this so that
// setter calls can be chained.
TKikimrSettings& SetAppConfig(const NKikimrConfig::TAppConfig& value) {
    AppConfig = value;
    return *this;
}
Expand Down
114 changes: 86 additions & 28 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2411,7 +2411,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL(std::get<TVectorIndexSettings::ESimilarity>(vectorIndexSettings.Metric), TVectorIndexSettings::ESimilarity::InnerProduct);
UNIT_ASSERT_VALUES_EQUAL(vectorIndexSettings.VectorType, TVectorIndexSettings::EVectorType::Float);
UNIT_ASSERT_VALUES_EQUAL(vectorIndexSettings.VectorDimension, 1024);
}
}
}
}

Expand Down Expand Up @@ -2466,7 +2466,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_C(describe.IsSuccess(), describe.GetIssues().ToString());
auto indexDesc = describe.GetTableDescription();
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPartitioningSettings().GetPartitionSizeMb(), partitionSizeMb);
}
}
}

Y_UNIT_TEST(AlterTableAlterVectorIndex) {
Expand All @@ -2483,8 +2483,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
Key Uint64,
Embedding String,
PRIMARY KEY (Key),
INDEX vector_idx
GLOBAL USING vector_kmeans_tree
INDEX vector_idx
GLOBAL USING vector_kmeans_tree
ON (Embedding)
WITH (similarity=cosine, vector_type=bit, vector_dimension=1)
);
Expand All @@ -2498,29 +2498,29 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto indexDesc = describe.GetTableDescription();
constexpr int defaultPartitionSizeMb = 2048;
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPartitioningSettings().GetPartitionSizeMb(), defaultPartitionSizeMb);
}
}
{
auto result = session.ExecuteSchemeQuery(R"(
ALTER TABLE `/Root/TestTable` ALTER INDEX vector_idx SET AUTO_PARTITIONING_MIN_PARTITIONS_COUNT 1;
)").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Only index with one impl table is supported" );
}
}
}

// ALTER TABLE ... ALTER INDEX on an index name that the table does not have
// must be rejected with SCHEME_ERROR and an issue that names the unknown index.
Y_UNIT_TEST(AlterTableAlterMissedIndex) {
    TKikimrRunner kikimr;
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();
    // Sample tables are created exactly once before the ALTER is attempted.
    CreateSampleTablesWithIndex(session);
    {
        auto result = session.ExecuteSchemeQuery(R"(
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX WrongIndexName SET AUTO_PARTITIONING_MIN_PARTITIONS_COUNT 1;
)").ExtractValueSync();
        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
        UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Unknown index name: WrongIndexName");
    }
}

Y_UNIT_TEST(AlterIndexImplTable) {
TKikimrRunner kikimr;
Expand Down Expand Up @@ -2739,13 +2739,13 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
Key Uint64,
Embedding String,
PRIMARY KEY (Key),
INDEX vector_idx
GLOBAL USING vector_kmeans_tree
INDEX vector_idx
GLOBAL USING vector_kmeans_tree
ON (Embedding)
WITH (similarity=inner_product, vector_type=float, vector_dimension=1024)
);
)";

auto result = session.ExecuteSchemeQuery(create_index_query).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
Expand All @@ -2770,7 +2770,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_C(describeLevelTable.IsSuccess(), describeLevelTable.GetIssues().ToString());
auto describePostingTable = session.DescribeTable("/Root/TestTable/vector_idx/indexImplPostingTable").GetValueSync();
UNIT_ASSERT_C(describePostingTable.IsSuccess(), describePostingTable.GetIssues().ToString());
}
}
}


Expand All @@ -2789,8 +2789,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
Embedding String,
Covered String,
PRIMARY KEY (Key),
INDEX vector_idx
GLOBAL USING vector_kmeans_tree
INDEX vector_idx
GLOBAL USING vector_kmeans_tree
ON (Embedding)
COVER (Covered)
WITH (similarity=inner_product, vector_type=float, vector_dimension=1024)
Expand All @@ -2816,7 +2816,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Float);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorDimension, 1024);
}
}
}


Y_UNIT_TEST(CreateTableWithVectorIndexCaseIncentive) {
Expand All @@ -2833,15 +2833,15 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
Key Uint64,
Embedding String,
PRIMARY KEY (Key),
INDEX vector_idx
GLOBAL USING vector_KMEANS_tree
INDEX vector_idx
GLOBAL USING vector_KMEANS_tree
ON (Embedding)
WITH (similarity=COSINE, VECTOR_TYPE=float, vector_DIMENSION=1024)
);
)";
auto result = session.ExecuteSchemeQuery(create_index_query).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}
}

Y_UNIT_TEST(CreateTableWithVectorIndexNoFeatureFlag) {
Expand All @@ -2856,8 +2856,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
Embedding String,
Covered String,
PRIMARY KEY (Key),
INDEX vector_idx
GLOBAL USING vector_kmeans_tree
INDEX vector_idx
GLOBAL USING vector_kmeans_tree
ON (Embedding)
WITH (similarity=inner_product, vector_type=float, vector_dimension=1024)
);
Expand All @@ -2866,8 +2866,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
}
}
}

Y_UNIT_TEST(CreateTableWithVectorIndexPublicApi) {
NKikimrConfig::TFeatureFlags featureFlags;
featureFlags.SetEnableVectorIndex(true);
Expand All @@ -2881,8 +2881,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
.AddNullableColumn("Embedding", EPrimitiveType::String)
.SetPrimaryKeyColumn("Key")
.AddVectorKMeansTreeSecondaryIndex("vector_idx", {"Embedding"},
{ NYdb::NTable::TVectorIndexSettings::EDistance::Cosine,
NYdb::NTable::TVectorIndexSettings::EVectorType::Float,
{ NYdb::NTable::TVectorIndexSettings::EDistance::Cosine,
NYdb::NTable::TVectorIndexSettings::EVectorType::Float,
1024});

auto result = session.CreateTable("/Root/TestTable", builder.Build()).ExtractValueSync();
Expand All @@ -2903,7 +2903,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Float);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorDimension, 1024);
}
}
}

Y_UNIT_TEST(CreateTableWithVectorIndexCoveredPublicApi) {
NKikimrConfig::TFeatureFlags featureFlags;
Expand All @@ -2919,8 +2919,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
.AddNullableColumn("Covered", EPrimitiveType::String)
.SetPrimaryKeyColumn("Key")
.AddVectorKMeansTreeSecondaryIndex("vector_idx", {"Embedding"}, {"Covered"},
{ NYdb::NTable::TVectorIndexSettings::EDistance::Cosine,
NYdb::NTable::TVectorIndexSettings::EVectorType::Float,
{ NYdb::NTable::TVectorIndexSettings::EDistance::Cosine,
NYdb::NTable::TVectorIndexSettings::EVectorType::Float,
1024});

auto result = session.CreateTable("/Root/TestTable", builder.Build()).ExtractValueSync();
Expand All @@ -2942,7 +2942,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Float);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorDimension, 1024);
}
}
}

Y_UNIT_TEST(AlterTableWithDecimalColumn) {
TKikimrRunner kikimr;
Expand Down Expand Up @@ -4371,6 +4371,64 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}

// Checks that the TOPIC_AUTO_PARTITIONING, TOPIC_MIN_ACTIVE_PARTITIONS and
// TOPIC_MAX_ACTIVE_PARTITIONS options of ALTER TABLE ... ADD CHANGEFEED show up
// in the description of the changefeed when it is described as a topic.
Y_UNIT_TEST(ChangefeedTopicAutoPartitioning) {
using namespace NTopic;

TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
// Topic client: used below to describe each changefeed path via DescribeTopic.
auto pq = TTopicClient(kikimr.GetDriver(), TTopicClientSettings().Database("/Root"));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{ // create the base table (Uint64 key) that the changefeeds are attached to
auto query = R"(
--!syntax_v1
CREATE TABLE `/Root/table_tap` (
Key Uint64,
Value String,
PRIMARY KEY (Key)
);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{ // auto partitioning explicitly ENABLED, with explicit min/max active partitions
auto query = R"(
--!syntax_v1
ALTER TABLE `/Root/table_tap` ADD CHANGEFEED `feed_1` WITH (
MODE = 'KEYS_ONLY', FORMAT = 'JSON', TOPIC_MIN_ACTIVE_PARTITIONS = 7, TOPIC_MAX_ACTIVE_PARTITIONS = 777, TOPIC_AUTO_PARTITIONING = 'ENABLED'
);
)";

const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto desc = pq.DescribeTopic("/Root/table_tap/feed_1").ExtractValueSync();
UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
// The number of partitions is expected to equal TOPIC_MIN_ACTIVE_PARTITIONS.
UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitions().size(), 7);
UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 7);
UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), 777);
// NOTE(review): 'ENABLED' is asserted to surface as ScaleUp — confirm this is the
// intended mapping for the strategy that the query executor sets for 'enabled'.
UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), NYdb::NTopic::EAutoPartitioningStrategy::ScaleUp);
}

{ // auto partitioning explicitly DISABLED; no partition counts are specified
auto query = R"(
--!syntax_v1
ALTER TABLE `/Root/table_tap` ADD CHANGEFEED `feed_2` WITH (
MODE = 'KEYS_ONLY', FORMAT = 'JSON', TOPIC_AUTO_PARTITIONING='DISABLED'
);
)";

const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto desc = pq.DescribeTopic("/Root/table_tap/feed_2").ExtractValueSync();
UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), NYdb::NTopic::EAutoPartitioningStrategy::Disabled);
}
}

Y_UNIT_TEST(ChangefeedAttributes) {
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
auto db = kikimr.GetTableClient();
Expand Down
29 changes: 20 additions & 9 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
#include <ydb/core/base/row_version.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/persqueue/blob.h>
#include <ydb/core/persqueue/percentile_counter.h>
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/sourceid_info.h>
#include <ydb/core/persqueue/metering_sink.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/percentile_counter.h>
#include <ydb/core/persqueue/sourceid_info.h>
#include <ydb/core/persqueue/write_id.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
Expand Down Expand Up @@ -293,6 +294,14 @@ struct TEvPQ {
ui64 LastOffset;
};

// Value type of TMessageGroups: a sequence number paired with a partition key range.
// NOTE(review): presumably describes one producer message group (source id) — confirm against callers.
struct TMessageGroup {
ui64 SeqNo;
NPQ::TPartitionKeyRange KeyRange;
};

// Map from a TString key to the TMessageGroup stored for it.
using TMessageGroups = std::unordered_map<TString, TMessageGroup>;
// Shared-ownership pointer to a TMessageGroups map; may be null.
using TMessageGroupsPtr = std::shared_ptr<TMessageGroups>;

struct TEvDirectReadBase {
TEvDirectReadBase(ui64 cookie, const NPQ::TDirectReadKey& readKey, const TActorId& pipeClient)
: Cookie(cookie)
Expand Down Expand Up @@ -576,15 +585,13 @@ struct TEvPQ {
};

// Local event that carries a topic converter together with a PQ tablet config.
// Both constructor arguments are copied into the event.
struct TEvChangePartitionConfig : public TEventLocal<TEvChangePartitionConfig, EvChangePartitionConfig> {
    TEvChangePartitionConfig(const NPersQueue::TTopicConverterPtr& topicConverter, const NKikimrPQ::TPQTabletConfig& config)
        : TopicConverter(topicConverter)
        , Config(config)
    {}

    NPersQueue::TTopicConverterPtr TopicConverter;
    NKikimrPQ::TPQTabletConfig Config;
};

struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> {
Expand Down Expand Up @@ -847,7 +854,6 @@ struct TEvPQ {
ui64 TxId;
NPersQueue::TTopicConverterPtr TopicConverter;
NKikimrPQ::TPQTabletConfig Config;
NKikimrPQ::TBootstrapConfig BootstrapConfig;
};

struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> {
Expand All @@ -861,17 +867,22 @@ struct TEvPQ {
ui64 Step;
ui64 TxId;
NPQ::TPartitionId Partition;

NKikimrPQ::TPartitions::TPartitionInfo Data;
};

// Local event identifying a transaction by (step, txId), optionally accompanied
// by a shared map of explicit message groups.
struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> {
    // `explicitMessageGroups` defaults to nullptr, so two-argument call sites
    // keep compiling; the pointer is moved into the event rather than copied.
    TEvTxCommit(ui64 step, ui64 txId, TMessageGroupsPtr explicitMessageGroups = nullptr)
        : Step(step)
        , TxId(txId)
        , ExplicitMessageGroups(std::move(explicitMessageGroups))
    {
    }

    ui64 Step;
    ui64 TxId;

    // Null when the sender supplied no explicit message groups.
    TMessageGroupsPtr ExplicitMessageGroups;
};

struct TEvTxCommitDone : public TEventLocal<TEvTxCommitDone, EvTxCommitDone> {
Expand Down
Loading

0 comments on commit 2355ac3

Please sign in to comment.