Skip to content

Commit

Permalink
Merge 88cda24 into 051c6cb
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Oct 1, 2024
2 parents 051c6cb + 88cda24 commit f877a3e
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 103 deletions.
106 changes: 5 additions & 101 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@

#include <ydb/core/change_exchange/change_sender.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/scheme/protos/type_info.pb.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -351,45 +349,6 @@ class TCdcChangeSenderMain
, public NChangeExchange::IChangeSenderFactory
, private NSchemeCache::TSchemeCacheHelpers
{
struct TPQPartitionInfo {
ui32 PartitionId;
ui64 ShardId;
TPartitionKeyRange KeyRange;

struct TLess {
TConstArrayRef<NScheme::TTypeInfo> Schema;

TLess(const TVector<NScheme::TTypeInfo>& schema)
: Schema(schema)
{
}

bool operator()(const TPQPartitionInfo& lhs, const TPQPartitionInfo& rhs) const {
Y_ABORT_UNLESS(lhs.KeyRange.ToBound || rhs.KeyRange.ToBound);

if (!lhs.KeyRange.ToBound) {
return false;
}

if (!rhs.KeyRange.ToBound) {
return true;
}

Y_ABORT_UNLESS(lhs.KeyRange.ToBound && rhs.KeyRange.ToBound);

const int compares = CompareTypedCellVectors(
lhs.KeyRange.ToBound->GetCells().data(),
rhs.KeyRange.ToBound->GetCells().data(),
Schema.data(), Schema.size()
);

return (compares < 0);
}

}; // TLess

}; // TPQPartitionInfo

TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
Expand Down Expand Up @@ -615,74 +574,19 @@ class TCdcChangeSenderMain
const auto& pqDesc = entry.PQGroupInfo->Description;
const auto& pqConfig = pqDesc.GetPQTabletConfig();

TVector<NScheme::TTypeInfo> schema;
PartitionToShard.clear();

schema.reserve(pqConfig.PartitionKeySchemaSize());
for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) {
if (keySchema.GetTypeId() == NScheme::NTypeIds::Pg) {
schema.push_back(NScheme::TTypeInfo(
NPg::TypeDescFromPgTypeId(keySchema.GetTypeInfo().GetPgTypeId())));
} else {
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
}
}

TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema);

for (const auto& partition : pqDesc.GetPartitions()) {
const auto partitionId = partition.GetPartitionId();
const auto shardId = partition.GetTabletId();

PartitionToShard.emplace(partitionId, shardId);

auto keyRange = TPartitionKeyRange::Parse(partition.GetKeyRange());
Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size());
Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size());

partitions.insert({partitionId, shardId, std::move(keyRange)});
}

// used to validate
bool isFirst = true;
const TPQPartitionInfo* prev = nullptr;

TVector<NKikimr::TKeyDesc::TPartitionInfo> partitioning;
partitioning.reserve(partitions.size());
for (const auto& cur : partitions) {
if (isFirst) {
isFirst = false;
Y_ABORT_UNLESS(!cur.KeyRange.FromBound.Defined());
} else {
Y_ABORT_UNLESS(cur.KeyRange.FromBound.Defined());
Y_ABORT_UNLESS(prev);
Y_ABORT_UNLESS(prev->KeyRange.ToBound.Defined());
// TODO: compare cells
}

auto& part = partitioning.emplace_back(cur.PartitionId); // TODO: double-check that it is right partitioning

if (cur.KeyRange.ToBound) {
part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{
.EndKeyPrefix = *cur.KeyRange.ToBound,
};
} else {
part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{};
}

prev = &cur;
}

if (prev) {
Y_ABORT_UNLESS(!prev->KeyRange.ToBound.Defined());
PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId());
}

const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
TopicVersion = topicVersion;

KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(schema);
KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(std::move(partitioning));
Y_ABORT_UNLESS(entry.PQGroupInfo->Schema);
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema);
Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning);
KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(entry.PQGroupInfo->Partitioning);

if (::NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED != pqConfig.GetPartitionStrategy().GetPartitionStrategyType()) {
SetPartitionResolver(new TBoundaryPartitionResolver(pqDesc));
Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/datashard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ PEERDIR(
ydb/core/formats
ydb/core/io_formats/ydb_dump
ydb/core/kqp/runtime
ydb/core/persqueue/partition_key_range
ydb/core/persqueue/writer
ydb/core/protos
ydb/core/tablet
Expand Down
64 changes: 63 additions & 1 deletion ydb/core/tx/scheme_board/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
#include <ydb/core/base/path.h>
#include <ydb/core/base/tabletid.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/core/protos/flat_tx_scheme.pb.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/sys_view/common/schema.h>
Expand All @@ -26,6 +26,8 @@
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/services/services.pb.h>

#include <library/cpp/json/writer/json.h>

#include <util/generic/algorithm.h>
Expand Down Expand Up @@ -979,6 +981,65 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
return partitions;
}

static void FillTopicPartitioning(
const NKikimrSchemeOp::TPersQueueGroupDescription& pqDesc,
TVector<NScheme::TTypeInfo>& schema,
TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitioning)
{
const auto& pqConfig = pqDesc.GetPQTabletConfig();
if (pqConfig.GetPartitionKeySchema().empty()) {
return;
}

schema.reserve(pqConfig.PartitionKeySchemaSize());
for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) {
if (keySchema.GetTypeId() == NScheme::NTypeIds::Pg) {
schema.push_back(NScheme::TTypeInfo(NPg::TypeDescFromPgTypeId(keySchema.GetTypeInfo().GetPgTypeId())));
} else {
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
}
}

partitioning.reserve(pqDesc.PartitionsSize());
for (const auto& partition : pqDesc.GetPartitions()) {
auto keyRange = NPQ::TPartitionKeyRange::Parse(partition.GetKeyRange());
Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size());
Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size());

auto& info = partitioning.emplace_back(partition.GetPartitionId());
if (keyRange.ToBound) {
info.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{
.EndKeyPrefix = *keyRange.ToBound,
};
} else {
info.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{};
}
}

Sort(partitioning.begin(), partitioning.end(), [&schema](const auto& lhs, const auto& rhs) {
Y_ABORT_UNLESS(lhs.Range && rhs.Range);
Y_ABORT_UNLESS(lhs.Range->EndKeyPrefix || rhs.Range->EndKeyPrefix);

if (!lhs.Range->EndKeyPrefix) {
return false;
}

if (!rhs.Range->EndKeyPrefix) {
return true;
}

Y_ABORT_UNLESS(lhs.Range->EndKeyPrefix && rhs.Range->EndKeyPrefix);

const int compares = CompareTypedCellVectors(
lhs.Range->EndKeyPrefix.GetCells().data(),
rhs.Range->EndKeyPrefix.GetCells().data(),
schema.data(), schema.size()
);

return (compares < 0);
});
}

bool IsSysTable() const {
return Kind == TNavigate::KindTable && PathId.OwnerId == TSysTables::SysSchemeShard;
}
Expand Down Expand Up @@ -1487,6 +1548,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
if (Created) {
NPQ::Migrate(*pathDesc.MutablePersQueueGroup()->MutablePQTabletConfig());
FillInfo(Kind, PQGroupInfo, std::move(*pathDesc.MutablePersQueueGroup()));
FillTopicPartitioning(PQGroupInfo->Description, PQGroupInfo->Schema, PQGroupInfo->Partitioning);
}
break;
case NKikimrSchemeOp::EPathTypeCdcStream:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/scheme_board/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ PEERDIR(
ydb/library/actors/core
ydb/core/base
ydb/core/mon
ydb/core/persqueue/partition_key_range
ydb/core/protos
ydb/core/sys_view/common
ydb/core/tx/scheme_cache
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/scheme_cache/scheme_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ struct TSchemeCacheNavigate {
struct TPQGroupInfo : public TAtomicRefCount<TPQGroupInfo> {
EKind Kind = KindUnknown;
NKikimrSchemeOp::TPersQueueGroupDescription Description;
TVector<NScheme::TTypeInfo> Schema;
TVector<NKikimr::TKeyDesc::TPartitionInfo> Partitioning;
};

struct TRtmrVolumeInfo : public TAtomicRefCount<TRtmrVolumeInfo> {
Expand Down

0 comments on commit f877a3e

Please sign in to comment.