Skip to content

Commit

Permalink
Merge c859f59 into 66970a9
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Oct 3, 2024
2 parents 66970a9 + c859f59 commit fa8b876
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 144 deletions.
16 changes: 14 additions & 2 deletions ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,10 @@ class TBaseChangeSender {
}

TActorId GetChangeServer() const { return ChangeServer; }
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) {
if (partitioningChanged) {

private:
void CreateSendersImpl(const TVector<ui64>& partitionIds) {
if (partitionIds) {
CreateMissingSenders(partitionIds);
} else {
RecreateSenders(GonePartitions);
Expand All @@ -427,6 +429,16 @@ class TBaseChangeSender {
}
}

protected:
void CreateSenders(const TVector<ui64>& partitionIds) {
Y_ABORT_UNLESS(partitionIds);
CreateSendersImpl(partitionIds);
}

void CreateSenders() {
CreateSendersImpl({});
}

void KillSenders() {
for (const auto& [_, sender] : std::exchange(Senders, {})) {
if (sender.ActorId) {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/change_exchange/util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include "util.h"

namespace NKikimr::NChangeExchange {

TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(::Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId);
}

return result;
}

}
9 changes: 9 additions & 0 deletions ydb/core/change_exchange/util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <ydb/core/scheme/scheme_tabledefs.h>

namespace NKikimr::NChangeExchange {

TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions);

}
1 change: 1 addition & 0 deletions ydb/core/change_exchange/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ SRCS(
change_exchange.cpp
change_record.cpp
change_sender_monitoring.cpp
util.cpp
)

GENERATE_ENUM_SERIALIZATION(change_record.h)
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/scheme/scheme_tablecell.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,10 @@ class TSerializedCellVec {
return Cells;
}

explicit operator bool() const {
return !Cells.empty();
}

static void Serialize(TString& res, TConstArrayRef<TCell> cells);

static TString Serialize(TConstArrayRef<TCell> cells);
Expand Down
20 changes: 7 additions & 13 deletions ydb/core/tx/datashard/change_sender_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/change_exchange/change_sender_common_ops.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/change_exchange/util.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
Expand Down Expand Up @@ -435,16 +436,6 @@ class TAsyncIndexChangeSenderMain
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogWarnAndRetry, entry, expected);
}

static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId); // partition = shard
}

return result;
}

/// ResolveUserTable

void ResolveUserTable() {
Expand Down Expand Up @@ -611,6 +602,11 @@ class TAsyncIndexChangeSenderMain
return;
}

if (IndexTableVersion && IndexTableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
CreateSenders();
return Become(&TThis::StateMain);
}

TagMap.clear();
TVector<NScheme::TTypeInfo> keyColumnTypes;

Expand Down Expand Up @@ -692,11 +688,9 @@ class TAsyncIndexChangeSenderMain
return Retry();
}

const bool versionChanged = !IndexTableVersion || IndexTableVersion != entry.GeneralVersion;
IndexTableVersion = entry.GeneralVersion;

KeyDesc = std::move(entry.KeyDescription);
CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions()));

Become(&TThis::StateMain);
}
Expand Down
129 changes: 15 additions & 114 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include <ydb/core/change_exchange/change_sender_common_ops.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/change_exchange/util.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
Expand Down Expand Up @@ -300,45 +300,6 @@ class TCdcChangeSenderMain
, public NChangeExchange::ISenderFactory
, private NSchemeCache::TSchemeCacheHelpers
{
struct TPQPartitionInfo {
ui32 PartitionId;
ui64 ShardId;
TPartitionKeyRange KeyRange;

struct TLess {
TConstArrayRef<NScheme::TTypeInfo> Schema;

TLess(const TVector<NScheme::TTypeInfo>& schema)
: Schema(schema)
{
}

bool operator()(const TPQPartitionInfo& lhs, const TPQPartitionInfo& rhs) const {
Y_ABORT_UNLESS(lhs.KeyRange.ToBound || rhs.KeyRange.ToBound);

if (!lhs.KeyRange.ToBound) {
return false;
}

if (!rhs.KeyRange.ToBound) {
return true;
}

Y_ABORT_UNLESS(lhs.KeyRange.ToBound && rhs.KeyRange.ToBound);

const int compares = CompareTypedCellVectors(
lhs.KeyRange.ToBound->GetCells().data(),
rhs.KeyRange.ToBound->GetCells().data(),
Schema.data(), Schema.size()
);

return (compares < 0);
}

}; // TLess

}; // TPQPartitionInfo

TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
Expand Down Expand Up @@ -430,16 +391,6 @@ class TCdcChangeSenderMain
return false;
}

static TVector<ui64> MakePartitionIds(const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId);
}

return result;
}

/// ResolveCdcStream

void ResolveCdcStream() {
Expand Down Expand Up @@ -561,77 +512,27 @@ class TCdcChangeSenderMain
return;
}

const auto& pqDesc = entry.PQGroupInfo->Description;
const auto& pqConfig = pqDesc.GetPQTabletConfig();

TVector<NScheme::TTypeInfo> schema;
PartitionToShard.clear();

schema.reserve(pqConfig.PartitionKeySchemaSize());
for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) {
// TODO: support pg types
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
if (TopicVersion && TopicVersion == topicVersion) {
CreateSenders();
return Become(&TThis::StateMain);
}

TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema);
THashSet<ui64> shards;

for (const auto& partition : pqDesc.GetPartitions()) {
const auto partitionId = partition.GetPartitionId();
const auto shardId = partition.GetTabletId();

PartitionToShard.emplace(partitionId, shardId);

auto keyRange = TPartitionKeyRange::Parse(partition.GetKeyRange());
Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size());
Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size());

partitions.insert({partitionId, shardId, std::move(keyRange)});
shards.insert(shardId);
}

// used to validate
bool isFirst = true;
const TPQPartitionInfo* prev = nullptr;

TVector<NKikimr::TKeyDesc::TPartitionInfo> partitioning;
partitioning.reserve(partitions.size());
for (const auto& cur : partitions) {
if (isFirst) {
isFirst = false;
Y_ABORT_UNLESS(!cur.KeyRange.FromBound.Defined());
} else {
Y_ABORT_UNLESS(cur.KeyRange.FromBound.Defined());
Y_ABORT_UNLESS(prev);
Y_ABORT_UNLESS(prev->KeyRange.ToBound.Defined());
// TODO: compare cells
}

auto& part = partitioning.emplace_back(cur.PartitionId); // TODO: double-check that it is right partitioning

if (cur.KeyRange.ToBound) {
part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{
.EndKeyPrefix = *cur.KeyRange.ToBound,
};
} else {
part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{};
}
TopicVersion = topicVersion;

prev = &cur;
}
const auto& pqDesc = entry.PQGroupInfo->Description;

if (prev) {
Y_ABORT_UNLESS(!prev->KeyRange.ToBound.Defined());
PartitionToShard.clear();
for (const auto& partition : pqDesc.GetPartitions()) {
PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId());
}

const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
TopicVersion = topicVersion;

KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(schema);
KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(std::move(partitioning));
Y_ABORT_UNLESS(entry.PQGroupInfo->Schema);
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema);
Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning);
KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(entry.PQGroupInfo->Partitioning);

CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged);
CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning));
Become(&TThis::StateMain);
}

Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/datashard/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ PEERDIR(
ydb/core/formats
ydb/core/io_formats/ydb_dump
ydb/core/kqp/runtime
ydb/core/persqueue/partition_key_range
ydb/core/persqueue/writer
ydb/core/protos
ydb/core/tablet
Expand Down
21 changes: 8 additions & 13 deletions ydb/core/tx/replication/service/table_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/change_exchange/change_sender_common_ops.h>
#include <ydb/core/change_exchange/util.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/datashard/datashard.h>
Expand Down Expand Up @@ -278,16 +279,6 @@ class TLocalTableWriter
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogCritAndLeave, entry, expected);
}

static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(::Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId);
}

return result;
}

void Registered(TActorSystem*, const TActorId&) override {
this->ChangeServer = this->SelfId();
}
Expand Down Expand Up @@ -348,6 +339,12 @@ class TLocalTableWriter
return;
}

if (TableVersion && TableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
Y_ABORT_UNLESS(Initialized);
Resolving = false;
return this->CreateSenders();
}

auto schema = MakeIntrusive<TLightweightSchema>();
if (entry.Self && entry.Self->Info.HasVersion()) {
schema->Version = entry.Self->Info.GetVersion().GetTableSchemaVersion();
Expand Down Expand Up @@ -415,11 +412,9 @@ class TLocalTableWriter
return LogWarnAndRetry("Empty partitions");
}

const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion;
TableVersion = entry.GeneralVersion;

KeyDesc = std::move(entry.KeyDescription);
this->CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
this->CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions()));

if (!Initialized) {
this->Send(Worker, new TEvWorker::TEvHandshake());
Expand Down
Loading

0 comments on commit fa8b876

Please sign in to comment.