From 3a91d7281a8196e45c2e6e4e07d4203737fc41ba Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Tue, 1 Oct 2024 15:26:56 +0300 Subject: [PATCH 1/2] Parse topic's partitioning once and more efficiently (#9914) --- .../tx/datashard/change_sender_cdc_stream.cpp | 103 +----------------- ydb/core/tx/datashard/ya.make | 1 - ydb/core/tx/scheme_board/cache.cpp | 61 ++++++++++- ydb/core/tx/scheme_board/ya.make | 1 + ydb/core/tx/scheme_cache/scheme_cache.h | 2 + 5 files changed, 68 insertions(+), 100 deletions(-) diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 5300357c24cd..a605c1cc7099 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include #include @@ -300,45 +299,6 @@ class TCdcChangeSenderMain , public NChangeExchange::ISenderFactory , private NSchemeCache::TSchemeCacheHelpers { - struct TPQPartitionInfo { - ui32 PartitionId; - ui64 ShardId; - TPartitionKeyRange KeyRange; - - struct TLess { - TConstArrayRef Schema; - - TLess(const TVector& schema) - : Schema(schema) - { - } - - bool operator()(const TPQPartitionInfo& lhs, const TPQPartitionInfo& rhs) const { - Y_ABORT_UNLESS(lhs.KeyRange.ToBound || rhs.KeyRange.ToBound); - - if (!lhs.KeyRange.ToBound) { - return false; - } - - if (!rhs.KeyRange.ToBound) { - return true; - } - - Y_ABORT_UNLESS(lhs.KeyRange.ToBound && rhs.KeyRange.ToBound); - - const int compares = CompareTypedCellVectors( - lhs.KeyRange.ToBound->GetCells().data(), - rhs.KeyRange.ToBound->GetCells().data(), - Schema.data(), Schema.size() - ); - - return (compares < 0); - } - - }; // TLess - - }; // TPQPartitionInfo - TStringBuf GetLogPrefix() const { if (!LogPrefix) { LogPrefix = TStringBuilder() @@ -564,72 +524,19 @@ class TCdcChangeSenderMain const auto& pqDesc = entry.PQGroupInfo->Description; const auto& pqConfig = pqDesc.GetPQTabletConfig(); - TVector schema; PartitionToShard.clear(); - - schema.reserve(pqConfig.PartitionKeySchemaSize()); - for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) { - // TODO: support pg types - schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId())); - } - - TSet partitions(schema); - THashSet shards; - for (const auto& partition : pqDesc.GetPartitions()) { - const auto partitionId = partition.GetPartitionId(); - const auto shardId = partition.GetTabletId(); - - PartitionToShard.emplace(partitionId, shardId); - - auto keyRange = TPartitionKeyRange::Parse(partition.GetKeyRange()); - Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size()); - Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size()); - - partitions.insert({partitionId, shardId, std::move(keyRange)}); - shards.insert(shardId); - } - - // used to validate - bool isFirst = true; - const TPQPartitionInfo* prev = nullptr; - - TVector partitioning; - partitioning.reserve(partitions.size()); - for (const auto& cur : partitions) { - if (isFirst) { - isFirst = false; - Y_ABORT_UNLESS(!cur.KeyRange.FromBound.Defined()); - } else { - Y_ABORT_UNLESS(cur.KeyRange.FromBound.Defined()); - Y_ABORT_UNLESS(prev); - Y_ABORT_UNLESS(prev->KeyRange.ToBound.Defined()); - // TODO: compare cells - } - - auto& part = partitioning.emplace_back(cur.PartitionId); // TODO: double-check that it is right partitioning - - if (cur.KeyRange.ToBound) { - part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{ - .EndKeyPrefix = *cur.KeyRange.ToBound, - }; - } else { - part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{}; - } - - prev = &cur; - } - - if (prev) { - Y_ABORT_UNLESS(!prev->KeyRange.ToBound.Defined()); + PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId()); } const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); const bool versionChanged = !TopicVersion || TopicVersion != topicVersion; TopicVersion = topicVersion; - KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(schema); - KeyDesc->Partitioning = std::make_shared>(std::move(partitioning)); + Y_ABORT_UNLESS(entry.PQGroupInfo->Schema); + KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema); + Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning); + KeyDesc->Partitioning = std::make_shared>(entry.PQGroupInfo->Partitioning); CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged); Become(&TThis::StateMain); diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index b16535504346..6cfc69901adb 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -244,7 +244,6 @@ PEERDIR( ydb/core/formats ydb/core/io_formats/ydb_dump ydb/core/kqp/runtime - ydb/core/persqueue/partition_key_range ydb/core/persqueue/writer ydb/core/protos ydb/core/tablet diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index b521e7ed7e87..0c0c2061a3c3 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -13,9 +13,9 @@ #include #include #include +#include #include #include -#include #include #include #include @@ -26,6 +26,8 @@ #include #include #include +#include + #include #include @@ -976,6 +978,62 @@ class TSchemeCache: public TMonitorableActor { return partitions; } + static void FillTopicPartitioning( + const NKikimrSchemeOp::TPersQueueGroupDescription& pqDesc, + TVector& schema, + TVector& partitioning) + { + const auto& pqConfig = pqDesc.GetPQTabletConfig(); + if (pqConfig.GetPartitionKeySchema().empty()) { + return; + } + + schema.reserve(pqConfig.PartitionKeySchemaSize()); + for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) { + // TODO: support pg types + schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId())); + } + + partitioning.reserve(pqDesc.PartitionsSize()); + for (const auto& partition : pqDesc.GetPartitions()) { + auto keyRange = NPQ::TPartitionKeyRange::Parse(partition.GetKeyRange()); + Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size()); + Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size()); + + auto& info = partitioning.emplace_back(partition.GetPartitionId()); + if (keyRange.ToBound) { + info.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{ + .EndKeyPrefix = *keyRange.ToBound, + }; + } else { + info.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{}; + } + } + + Sort(partitioning.begin(), partitioning.end(), [&schema](const auto& lhs, const auto& rhs) { + Y_ABORT_UNLESS(lhs.Range && rhs.Range); + Y_ABORT_UNLESS(lhs.Range->EndKeyPrefix || rhs.Range->EndKeyPrefix); + + if (!lhs.Range->EndKeyPrefix) { + return false; + } + + if (!rhs.Range->EndKeyPrefix) { + return true; + } + + Y_ABORT_UNLESS(lhs.Range->EndKeyPrefix && rhs.Range->EndKeyPrefix); + + const int compares = CompareTypedCellVectors( + lhs.Range->EndKeyPrefix.GetCells().data(), + rhs.Range->EndKeyPrefix.GetCells().data(), + schema.data(), schema.size() + ); + + return (compares < 0); + }); + } + bool IsSysTable() const { return Kind == TNavigate::KindTable && PathId.OwnerId == TSysTables::SysSchemeShard; } @@ -1484,6 +1542,7 @@ class TSchemeCache: public TMonitorableActor { if (Created) { NPQ::Migrate(*pathDesc.MutablePersQueueGroup()->MutablePQTabletConfig()); FillInfo(Kind, PQGroupInfo, std::move(*pathDesc.MutablePersQueueGroup())); + FillTopicPartitioning(PQGroupInfo->Description, PQGroupInfo->Schema, PQGroupInfo->Partitioning); } break; case NKikimrSchemeOp::EPathTypeCdcStream: diff --git a/ydb/core/tx/scheme_board/ya.make b/ydb/core/tx/scheme_board/ya.make index 22f66c217f09..ffb2121ca4c9 100644 --- a/ydb/core/tx/scheme_board/ya.make +++ b/ydb/core/tx/scheme_board/ya.make @@ -4,6 +4,7 @@ PEERDIR( ydb/library/actors/core ydb/core/base ydb/core/mon + ydb/core/persqueue/partition_key_range ydb/core/protos ydb/core/sys_view/common ydb/core/tx/scheme_cache diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h index b81736613daa..99c120e99112 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.h +++ b/ydb/core/tx/scheme_cache/scheme_cache.h @@ -177,6 +177,8 @@ struct TSchemeCacheNavigate { struct TPQGroupInfo : public TAtomicRefCount { EKind Kind = KindUnknown; NKikimrSchemeOp::TPersQueueGroupDescription Description; + TVector Schema; + TVector Partitioning; }; struct TRtmrVolumeInfo : public TAtomicRefCount { From 14faff28dbb0255fe9ff452e15dee6b37c2d2e64 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Wed, 2 Oct 2024 21:42:12 +0300 Subject: [PATCH 2/2] Better handling of connection loss (#9993) --- .../change_sender_common_ops.h | 20 +++++++++++---- ydb/core/change_exchange/util.cpp | 15 +++++++++++ ydb/core/change_exchange/util.h | 9 +++++++ ydb/core/change_exchange/ya.make | 1 + .../datashard/change_sender_async_index.cpp | 20 ++++++--------- .../tx/datashard/change_sender_cdc_stream.cpp | 25 ++++++++----------- .../replication/service/table_writer_impl.h | 21 ++++++---------- 7 files changed, 65 insertions(+), 46 deletions(-) create mode 100644 ydb/core/change_exchange/util.cpp create mode 100644 ydb/core/change_exchange/util.h diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index 823b208f5323..7c472e704443 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -413,20 +413,30 @@ class TBaseChangeSender { } TActorId GetChangeServer() const { return ChangeServer; } - void CreateSenders(const TVector& partitionIds, bool partitioningChanged = true) { - if (partitioningChanged) { + +private: + void CreateSendersImpl(const TVector& partitionIds) { + if (partitionIds) { CreateMissingSenders(partitionIds); } else { - RecreateSenders(GonePartitions); + RecreateSenders(std::exchange(GonePartitions, {})); } - GonePartitions.clear(); - if (!Enqueued || !RequestRecords()) { SendRecords(); } } +protected: + void CreateSenders(const TVector& partitionIds) { + Y_ABORT_UNLESS(partitionIds); + CreateSendersImpl(partitionIds); + } + + void CreateSenders() { + CreateSendersImpl({}); + } + void KillSenders() { for (const auto& [_, sender] : std::exchange(Senders, {})) { if (sender.ActorId) { diff --git a/ydb/core/change_exchange/util.cpp b/ydb/core/change_exchange/util.cpp new file mode 100644 index 000000000000..c4c0516e0e94 --- /dev/null +++ b/ydb/core/change_exchange/util.cpp @@ -0,0 +1,15 @@ +#include "util.h" + +namespace NKikimr::NChangeExchange { + +TVector MakePartitionIds(const TVector& partitions) { + TVector result(::Reserve(partitions.size())); + + for (const auto& partition : partitions) { + result.push_back(partition.ShardId); + } + + return result; +} + +} diff --git a/ydb/core/change_exchange/util.h b/ydb/core/change_exchange/util.h new file mode 100644 index 000000000000..f8ba146fdeaf --- /dev/null +++ b/ydb/core/change_exchange/util.h @@ -0,0 +1,9 @@ +#pragma once + +#include + +namespace NKikimr::NChangeExchange { + +TVector MakePartitionIds(const TVector& partitions); + +} diff --git a/ydb/core/change_exchange/ya.make b/ydb/core/change_exchange/ya.make index b95ab2178442..680c246118ea 100644 --- a/ydb/core/change_exchange/ya.make +++ b/ydb/core/change_exchange/ya.make @@ -4,6 +4,7 @@ SRCS( change_exchange.cpp change_record.cpp change_sender_monitoring.cpp + util.cpp ) GENERATE_ENUM_SERIALIZATION(change_record.h) diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 38492b20728e..106d03406b58 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -435,16 +436,6 @@ class TAsyncIndexChangeSenderMain return Check(&TSchemeCacheHelpers::CheckEntryKind, &TThis::LogWarnAndRetry, entry, expected); } - static TVector MakePartitionIds(const TVector& partitions) { - TVector result(Reserve(partitions.size())); - - for (const auto& partition : partitions) { - result.push_back(partition.ShardId); // partition = shard - } - - return result; - } - /// ResolveUserTable void ResolveUserTable() { @@ -611,6 +602,11 @@ class TAsyncIndexChangeSenderMain return; } + if (IndexTableVersion && IndexTableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) { + CreateSenders(); + return Become(&TThis::StateMain); + } + TagMap.clear(); TVector keyColumnTypes; @@ -692,11 +688,9 @@ class TAsyncIndexChangeSenderMain return Retry(); } - const bool versionChanged = !IndexTableVersion || IndexTableVersion != entry.GeneralVersion; IndexTableVersion = entry.GeneralVersion; - KeyDesc = std::move(entry.KeyDescription); - CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged); + CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions())); Become(&TThis::StateMain); } diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index a605c1cc7099..980c269f15ca 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -390,16 +391,6 @@ class TCdcChangeSenderMain return false; } - static TVector MakePartitionIds(const TVector& partitions) { - TVector result(Reserve(partitions.size())); - - for (const auto& partition : partitions) { - result.push_back(partition.ShardId); - } - - return result; - } - /// ResolveCdcStream void ResolveCdcStream() { @@ -521,6 +512,14 @@ class TCdcChangeSenderMain return; } + const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); + if (TopicVersion && TopicVersion == topicVersion) { + CreateSenders(); + return Become(&TThis::StateMain); + } + + TopicVersion = topicVersion; + const auto& pqDesc = entry.PQGroupInfo->Description; const auto& pqConfig = pqDesc.GetPQTabletConfig(); @@ -529,16 +528,12 @@ class TCdcChangeSenderMain PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId()); } - const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); - const bool versionChanged = !TopicVersion || TopicVersion != topicVersion; - TopicVersion = topicVersion; - Y_ABORT_UNLESS(entry.PQGroupInfo->Schema); KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema); Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning); KeyDesc->Partitioning = std::make_shared>(entry.PQGroupInfo->Partitioning); - CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged); + CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning)); Become(&TThis::StateMain); } diff --git a/ydb/core/tx/replication/service/table_writer_impl.h b/ydb/core/tx/replication/service/table_writer_impl.h index 1fd77232fb2d..2af13fa22042 100644 --- a/ydb/core/tx/replication/service/table_writer_impl.h +++ b/ydb/core/tx/replication/service/table_writer_impl.h @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -278,16 +279,6 @@ class TLocalTableWriter return Check(&TSchemeCacheHelpers::CheckEntryKind, &TThis::LogCritAndLeave, entry, expected); } - static TVector MakePartitionIds(const TVector& partitions) { - TVector result(::Reserve(partitions.size())); - - for (const auto& partition : partitions) { - result.push_back(partition.ShardId); - } - - return result; - } - void Registered(TActorSystem*, const TActorId&) override { this->ChangeServer = this->SelfId(); } @@ -348,6 +339,12 @@ class TLocalTableWriter return; } + if (TableVersion && TableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) { + Y_ABORT_UNLESS(Initialized); + Resolving = false; + return CreateSenders(); + } + auto schema = MakeIntrusive(); if (entry.Self && entry.Self->Info.HasVersion()) { schema->Version = entry.Self->Info.GetVersion().GetTableSchemaVersion(); @@ -415,11 +412,9 @@ class TLocalTableWriter return LogWarnAndRetry("Empty partitions"); } - const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion; TableVersion = entry.GeneralVersion; - KeyDesc = std::move(entry.KeyDescription); - this->CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged); + this->CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions())); if (!Initialized) { this->Send(Worker, new TEvWorker::TEvHandshake());