Skip to content

Commit

Permalink
Better handling of connection loss (#9993)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Oct 3, 2024
1 parent 3a91d72 commit 14faff2
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 46 deletions.
20 changes: 15 additions & 5 deletions ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,20 +413,30 @@ class TBaseChangeSender {
}

TActorId GetChangeServer() const { return ChangeServer; }
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) {
if (partitioningChanged) {

private:
void CreateSendersImpl(const TVector<ui64>& partitionIds) {
if (partitionIds) {
CreateMissingSenders(partitionIds);
} else {
RecreateSenders(GonePartitions);
RecreateSenders(std::exchange(GonePartitions, {}));
}

GonePartitions.clear();

if (!Enqueued || !RequestRecords()) {
SendRecords();
}
}

protected:
void CreateSenders(const TVector<ui64>& partitionIds) {
Y_ABORT_UNLESS(partitionIds);
CreateSendersImpl(partitionIds);
}

void CreateSenders() {
CreateSendersImpl({});
}

void KillSenders() {
for (const auto& [_, sender] : std::exchange(Senders, {})) {
if (sender.ActorId) {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/change_exchange/util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include "util.h"

namespace NKikimr::NChangeExchange {

TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(::Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId);
}

return result;
}

}
9 changes: 9 additions & 0 deletions ydb/core/change_exchange/util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <ydb/core/scheme/scheme_tabledefs.h>

namespace NKikimr::NChangeExchange {

TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions);

}
1 change: 1 addition & 0 deletions ydb/core/change_exchange/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ SRCS(
change_exchange.cpp
change_record.cpp
change_sender_monitoring.cpp
util.cpp
)

GENERATE_ENUM_SERIALIZATION(change_record.h)
Expand Down
20 changes: 7 additions & 13 deletions ydb/core/tx/datashard/change_sender_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/change_exchange/change_sender_common_ops.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/change_exchange/util.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
Expand Down Expand Up @@ -435,16 +436,6 @@ class TAsyncIndexChangeSenderMain
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogWarnAndRetry, entry, expected);
}

static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId); // partition = shard
}

return result;
}

/// ResolveUserTable

void ResolveUserTable() {
Expand Down Expand Up @@ -611,6 +602,11 @@ class TAsyncIndexChangeSenderMain
return;
}

if (IndexTableVersion && IndexTableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
CreateSenders();
return Become(&TThis::StateMain);
}

TagMap.clear();
TVector<NScheme::TTypeInfo> keyColumnTypes;

Expand Down Expand Up @@ -692,11 +688,9 @@ class TAsyncIndexChangeSenderMain
return Retry();
}

const bool versionChanged = !IndexTableVersion || IndexTableVersion != entry.GeneralVersion;
IndexTableVersion = entry.GeneralVersion;

KeyDesc = std::move(entry.KeyDescription);
CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions()));

Become(&TThis::StateMain);
}
Expand Down
25 changes: 10 additions & 15 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <ydb/core/change_exchange/change_sender_common_ops.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/change_exchange/util.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
Expand Down Expand Up @@ -390,16 +391,6 @@ class TCdcChangeSenderMain
return false;
}

static TVector<ui64> MakePartitionIds(const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId);
}

return result;
}

/// ResolveCdcStream

void ResolveCdcStream() {
Expand Down Expand Up @@ -521,6 +512,14 @@ class TCdcChangeSenderMain
return;
}

const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
if (TopicVersion && TopicVersion == topicVersion) {
CreateSenders();
return Become(&TThis::StateMain);
}

TopicVersion = topicVersion;

const auto& pqDesc = entry.PQGroupInfo->Description;
const auto& pqConfig = pqDesc.GetPQTabletConfig();

Expand All @@ -529,16 +528,12 @@ class TCdcChangeSenderMain
PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId());
}

const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
TopicVersion = topicVersion;

Y_ABORT_UNLESS(entry.PQGroupInfo->Schema);
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema);
Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning);
KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(entry.PQGroupInfo->Partitioning);

CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged);
CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning));
Become(&TThis::StateMain);
}

Expand Down
21 changes: 8 additions & 13 deletions ydb/core/tx/replication/service/table_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/change_exchange/change_sender_common_ops.h>
#include <ydb/core/change_exchange/util.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/datashard/datashard.h>
Expand Down Expand Up @@ -278,16 +279,6 @@ class TLocalTableWriter
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogCritAndLeave, entry, expected);
}

static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(::Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId);
}

return result;
}

void Registered(TActorSystem*, const TActorId&) override {
this->ChangeServer = this->SelfId();
}
Expand Down Expand Up @@ -348,6 +339,12 @@ class TLocalTableWriter
return;
}

if (TableVersion && TableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
Y_ABORT_UNLESS(Initialized);
Resolving = false;
return CreateSenders();
}

auto schema = MakeIntrusive<TLightweightSchema>();
if (entry.Self && entry.Self->Info.HasVersion()) {
schema->Version = entry.Self->Info.GetVersion().GetTableSchemaVersion();
Expand Down Expand Up @@ -415,11 +412,9 @@ class TLocalTableWriter
return LogWarnAndRetry("Empty partitions");
}

const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion;
TableVersion = entry.GeneralVersion;

KeyDesc = std::move(entry.KeyDescription);
this->CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
this->CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions()));

if (!Initialized) {
this->Send(Worker, new TEvWorker::TEvHandshake());
Expand Down

0 comments on commit 14faff2

Please sign in to comment.