Skip to content

Commit

Permalink
Merge 9c64c73 into b2b16e8
Browse files Browse the repository at this point in the history
  • Loading branch information
Enjection authored Sep 22, 2024
2 parents b2b16e8 + 9c64c73 commit ad2b318
Show file tree
Hide file tree
Showing 9 changed files with 644 additions and 305 deletions.
2 changes: 2 additions & 0 deletions ydb/core/change_exchange/change_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
namespace NKikimr::NChangeExchange {

void TChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId) {
++UninitSenders;
auto res = senders.emplace(partitionId, TSender{});
Y_ABORT_UNLESS(res.second);

Expand All @@ -27,6 +28,7 @@ void TChangeSender::RegisterSender(ui64 partitionId) {

Y_ABORT_UNLESS(!sender.ActorId);
sender.ActorId = ActorOps->RegisterWithSameMailbox(SenderFactory->CreateSender(partitionId));
--UninitSenders;
}

void TChangeSender::CreateMissingSenders(const TVector<ui64>& partitionIds) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/change_exchange/change_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ class TChangeSender {
return ReadySenders == Senders.size();
}

inline bool IsAllSendersReadyOrUninit() {
return ReadySenders + UninitSenders == Senders.size();
}

void SetPartitionResolver(IPartitionResolverVisitor* partitionResolver) {
PartitionResolver.Reset(partitionResolver);
}
Expand All @@ -194,6 +198,7 @@ class TChangeSender {

THashMap<ui64, TSender> Senders; // ui64 is partition id
ui64 ReadySenders = 0;
ui64 UninitSenders = 0;
TSet<TEnqueuedRecord> Enqueued;
TSet<TIncompleteRecord> PendingBody;
TMap<ui64, IChangeRecord::TPtr> PendingSent; // ui64 is order
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/change_exchange_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NDataShard {

IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId);
IActor* CreateCdcStreamChangeSender(const TDataShardId& dataShard, const TPathId& streamPathId);
IActor* CreateIncrRestoreChangeSender(const TActorId& changeServerActor, const TTableId& userTableId, const TPathId& restoreTargetPathId);

} // NDataShard
} // NKikimr
304 changes: 1 addition & 303 deletions ydb/core/tx/datashard/change_sender_async_index.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#include "change_exchange.h"
#include "change_exchange_impl.h"
#include "change_record.h"
#include "change_sender_base.h"
#include "datashard_impl.h"

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/change_exchange/change_sender.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
Expand All @@ -21,7 +20,6 @@

namespace NKikimr::NDataShard {

using namespace NTable;
using ESenderType = TEvChangeExchange::ESenderType;

class TBaseChangeSenderShard: public TActorBootstrapped<TBaseChangeSenderShard> {
Expand Down Expand Up @@ -343,29 +341,6 @@ class TBaseChangeSenderShard: public TActorBootstrapped<TBaseChangeSenderShard>

}; // TBaseChangeSenderShard

#define DEFINE_STATE_INTRO \
public: \
struct TStateTag {}; \
private: \
const TDerived* AsDerived() const { \
return static_cast<const TDerived*>(this); \
} \
TDerived* AsDerived() { \
return static_cast<TDerived*>(this); \
} \
TStringBuf GetLogPrefix() const { \
return AsDerived()->GetLogPrefix(); \
}

#define USE_STATE(STATE) \
friend class T ## STATE ## State; \
STATEFN(State ## STATE) { \
return T ## STATE ## State::State(ev); \
} \
bool Is ## STATE ## State() const { \
return CurrentStateFunc() == static_cast<TReceiveFunc>(&TThis::State ## STATE); \
}

template <typename TDerived>
class TResolveIndexState
: virtual public NSchemeCache::TSchemeCacheHelpers
Expand Down Expand Up @@ -435,283 +410,6 @@ class TResolveIndexState
}
};

template <typename TDerived>
class TResolveUserTableState
: virtual public NSchemeCache::TSchemeCacheHelpers
{
DEFINE_STATE_INTRO;

public:
void ResolveUserTable() {
auto request = MakeHolder<TNavigate>();
request->ResultSet.emplace_back(MakeNavigateEntry(AsDerived()->UserTableId, TNavigate::OpTable));

AsDerived()->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
AsDerived()->Become(&TDerived::StateResolveUserTable);
}

STATEFN(State) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
sFunc(TEvents::TEvWakeup, ResolveUserTable);
default:
return AsDerived()->StateBase(ev);
}
}

private:
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const auto& result = ev->Get()->Request;

LOG_D("HandleUserTable TEvTxProxySchemeCache::TEvNavigateKeySetResult"
<< ": result# " << (result ? result->ToString(*AppData()->TypeRegistry) : "nullptr"));

if (!AsDerived()->CheckNotEmpty(result)) {
return;
}

if (!AsDerived()->CheckEntriesCount(result, 1)) {
return;
}

const auto& entry = result->ResultSet.at(0);

if (!AsDerived()->CheckTableId(entry, AsDerived()->UserTableId)) {
return;
}

if (!AsDerived()->CheckEntrySucceeded(entry)) {
return;
}

if (!AsDerived()->CheckEntryKind(entry, TNavigate::KindTable)) {
return;
}

for (const auto& [tag, column] : entry.Columns) {
Y_DEBUG_ABORT_UNLESS(!AsDerived()->MainColumnToTag.contains(column.Name));
AsDerived()->MainColumnToTag.emplace(column.Name, tag);
}

AsDerived()->NextState(TStateTag{});
}
};

template <typename TDerived>
class TResolveTargetTableState
: virtual public NSchemeCache::TSchemeCacheHelpers
{
DEFINE_STATE_INTRO;

public:
void ResolveTargetTable() {
auto request = MakeHolder<TNavigate>();
request->ResultSet.emplace_back(MakeNavigateEntry(AsDerived()->TargetTablePathId, TNavigate::OpTable));

AsDerived()->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
AsDerived()->Become(&TDerived::StateResolveTargetTable);
}

STATEFN(State) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
sFunc(TEvents::TEvWakeup, OnRetry);
default:
return AsDerived()->StateBase(ev);
}
}

private:
void OnRetry() {
AsDerived()->OnRetry(TStateTag{});
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const auto& result = ev->Get()->Request;

LOG_D("HandleTargetTable TEvTxProxySchemeCache::TEvNavigateKeySetResult"
<< ": result# " << (result ? result->ToString(*AppData()->TypeRegistry) : "nullptr"));

if (!AsDerived()->CheckNotEmpty(result)) {
return;
}

if (!AsDerived()->CheckEntriesCount(result, 1)) {
return;
}

const auto& entry = result->ResultSet.at(0);

if (!AsDerived()->CheckTableId(entry, AsDerived()->TargetTablePathId)) {
return;
}

if (!AsDerived()->CheckEntrySucceeded(entry)) {
return;
}

if (!AsDerived()->CheckEntryKind(entry, TNavigate::KindTable)) {
return;
}

AsDerived()->TagMap.clear();
TVector<NScheme::TTypeInfo> keyColumnTypes;

for (const auto& [tag, column] : entry.Columns) {
auto it = AsDerived()->MainColumnToTag.find(column.Name);
Y_ABORT_UNLESS(it != AsDerived()->MainColumnToTag.end());

Y_DEBUG_ABORT_UNLESS(!AsDerived()->TagMap.contains(it->second));
AsDerived()->TagMap.emplace(it->second, tag);

if (column.KeyOrder < 0) {
continue;
}

if (keyColumnTypes.size() <= static_cast<ui32>(column.KeyOrder)) {
keyColumnTypes.resize(column.KeyOrder + 1);
}

keyColumnTypes[column.KeyOrder] = column.PType;
}

AsDerived()->KeyDesc = MakeHolder<TKeyDesc>(
entry.TableId,
AsDerived()->GetFullRange(keyColumnTypes.size()).ToTableRange(),
TKeyDesc::ERowOperation::Update,
keyColumnTypes,
TVector<TKeyDesc::TColumnOp>()
);

AsDerived()->SetPartitionResolver(CreateDefaultPartitionResolver(*AsDerived()->KeyDesc.Get()));

AsDerived()->NextState(TStateTag{});
}
};

template <typename TDerived>
class TResolveKeysState
: virtual public NSchemeCache::TSchemeCacheHelpers
{
DEFINE_STATE_INTRO;

public:
void ResolveKeys() {
auto request = MakeHolder<TResolve>();
request->ResultSet.emplace_back(std::move(AsDerived()->KeyDesc));

AsDerived()->Send(MakeSchemeCacheID(), new TEvResolve(request.Release()));
AsDerived()->Become(&TDerived::StateResolveKeys);
}

STATEFN(State) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
sFunc(TEvents::TEvWakeup, OnRetry);
default:
return AsDerived()->StateBase(ev);
}
}

private:
void OnRetry() {
AsDerived()->OnRetry(TStateTag{});
}

void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
const auto& result = ev->Get()->Request;

LOG_D("HandleKeys TEvTxProxySchemeCache::TEvResolveKeySetResult"
<< ": result# " << (result ? result->ToString(*AppData()->TypeRegistry) : "nullptr"));

if (!AsDerived()->CheckNotEmpty(result)) {
return;
}

if (!AsDerived()->CheckEntriesCount(result, 1)) {
return;
}

auto& entry = result->ResultSet.at(0);

if (!AsDerived()->CheckTableId(entry, AsDerived()->TargetTablePathId)) {
return;
}

if (!AsDerived()->CheckEntrySucceeded(entry)) {
return;
}

if (!entry.KeyDescription->GetPartitions()) {
LOG_W("Empty partitions list"
<< ": entry# " << entry.ToString(*AppData()->TypeRegistry));
return AsDerived()->Retry();
}

const bool versionChanged = !AsDerived()->TargetTableVersion || AsDerived()->TargetTableVersion != entry.GeneralVersion;
AsDerived()->TargetTableVersion = entry.GeneralVersion;

AsDerived()->KeyDesc = std::move(entry.KeyDescription);
AsDerived()->CreateSenders(MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()), versionChanged);

AsDerived()->NextState(TStateTag{});
}

static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
TVector<ui64> result(Reserve(partitions.size()));

for (const auto& partition : partitions) {
result.push_back(partition.ShardId); // partition = shard
}

return result;
}
};

template <typename TDerived>
struct TSchemeChecksMixin
: virtual private NSchemeCache::TSchemeCacheHelpers
{
const TDerived* AsDerived() const {
return static_cast<const TDerived*>(this);
}

TDerived* AsDerived() {
return static_cast<TDerived*>(this);
}

template <typename CheckFunc, typename FailFunc, typename T, typename... Args>
bool Check(CheckFunc checkFunc, FailFunc failFunc, const T& subject, Args&&... args) {
return checkFunc(AsDerived()->CurrentStateName(), subject, std::forward<Args>(args)..., std::bind(failFunc, AsDerived(), std::placeholders::_1));
}

template <typename T>
bool CheckNotEmpty(const TAutoPtr<T>& result) {
return Check(&TSchemeCacheHelpers::CheckNotEmpty<T>, &TDerived::LogCritAndRetry, result);
}

template <typename T>
bool CheckEntriesCount(const TAutoPtr<T>& result, ui32 expected) {
return Check(&TSchemeCacheHelpers::CheckEntriesCount<T>, &TDerived::LogCritAndRetry, result, expected);
}

template <typename T>
bool CheckTableId(const T& entry, const TTableId& expected) {
return Check(&TSchemeCacheHelpers::CheckTableId<T>, &TDerived::LogCritAndRetry, entry, expected);
}

template <typename T>
bool CheckEntrySucceeded(const T& entry) {
return Check(&TSchemeCacheHelpers::CheckEntrySucceeded<T>, &TDerived::LogWarnAndRetry, entry);
}

template <typename T>
bool CheckEntryKind(const T& entry, TNavigate::EKind expected) {
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TDerived::LogWarnAndRetry, entry, expected);
}

};

class TAsyncIndexChangeSenderMain
: public TActorBootstrapped<TAsyncIndexChangeSenderMain>
, public NChangeExchange::TChangeSender
Expand Down
Loading

0 comments on commit ad2b318

Please sign in to comment.