Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make change sender identity explicit #8995

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ class ISenderFactory {
virtual IActor* CreateSender(ui64 partitionId) const = 0;
};

class IChangeSenderIdentity {
public:
virtual ~IChangeSenderIdentity() = default;
virtual TPathId GetChangeSenderIdentity() const = 0;
};

template <typename TChangeRecord>
class TBaseChangeSender {
using TIncompleteRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
Expand Down Expand Up @@ -470,8 +476,8 @@ class TBaseChangeSender {

void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
for (auto& record : records) {
Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id"
<< ": expected# " << PathId
Y_VERIFY_S(Identity->GetChangeSenderIdentity() == record.PathId, "Unexpected record's path id"
<< ": expected# " << Identity->GetChangeSenderIdentity()
<< ", got# " << record.PathId);
Enqueued.emplace(record.Order, record.BodySize);
}
Expand Down Expand Up @@ -561,15 +567,15 @@ class TBaseChangeSender {

explicit TBaseChangeSender(
IActorOps* const actorOps,
IChangeSenderIdentity* const identity,
IChangeSenderResolver* const resolver,
ISenderFactory* const senderFactory,
const TActorId changeServer,
const TPathId& pathId)
const TActorId changeServer)
: ActorOps(actorOps)
, Identity(identity)
, Resolver(resolver)
, SenderFactory(senderFactory)
, ChangeServer(changeServer)
, PathId(pathId)
, MemLimit(192_KB)
, MemUsage(0)
{}
Expand All @@ -585,7 +591,7 @@ class TBaseChangeSender {
ctx.Send(ev->Forward(to));
} else {
ActorOps->Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(TStringBuilder()
<< "Change sender '" << PathId << ":" << partitionId << "' is not running"));
<< "Change sender '" << Identity->GetChangeSenderIdentity() << ":" << partitionId << "' is not running"));
}
} else {
ActorOps->Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND));
Expand Down Expand Up @@ -635,7 +641,7 @@ class TBaseChangeSender {
TABLED() { html << sender.Pending.size(); }
TABLED() { html << sender.Prepared.size(); }
TABLED() { html << sender.Broadcasting.size(); }
TABLED() { ActorLink(html, tabletId, PathId, partitionId); }
TABLED() { ActorLink(html, tabletId, Identity->GetChangeSenderIdentity(), partitionId); }
}
}
}
Expand Down Expand Up @@ -763,13 +769,13 @@ class TBaseChangeSender {

private:
IActorOps* const ActorOps;
IChangeSenderIdentity* const Identity;
IChangeSenderResolver* const Resolver;
ISenderFactory* const SenderFactory;
THolder<IChangeSenderPartitioner<TChangeRecord>> Partitioner;

protected:
TActorId ChangeServer;
const TPathId PathId;

private:
const ui64 MemLimit;
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/tx/datashard/change_sender_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped<TAsyncIndexChangeS

class TAsyncIndexChangeSenderMain
: public TActorBootstrapped<TAsyncIndexChangeSenderMain>
, public NChangeExchange::IChangeSenderIdentity
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
Enjection marked this conversation as resolved.
Show resolved Hide resolved
, public NChangeExchange::IChangeSenderResolver
, public NChangeExchange::ISenderFactory
Expand Down Expand Up @@ -502,7 +503,7 @@ class TAsyncIndexChangeSenderMain

void ResolveIndex() {
auto request = MakeHolder<TNavigate>();
request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList));
request->ResultSet.emplace_back(MakeNavigateEntry(IndexPathId, TNavigate::OpList));

Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
Become(&TThis::StateResolveIndex);
Expand Down Expand Up @@ -533,7 +534,7 @@ class TAsyncIndexChangeSenderMain

const auto& entry = result->ResultSet.at(0);

if (!CheckTableId(entry, PathId)) {
if (!CheckTableId(entry, IndexPathId)) {
return;
}

Expand Down Expand Up @@ -746,7 +747,7 @@ class TAsyncIndexChangeSenderMain

void Handle(TEvChangeExchange::TEvRemoveSender::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
Y_ABORT_UNLESS(ev->Get()->PathId == PathId);
Y_ABORT_UNLESS(ev->Get()->PathId == GetChangeSenderIdentity());
Enjection marked this conversation as resolved.
Show resolved Hide resolved

RemoveRecords();
PassAway();
Expand All @@ -773,7 +774,8 @@ class TAsyncIndexChangeSenderMain

explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId)
: TActorBootstrapped()
, TBaseChangeSender(this, this, this, dataShard.ActorId, indexPathId)
, TBaseChangeSender(this, this, this, this, dataShard.ActorId)
, IndexPathId(indexPathId)
, DataShard(dataShard)
, UserTableId(userTableId)
, IndexTableVersion(0)
Expand Down Expand Up @@ -806,7 +808,12 @@ class TAsyncIndexChangeSenderMain
}
}

TPathId GetChangeSenderIdentity() const override final {
return IndexPathId;
}

private:
const TPathId IndexPathId;
const TDataShardId DataShard;
const TTableId UserTableId;
mutable TMaybe<TString> LogPrefix;
Expand All @@ -817,7 +824,6 @@ class TAsyncIndexChangeSenderMain
TPathId IndexTablePathId;
ui64 IndexTableVersion;
THolder<TKeyDesc> KeyDesc;

}; // TAsyncIndexChangeSenderMain

IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) {
Expand Down
15 changes: 11 additions & 4 deletions ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ class TBoundaryPartitioner final : public NChangeExchange::IChangeSenderPartitio

class TCdcChangeSenderMain
: public TActorBootstrapped<TCdcChangeSenderMain>
, public NChangeExchange::IChangeSenderIdentity
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
Enjection marked this conversation as resolved.
Show resolved Hide resolved
, public NChangeExchange::IChangeSenderResolver
, public NChangeExchange::ISenderFactory
Expand Down Expand Up @@ -474,7 +475,7 @@ class TCdcChangeSenderMain

void ResolveCdcStream() {
auto request = MakeHolder<TNavigate>();
request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList));
request->ResultSet.emplace_back(MakeNavigateEntry(StreamPathId, TNavigate::OpList));

Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
Become(&TThis::StateResolveCdcStream);
Expand Down Expand Up @@ -505,7 +506,7 @@ class TCdcChangeSenderMain

const auto& entry = result->ResultSet.at(0);

if (!CheckTableId(entry, PathId)) {
if (!CheckTableId(entry, StreamPathId)) {
return;
}

Expand Down Expand Up @@ -718,7 +719,7 @@ class TCdcChangeSenderMain

void Handle(TEvChangeExchange::TEvRemoveSender::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
Y_ABORT_UNLESS(ev->Get()->PathId == PathId);
Y_ABORT_UNLESS(ev->Get()->PathId == GetChangeSenderIdentity());
Enjection marked this conversation as resolved.
Show resolved Hide resolved

RemoveRecords();
PassAway();
Expand All @@ -745,7 +746,8 @@ class TCdcChangeSenderMain

explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId)
: TActorBootstrapped()
, TBaseChangeSender(this, this, this, dataShard.ActorId, streamPathId)
, TBaseChangeSender(this, this, this, this, dataShard.ActorId)
, StreamPathId(streamPathId)
, DataShard(dataShard)
, TopicVersion(0)
{
Expand Down Expand Up @@ -777,7 +779,12 @@ class TCdcChangeSenderMain
}
}

TPathId GetChangeSenderIdentity() const override final {
return StreamPathId;
}

private:
const TPathId StreamPathId;
const TDataShardId DataShard;
mutable TMaybe<TString> LogPrefix;

Expand Down
21 changes: 14 additions & 7 deletions ydb/core/tx/replication/service/table_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh
template <typename TChangeRecord>
class TLocalTableWriter
: public TActor<TLocalTableWriter<TChangeRecord>>
, public NChangeExchange::IChangeSenderIdentity
, public NChangeExchange::TBaseChangeSender<TChangeRecord>
Enjection marked this conversation as resolved.
Show resolved Hide resolved
, public NChangeExchange::IChangeSenderResolver
, public NChangeExchange::ISenderFactory
Expand All @@ -227,7 +228,7 @@ class TLocalTableWriter
if (!LogPrefix) {
LogPrefix = TStringBuilder()
<< "[LocalTableWriter]"
<< this->PathId
<< TablePathId
<< TBase::SelfId() << " ";
}

Expand Down Expand Up @@ -318,7 +319,7 @@ class TLocalTableWriter
Resolving = true;

auto request = MakeHolder<TNavigate>();
request->ResultSet.emplace_back(MakeNavigateEntry(this->PathId, TNavigate::OpTable));
request->ResultSet.emplace_back(MakeNavigateEntry(TablePathId, TNavigate::OpTable));
this->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
}

Expand All @@ -338,7 +339,7 @@ class TLocalTableWriter

const auto& entry = result->ResultSet.at(0);

if (!CheckTableId(entry, this->PathId)) {
if (!CheckTableId(entry, TablePathId)) {
return;
}

Expand Down Expand Up @@ -407,7 +408,7 @@ class TLocalTableWriter

auto& entry = result->ResultSet.at(0);

if (!CheckTableId(entry, this->PathId)) {
if (!CheckTableId(entry, TablePathId)) {
return;
}

Expand Down Expand Up @@ -437,7 +438,7 @@ class TLocalTableWriter
return new TTablePartitionWriter<TChangeRecord>(
this->SelfId(),
partitionId,
TTableId(this->PathId, Schema->Version),
TTableId(TablePathId, Schema->Version),
BuilderContext);
}

Expand All @@ -448,7 +449,7 @@ class TLocalTableWriter
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records(::Reserve(ev->Get()->Records.size()));

for (auto& record : ev->Get()->Records) {
records.emplace_back(record.Offset, this->PathId, record.Data.size());
records.emplace_back(record.Offset, TablePathId, record.Data.size());
auto res = PendingRecords.emplace(record.Offset, TChangeRecordBuilderTrait<TChangeRecord>()
.WithSourceId(ev->Get()->Source)
.WithOrder(record.Offset)
Expand Down Expand Up @@ -528,11 +529,16 @@ class TLocalTableWriter
template <class... TArgs>
explicit TLocalTableWriter(const TPathId& tablePathId, TArgs&&... args)
: TBase(&TThis::StateWork)
, TBaseSender(this, this, this, TActorId(), tablePathId)
, TBaseSender(this, this, this, this, TActorId())
, TablePathId(tablePathId)
, BuilderContext(std::forward<TArgs>(args)...)
{
}

TPathId GetChangeSenderIdentity() const override final {
return TablePathId;
}

STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
Expand All @@ -550,6 +556,7 @@ class TLocalTableWriter

private:
mutable TMaybe<TString> LogPrefix;
const TPathId TablePathId;
TChangeRecordBuilderContextTrait<TChangeRecord> BuilderContext;

TActorId Worker;
Expand Down
Loading