Skip to content

Commit

Permalink
Optimize heartbeats emission KIKIMR-20392 (#557)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Dec 19, 2023
1 parent 00eda0e commit c5b9fe2
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 79 deletions.
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui
, NumChannels(numChannels)
, WriteBufferIsFullCounter(nullptr)
, WriteLagMs(TDuration::Minutes(1), 100)
, LastEmittedHeartbeat(TRowVersion::Min())
{
TabletCounters.Populate(Counters);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
TInstant LastUsedStorageMeterTimestamp;

TDeque<std::unique_ptr<IEventBase>> PendingEvents;
TRowVersion LastEmittedHeartbeat;
};

} // namespace NKikimr::NPQ
Expand Down
11 changes: 3 additions & 8 deletions ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ TPartitionSourceManager::TModificationBatch::~TModificationBatch() {
}
}

TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmit() const {
// Returns whatever this batch's HeartbeatEmitter reports from CanEmit():
// a heartbeat when one is available for emission, or an empty TMaybe otherwise.
// Pure delegation; no state of the batch is changed here.
TMaybe<THeartbeat> TPartitionSourceManager::TModificationBatch::CanEmitHeartbeat() const {
return HeartbeatEmitter.CanEmit();
}

Expand Down Expand Up @@ -331,13 +331,8 @@ void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TI
}
}

void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat) {
Batch.HeartbeatEmitter.Process(SourceId, heartbeat);
if (InMemory == MemoryStorage().end()) {
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp, std::move(heartbeat));
} else {
Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp, std::move(heartbeat)));
}
// Hands a heartbeat received for this source id to the batch's heartbeat
// emitter. The heartbeat is moved into the emitter; this overload does not
// touch the source id writer.
void TPartitionSourceManager::TSourceManager::Update(THeartbeat&& heartbeat) {
Batch.HeartbeatEmitter.Process(SourceId, std::move(heartbeat));
}

TPartitionSourceManager::TSourceManager::operator bool() const {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/partition_sourcemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TPartitionSourceManager {
std::optional<ui64> UpdatedSeqNo() const;

void Update(ui64 seqNo, ui64 offset, TInstant timestamp);
void Update(ui64 seqNo, ui64 offset, TInstant timestamp, THeartbeat&& heartbeat);
void Update(THeartbeat&& heartbeat);

operator bool() const;

Expand All @@ -77,7 +77,7 @@ class TPartitionSourceManager {
TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format);
~TModificationBatch();

TMaybe<THeartbeat> CanEmit() const;
TMaybe<THeartbeat> CanEmitHeartbeat() const;
TSourceManager GetSource(const TString& id);

void Cancel();
Expand Down
71 changes: 36 additions & 35 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -910,13 +910,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
<< " version " << *hbVersion
);

auto heartbeat = THeartbeat{
.Version = *hbVersion,
.Data = p.Msg.Data,
};

sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp, std::move(heartbeat));

sourceId.Update(THeartbeat{*hbVersion, p.Msg.Data});
return ProcessResult::Continue;
}

Expand Down Expand Up @@ -1188,6 +1182,41 @@ bool TPartition::AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const
}
}

if (const auto heartbeat = sourceIdBatch.CanEmitHeartbeat()) {
if (heartbeat->Version > LastEmittedHeartbeat) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"Topic '" << TopicName() << "' partition " << Partition
<< " emit heartbeat " << heartbeat->Version
);

auto hbMsg = TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{
.SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)),
.SeqNo = 0, // we don't use SeqNo because we disable deduplication
.PartNo = 0,
.TotalParts = 1,
.TotalSize = static_cast<ui32>(heartbeat->Data.size()),
.CreateTimestamp = CurrentTimestamp.MilliSeconds(),
.ReceiveTimestamp = CurrentTimestamp.MilliSeconds(),
.DisableDeduplication = true,
.WriteTimestamp = CurrentTimestamp.MilliSeconds(),
.Data = heartbeat->Data,
.UncompressedSize = 0,
.PartitionKey = {},
.ExplicitHashKey = {},
.External = false,
.IgnoreQuotaDeadline = true,
.HeartbeatVersion = std::nullopt,
}};

WriteInflightSize += heartbeat->Data.size();
auto result = ProcessRequest(hbMsg, parameters, request, ctx);
Y_ABORT_UNLESS(result == ProcessResult::Continue);

LastEmittedHeartbeat = heartbeat->Version;
}
}

UpdateWriteBufferIsFullState(ctx.Now());

if (!NewHead.Batches.empty() && !NewHead.Batches.back().Packed) {
Expand Down Expand Up @@ -1385,34 +1414,6 @@ bool TPartition::ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, c
}
}

if (const auto heartbeat = sourceIdBatch.CanEmit()) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"Topic '" << TopicName() << "' partition " << Partition
<< " emit heartbeat " << heartbeat->Version
);

EmplaceRequest(TWriteMsg{Max<ui64>() /* cookie */, Nothing(), TEvPQ::TEvWrite::TMsg{
.SourceId = NSourceIdEncoding::EncodeSimple(ToString(TabletID)),
.SeqNo = 0, // we don't use SeqNo because we disable deduplication
.PartNo = 0,
.TotalParts = 1,
.TotalSize = static_cast<ui32>(heartbeat->Data.size()),
.CreateTimestamp = CurrentTimestamp.MilliSeconds(),
.ReceiveTimestamp = CurrentTimestamp.MilliSeconds(),
.DisableDeduplication = true,
.WriteTimestamp = CurrentTimestamp.MilliSeconds(),
.Data = heartbeat->Data,
.UncompressedSize = 0,
.PartitionKey = {},
.ExplicitHashKey = {},
.External = false,
.IgnoreQuotaDeadline = true,
.HeartbeatVersion = std::nullopt,
}}, ctx);
WriteInflightSize += heartbeat->Data.size();
}

if (NewHead.PackedSize == 0) { //nothing added to head - just compaction or tmp part blobs writed
if (!sourceIdBatch.HasModifications()) {
return request->Record.CmdWriteSize() > 0
Expand Down
97 changes: 71 additions & 26 deletions ydb/core/persqueue/sourceid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,6 @@ void FillDelete(ui32 partition, const TString& sourceId, TKeyPrefix::EMark mark,
void FillDelete(ui32 partition, const TString& sourceId, NKikimrClient::TKeyValueRequest::TCmdDeleteRange& cmd) {
FillDelete(partition, sourceId, TKeyPrefix::MarkProtoSourceId, cmd);
}
THeartbeatProcessor::THeartbeatProcessor(
const THashSet<TString>& sourceIdsWithHeartbeat,
const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat)
: SourceIdsWithHeartbeat(sourceIdsWithHeartbeat)
, SourceIdsByHeartbeat(sourceIdsByHeartbeat)
{
}

void THeartbeatProcessor::ApplyHeartbeat(const TString& sourceId, const TRowVersion& version) {
SourceIdsWithHeartbeat.insert(sourceId);
Expand Down Expand Up @@ -501,49 +494,101 @@ void TSourceIdWriter::FillRequest(TEvKeyValue::TEvRequest* request, ui32 partiti

/// THeartbeatEmitter
THeartbeatEmitter::THeartbeatEmitter(const TSourceIdStorage& storage)
: THeartbeatProcessor(storage.SourceIdsWithHeartbeat, storage.SourceIdsByHeartbeat)
, Storage(storage)
: Storage(storage)
{
}

void THeartbeatEmitter::Process(const TString& sourceId, const THeartbeat& heartbeat) {
Y_ABORT_UNLESS(Storage.InMemorySourceIds.contains(sourceId));
const auto& sourceIdInfo = Storage.InMemorySourceIds.at(sourceId);
void THeartbeatEmitter::Process(const TString& sourceId, THeartbeat&& heartbeat) {
auto it = Storage.InMemorySourceIds.find(sourceId);
if (it != Storage.InMemorySourceIds.end() && it->second.LastHeartbeat) {
if (heartbeat.Version <= it->second.LastHeartbeat->Version) {
return;
}
}

if (const auto& lastHeartbeat = sourceIdInfo.LastHeartbeat) {
ForgetHeartbeat(sourceId, lastHeartbeat->Version);
if (!Storage.SourceIdsWithHeartbeat.contains(sourceId)) {
NewSourceIdsWithHeartbeat.insert(sourceId);
}

if (LastHeartbeats.contains(sourceId)) {
ForgetHeartbeat(sourceId, LastHeartbeats.at(sourceId).Version);
if (Heartbeats.contains(sourceId)) {
ForgetHeartbeat(sourceId, Heartbeats.at(sourceId).Version);
}

ApplyHeartbeat(sourceId, heartbeat.Version);
LastHeartbeats[sourceId] = heartbeat;
Heartbeats[sourceId] = std::move(heartbeat);
}

TMaybe<THeartbeat> THeartbeatEmitter::CanEmit() const {
if (SourceIdsWithHeartbeat.size() != Storage.ExplicitSourceIds.size()) {
if (Storage.ExplicitSourceIds.size() != (Storage.SourceIdsWithHeartbeat.size() + NewSourceIdsWithHeartbeat.size())) {
return Nothing();
}

if (SourceIdsByHeartbeat.empty()) {
return Nothing();
}

auto it = SourceIdsByHeartbeat.begin();
if (Storage.SourceIdsByHeartbeat.empty() || it->first > Storage.SourceIdsByHeartbeat.begin()->first) {
Y_ABORT_UNLESS(!it->second.empty());
const auto& someSourceId = *it->second.begin();
if (!NewSourceIdsWithHeartbeat.empty()) { // just got quorum
if (!Storage.SourceIdsByHeartbeat.empty() && Storage.SourceIdsByHeartbeat.begin()->first < SourceIdsByHeartbeat.begin()->first) {
return GetFromStorage(Storage.SourceIdsByHeartbeat.begin());
} else {
return GetFromDiff(SourceIdsByHeartbeat.begin());
}
} else if (SourceIdsByHeartbeat.begin()->first > Storage.SourceIdsByHeartbeat.begin()->first) {
auto storage = Storage.SourceIdsByHeartbeat.begin();
auto diff = SourceIdsByHeartbeat.begin();

TMaybe<TRowVersion> newVersion;
while (storage != Storage.SourceIdsByHeartbeat.end()) {
const auto& [version, sourceIds] = *storage;

auto rest = sourceIds.size();
for (const auto& sourceId : sourceIds) {
auto it = Heartbeats.find(sourceId);
if (it != Heartbeats.end() && it->second.Version > version && version <= diff->first) {
--rest;
} else {
break;
}
}

if (LastHeartbeats.contains(someSourceId)) {
return LastHeartbeats.at(someSourceId);
} else if (Storage.InMemorySourceIds.contains(someSourceId)) {
return Storage.InMemorySourceIds.at(someSourceId).LastHeartbeat;
if (!rest) {
if (++storage != Storage.SourceIdsByHeartbeat.end()) {
newVersion = storage->first;
} else {
newVersion = diff->first;
}
} else {
break;
}
}

if (newVersion) {
storage = Storage.SourceIdsByHeartbeat.find(*newVersion);
if (storage != Storage.SourceIdsByHeartbeat.end()) {
return GetFromStorage(storage);
} else {
return GetFromDiff(diff);
}
}
}

return Nothing();
}

// Returns the heartbeat stored for the version bucket `it` points at.
//
// `it` must be a dereferenceable iterator into a version -> source-ids map.
// The first-iterated source id of the bucket is used as its representative,
// and that source id's LastHeartbeat is read from Storage.InMemorySourceIds
// and returned as-is.
//
// Aborts if the bucket is empty or if the representative source id is not
// present in Storage.InMemorySourceIds.
TMaybe<THeartbeat> THeartbeatEmitter::GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const {
    const auto& sourceIds = it->second;
    Y_ABORT_UNLESS(!sourceIds.empty());

    // One find() instead of contains() + at(): a single hash lookup for the key.
    const auto info = Storage.InMemorySourceIds.find(*sourceIds.begin());
    Y_ABORT_UNLESS(info != Storage.InMemorySourceIds.end());
    return info->second.LastHeartbeat;
}

// Returns the heartbeat collected by this emitter (in Heartbeats) for the
// version bucket `it` points at.
//
// `it` must be a dereferenceable iterator into a version -> source-ids map.
// The first-iterated source id of the bucket is used as its representative,
// and the heartbeat recorded for it in Heartbeats is returned.
//
// Aborts if the bucket is empty or if the representative source id has no
// entry in Heartbeats.
TMaybe<THeartbeat> THeartbeatEmitter::GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const {
    const auto& sourceIds = it->second;
    Y_ABORT_UNLESS(!sourceIds.empty());

    // One find() instead of contains() + at(): a single hash lookup for the key.
    const auto heartbeat = Heartbeats.find(*sourceIds.begin());
    Y_ABORT_UNLESS(heartbeat != Heartbeats.end());
    return heartbeat->second;
}

}
19 changes: 11 additions & 8 deletions ydb/core/persqueue/sourceid.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,17 @@ struct TSourceIdInfo {
}; // TSourceIdInfo

class THeartbeatProcessor {
public:
THeartbeatProcessor() = default;
explicit THeartbeatProcessor(
const THashSet<TString>& sourceIdsWithHeartbeat,
const TMap<TRowVersion, THashSet<TString>>& sourceIdsByHeartbeat);
protected:
using TSourceIdsByHeartbeat = TMap<TRowVersion, THashSet<TString>>;

public:
void ApplyHeartbeat(const TString& sourceId, const TRowVersion& version);
void ForgetHeartbeat(const TString& sourceId, const TRowVersion& version);
void ForgetSourceId(const TString& sourceId);

protected:
THashSet<TString> SourceIdsWithHeartbeat;
TMap<TRowVersion, THashSet<TString>> SourceIdsByHeartbeat;
TSourceIdsByHeartbeat SourceIdsByHeartbeat;

}; // THeartbeatProcessor

Expand Down Expand Up @@ -151,12 +149,17 @@ class THeartbeatEmitter: private THeartbeatProcessor {
public:
explicit THeartbeatEmitter(const TSourceIdStorage& storage);

void Process(const TString& sourceId, const THeartbeat& heartbeat);
void Process(const TString& sourceId, THeartbeat&& heartbeat);
TMaybe<THeartbeat> CanEmit() const;

private:
TMaybe<THeartbeat> GetFromStorage(TSourceIdsByHeartbeat::const_iterator it) const;
TMaybe<THeartbeat> GetFromDiff(TSourceIdsByHeartbeat::const_iterator it) const;

private:
const TSourceIdStorage& Storage;
THashMap<TString, THeartbeat> LastHeartbeats;
THashSet<TString> NewSourceIdsWithHeartbeat;
THashMap<TString, THeartbeat> Heartbeats;

}; // THeartbeatEmitter

Expand Down
Loading

0 comments on commit c5b9fe2

Please sign in to comment.