Skip to content

Commit

Permalink
Merge f7bd921 into 805fb4b
Browse files Browse the repository at this point in the history
  • Loading branch information
Enjection authored Sep 9, 2024
2 parents 805fb4b + f7bd921 commit 21d6fd5
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 24 deletions.
22 changes: 14 additions & 8 deletions ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ class ISenderFactory {
virtual IActor* CreateSender(ui64 partitionId) const = 0;
};

class IChangeSenderIdentity {
public:
virtual ~IChangeSenderIdentity() = default;
virtual TPathId GetChangeSenderIdentity() const = 0;
};

template <typename TChangeRecord>
class TBaseChangeSender {
using TIncompleteRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
Expand Down Expand Up @@ -470,8 +476,8 @@ class TBaseChangeSender {

void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
for (auto& record : records) {
Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id"
<< ": expected# " << PathId
Y_VERIFY_S(Identity->GetChangeSenderIdentity() == record.PathId, "Unexpected record's path id"
<< ": expected# " << Identity->GetChangeSenderIdentity()
<< ", got# " << record.PathId);
Enqueued.emplace(record.Order, record.BodySize);
}
Expand Down Expand Up @@ -561,15 +567,15 @@ class TBaseChangeSender {

explicit TBaseChangeSender(
IActorOps* const actorOps,
IChangeSenderIdentity* const identity,
IChangeSenderResolver* const resolver,
ISenderFactory* const senderFactory,
const TActorId changeServer,
const TPathId& pathId)
const TActorId changeServer)
: ActorOps(actorOps)
, Identity(identity)
, Resolver(resolver)
, SenderFactory(senderFactory)
, ChangeServer(changeServer)
, PathId(pathId)
, MemLimit(192_KB)
, MemUsage(0)
{}
Expand All @@ -585,7 +591,7 @@ class TBaseChangeSender {
ctx.Send(ev->Forward(to));
} else {
ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(TStringBuilder()
<< "Change sender '" << PathId << ":" << partitionId << "' is not running"));
<< "Change sender '" << Identity->GetChangeSenderIdentity() << ":" << partitionId << "' is not running"));
}
} else {
ActorOps->Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND));
Expand Down Expand Up @@ -635,7 +641,7 @@ class TBaseChangeSender {
TABLED() { html << sender.Pending.size(); }
TABLED() { html << sender.Prepared.size(); }
TABLED() { html << sender.Broadcasting.size(); }
TABLED() { ActorLink(html, tabletId, PathId, partitionId); }
TABLED() { ActorLink(html, tabletId, Identity->GetChangeSenderIdentity(), partitionId); }
}
}
}
Expand Down Expand Up @@ -763,13 +769,13 @@ class TBaseChangeSender {

private:
IActorOps* const ActorOps;
IChangeSenderIdentity* const Identity;
IChangeSenderResolver* const Resolver;
ISenderFactory* const SenderFactory;
THolder<IChangeSenderPartitioner<TChangeRecord>> Partitioner;

protected:
TActorId ChangeServer;
const TPathId PathId;

private:
const ui64 MemLimit;
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/tx/datashard/change_sender_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS

class TAsyncIndexChangeSenderMain
: public TActorBootstrapped<TAsyncIndexChangeSenderMain>
, public NChangeExchange::IChangeSenderIdentity
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
, public NChangeExchange::IChangeSenderResolver
, public NChangeExchange::ISenderFactory
Expand Down Expand Up @@ -502,7 +503,7 @@ class TAsyncIndexChangeSenderMain

void ResolveIndex() {
auto request = MakeHolder<TNavigate>();
request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList));
request->ResultSet.emplace_back(MakeNavigateEntry(IndexPathId, TNavigate::OpList));

Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
Become(&TThis::StateResolveIndex);
Expand Down Expand Up @@ -533,7 +534,7 @@ class TAsyncIndexChangeSenderMain

const auto& entry = result->ResultSet.at(0);

if (!CheckTableId(entry, PathId)) {
if (!CheckTableId(entry, IndexPathId)) {
return;
}

Expand Down Expand Up @@ -746,7 +747,7 @@ class TAsyncIndexChangeSenderMain

void Handle(TEvChangeExchange::TEvRemoveSender::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
Y_ABORT_UNLESS(ev->Get()->PathId == PathId);
Y_ABORT_UNLESS(ev->Get()->PathId == GetChangeSenderIdentity());

RemoveRecords();
PassAway();
Expand All @@ -773,7 +774,8 @@ class TAsyncIndexChangeSenderMain

explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId)
: TActorBootstrapped()
, TBaseChangeSender(this, this, this, dataShard.ActorId, indexPathId)
, TBaseChangeSender(this, this, this, this, dataShard.ActorId)
, IndexPathId(indexPathId)
, DataShard(dataShard)
, UserTableId(userTableId)
, IndexTableVersion(0)
Expand Down Expand Up @@ -806,7 +808,12 @@ class TAsyncIndexChangeSenderMain
}
}

TPathId GetChangeSenderIdentity() const override final {
return IndexPathId;
}

private:
const TPathId IndexPathId;
const TDataShardId DataShard;
const TTableId UserTableId;
mutable TMaybe<TString> LogPrefix;
Expand All @@ -817,7 +824,6 @@ class TAsyncIndexChangeSenderMain
TPathId IndexTablePathId;
ui64 IndexTableVersion;
THolder<TKeyDesc> KeyDesc;

}; // TAsyncIndexChangeSenderMain

IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) {
Expand Down
15 changes: 11 additions & 4 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ class TBoundaryPartitioner final : public NChangeExchange::IChangeSenderPartitio

class TCdcChangeSenderMain
: public TActorBootstrapped<TCdcChangeSenderMain>
, public NChangeExchange::IChangeSenderIdentity
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
, public NChangeExchange::IChangeSenderResolver
, public NChangeExchange::ISenderFactory
Expand Down Expand Up @@ -474,7 +475,7 @@ class TCdcChangeSenderMain

void ResolveCdcStream() {
auto request = MakeHolder<TNavigate>();
request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList));
request->ResultSet.emplace_back(MakeNavigateEntry(StreamPathId, TNavigate::OpList));

Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
Become(&TThis::StateResolveCdcStream);
Expand Down Expand Up @@ -505,7 +506,7 @@ class TCdcChangeSenderMain

const auto& entry = result->ResultSet.at(0);

if (!CheckTableId(entry, PathId)) {
if (!CheckTableId(entry, StreamPathId)) {
return;
}

Expand Down Expand Up @@ -718,7 +719,7 @@ class TCdcChangeSenderMain

void Handle(TEvChangeExchange::TEvRemoveSender::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
Y_ABORT_UNLESS(ev->Get()->PathId == PathId);
Y_ABORT_UNLESS(ev->Get()->PathId == GetChangeSenderIdentity());

RemoveRecords();
PassAway();
Expand All @@ -745,7 +746,8 @@ class TCdcChangeSenderMain

explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId)
: TActorBootstrapped()
, TBaseChangeSender(this, this, this, dataShard.ActorId, streamPathId)
, TBaseChangeSender(this, this, this, this, dataShard.ActorId)
, StreamPathId(streamPathId)
, DataShard(dataShard)
, TopicVersion(0)
{
Expand Down Expand Up @@ -777,7 +779,12 @@ class TCdcChangeSenderMain
}
}

TPathId GetChangeSenderIdentity() const override final {
return StreamPathId;
}

private:
const TPathId StreamPathId;
const TDataShardId DataShard;
mutable TMaybe<TString> LogPrefix;

Expand Down
21 changes: 14 additions & 7 deletions ydb/core/tx/replication/service/table_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh
template <typename TChangeRecord>
class TLocalTableWriter
: public TActor<TLocalTableWriter<TChangeRecord>>
, public NChangeExchange::IChangeSenderIdentity
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
, public NChangeExchange::IChangeSenderResolver
, public NChangeExchange::ISenderFactory
Expand All @@ -227,7 +228,7 @@ class TLocalTableWriter
if (!LogPrefix) {
LogPrefix = TStringBuilder()
<< "[LocalTableWriter]"
<< this->PathId
<< TablePathId
<< TBase::SelfId() << " ";
}

Expand Down Expand Up @@ -318,7 +319,7 @@ class TLocalTableWriter
Resolving = true;

auto request = MakeHolder<TNavigate>();
request->ResultSet.emplace_back(MakeNavigateEntry(this->PathId, TNavigate::OpTable));
request->ResultSet.emplace_back(MakeNavigateEntry(TablePathId, TNavigate::OpTable));
this->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
}

Expand All @@ -338,7 +339,7 @@ class TLocalTableWriter

const auto& entry = result->ResultSet.at(0);

if (!CheckTableId(entry, this->PathId)) {
if (!CheckTableId(entry, TablePathId)) {
return;
}

Expand Down Expand Up @@ -407,7 +408,7 @@ class TLocalTableWriter

auto& entry = result->ResultSet.at(0);

if (!CheckTableId(entry, this->PathId)) {
if (!CheckTableId(entry, TablePathId)) {
return;
}

Expand Down Expand Up @@ -437,7 +438,7 @@ class TLocalTableWriter
return new TTablePartitionWriter<TChangeRecord>(
this->SelfId(),
partitionId,
TTableId(this->PathId, Schema->Version),
TTableId(TablePathId, Schema->Version),
BuilderContext);
}

Expand All @@ -448,7 +449,7 @@ class TLocalTableWriter
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records(::Reserve(ev->Get()->Records.size()));

for (auto& record : ev->Get()->Records) {
records.emplace_back(record.Offset, this->PathId, record.Data.size());
records.emplace_back(record.Offset, TablePathId, record.Data.size());
auto res = PendingRecords.emplace(record.Offset, TChangeRecordBuilderTrait<TChangeRecord>()
.WithSourceId(ev->Get()->Source)
.WithOrder(record.Offset)
Expand Down Expand Up @@ -528,11 +529,16 @@ class TLocalTableWriter
template <class... TArgs>
explicit TLocalTableWriter(const TPathId& tablePathId, TArgs&&... args)
: TBase(&TThis::StateWork)
, TBaseSender(this, this, this, TActorId(), tablePathId)
, TBaseSender(this, this, this, this, TActorId())
, TablePathId(tablePathId)
, BuilderContext(std::forward<TArgs>(args)...)
{
}

TPathId GetChangeSenderIdentity() const override final {
return TablePathId;
}

STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
Expand All @@ -550,6 +556,7 @@ class TLocalTableWriter

private:
mutable TMaybe<TString> LogPrefix;
const TPathId TablePathId;
TChangeRecordBuilderContextTrait<TChangeRecord> BuilderContext;

TActorId Worker;
Expand Down

0 comments on commit 21d6fd5

Please sign in to comment.