Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar committed Feb 9, 2024
1 parent 8b52a2f commit bb8dce6
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 58 deletions.
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ std::optional<ui64> TPartitionSourceManager::TSourceManager::UpdatedSeqNo() cons

void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) {
if (InMemory == MemoryStorage().end()) {
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, seqNo, offset, timestamp);
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp);
} else {
Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp));
}
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
Y_ABORT_UNLESS(!writeResponse.Msg.HeartbeatVersion);
TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1);
SourceIdStorage.RegisterSourceId(s, seqNo, 0, offset, CurrentTimestamp);
SourceIdStorage.RegisterSourceId(s, seqNo, offset, CurrentTimestamp);
} else if (const auto& hbVersion = writeResponse.Msg.HeartbeatVersion) {
SourceIdStorage.RegisterSourceId(s, it->second.Updated(
seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data}
Expand Down Expand Up @@ -378,7 +378,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
}

Y_ABORT_UNLESS(body.AssignedOffset);
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange));
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange));
ReplyOk(ctx, response.GetCookie());
} else if (response.IsDeregisterMessageGroup()) {
const auto& body = response.GetDeregisterMessageGroup().Body;
Expand All @@ -399,7 +399,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
}

Y_ABORT_UNLESS(body.AssignedOffset);
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true);
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true);
}

ReplyOk(ctx, response.GetCookie());
Expand Down Expand Up @@ -836,7 +836,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TRegisterMessageGroupMsg& m
}

body.AssignedOffset = parameters.CurOffset;
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));

return ProcessResult::Continue;
}
Expand All @@ -859,7 +859,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TSplitMessageGroupMsg& msg,
}

body.AssignedOffset = parameters.CurOffset;
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
}

return ProcessResult::Continue;
Expand Down
40 changes: 20 additions & 20 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
PreparedResponse = std::make_shared<NKikimrClient::TResponse>();
}
}

auto& responseRecord = isDirectRead ? *PreparedResponse : Response->Record;
responseRecord.SetStatus(NMsgBusProxy::MSTATUS_OK);
responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK);
responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK);

Y_ABORT_UNLESS(readResult.ResultSize() > 0);
bool isStart = false;
if (!responseRecord.HasPartitionResponse()) {
Expand Down Expand Up @@ -191,7 +191,7 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
}

if (isNewMsg) {
if (!isStart && readResult.GetResult(i).HasTotalParts()
if (!isStart && readResult.GetResult(i).HasTotalParts()
&& readResult.GetResult(i).GetTotalParts() + i > readResult.ResultSize()) //last blob is not full
break;
partResp->AddResult()->CopyFrom(readResult.GetResult(i));
Expand Down Expand Up @@ -292,7 +292,7 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
};


TActorId CreateReadProxy(const TActorId& sender, const TActorId& tablet, ui32 tabletGeneration,
TActorId CreateReadProxy(const TActorId& sender, const TActorId& tablet, ui32 tabletGeneration,
const TDirectReadKey& directReadKey, const NKikimrClient::TPersQueueRequest& request,
const TActorContext& ctx)
{
Expand All @@ -304,7 +304,7 @@ class TResponseBuilder {
public:

TResponseBuilder(const TActorId& sender, const TActorId& tablet, const TString& topicName, const ui32 partition, const ui64 messageNo,
const TString& reqId, const TMaybe<ui64> cookie, NMetrics::TResourceMetrics* resourceMetrics,
const TString& reqId, const TMaybe<ui64> cookie, NMetrics::TResourceMetrics* resourceMetrics,
const TActorContext& ctx)
: Sender(sender)
, Tablet(tablet)
Expand Down Expand Up @@ -639,7 +639,7 @@ struct TPersQueue::TReplyToActor {
Event(std::move(event))
{
}

TActorId ActorId;
TEventBasePtr Event;
};
Expand Down Expand Up @@ -840,7 +840,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
return;
}

Y_ABORT_UNLESS(readRange.HasStatus());
if (readRange.GetStatus() != NKikimrProto::OK && readRange.GetStatus() != NKikimrProto::NODATA) {
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
Expand Down Expand Up @@ -1267,7 +1267,7 @@ void TPersQueue::FinishResponse(THashMap<ui64, TAutoPtr<TResponseBuilder>>::iter


void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorContext& ctx)
{
{
if (!ConfigInited) {
UpdateConfigRequests.emplace_back(ev->Release(), ev->Sender);
return;
Expand Down Expand Up @@ -1304,7 +1304,7 @@ void TPersQueue::TrySendUpdateConfigResponses(const TActorContext& ctx)

ChangeConfigNotification.clear();
}

void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
NPersQueue::TConverterFactoryPtr& converterFactory,
NPersQueue::TTopicConverterPtr& topicConverter,
Expand Down Expand Up @@ -1500,7 +1500,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request,
keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange());
}

sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, 0, ctx.Now(), std::move(keyRange));
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange));
}

for (const auto& partition : cfg.GetPartitions()) {
Expand Down Expand Up @@ -2109,7 +2109,7 @@ void TPersQueue::HandleReadRequest(
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::READ_ERROR_NO_SESSION,
TStringBuilder() << "Read prepare request with unknown(old?) session id " << cmd.GetSessionId());
return;
}
}
}

THolder<TEvPQ::TEvRead> event =
Expand Down Expand Up @@ -2375,7 +2375,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
}
ResponseProxy[responseCookie] = ans;
Counters->Simple()[COUNTER_PQ_TABLET_INFLIGHT].Set(ResponseProxy.size());

if (!ConfigInited) {
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::INITIALIZING, "tablet is not ready");
return;
Expand All @@ -2396,11 +2396,11 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, "no partition number");
return;
}

TPartitionId partition(req.GetPartition());
auto it = Partitions.find(partition);

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '"
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '"
<< (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << "' partition " << partition);

if (it == Partitions.end()) {
Expand Down Expand Up @@ -2859,7 +2859,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte

std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack;
if (!(event.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_ACK)) {
ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), TabletID());
ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), TabletID());
}

if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->Senders.contains(event.GetTabletProducer())) {
Expand Down Expand Up @@ -2927,7 +2927,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC
void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx)
{
const TEvPQ::TEvProposePartitionConfigResult& event = *ev->Get();

auto tx = GetTransaction(ctx, event.TxId);
if (!tx) {
return;
Expand Down Expand Up @@ -3582,7 +3582,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected);

PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTING" <<
", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount <<
", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount <<
", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected);
if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) {
Y_ABORT_UNLESS(!TxQueue.empty());
Expand Down Expand Up @@ -3728,7 +3728,7 @@ TPartition* TPersQueue::CreatePartitionActor(const TPartitionId& partitionId,
const TActorContext& ctx)
{
int channels = Info()->Channels.size() - NKeyValue::BLOB_CHANNEL; // channels 0,1 are reserved in tablet
Y_ABORT_UNLESS(channels > 0);
Y_ABORT_UNLESS(channels > 0);

return new TPartition(TabletID(),
partitionId,
Expand Down Expand Up @@ -3793,7 +3793,7 @@ void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig&
Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId());
}
}

void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
THashMap<ui32, TVector<TTransaction>>& partitionTxs)
{
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/persqueue/sourceid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,28 @@ void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) {
SourceIdsWithHeartbeat.erase(sourceId);
}

TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs)
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
: SeqNo(seqNo)
, MinSeqNo(minSeqNo)
, MinSeqNo(seqNo)
, Offset(offset)
, WriteTimestamp(createTs)
, CreateTimestamp(createTs)
{
}

TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
: SeqNo(seqNo)
, MinSeqNo(minSeqNo)
, MinSeqNo(seqNo)
, Offset(offset)
, WriteTimestamp(createTs)
, CreateTimestamp(createTs)
, LastHeartbeat(std::move(heartbeat))
{
}

TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
: SeqNo(seqNo)
, MinSeqNo(minSeqNo)
, MinSeqNo(seqNo)
, Offset(offset)
, CreateTimestamp(createTs)
, Explicit(true)
Expand All @@ -136,7 +136,7 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant cr
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const {
auto copy = *this;
copy.SeqNo = seqNo;
if (copy.MinSeqNo == 0 || copy.MinSeqNo > seqNo) {
if (copy.MinSeqNo == 0) {
copy.MinSeqNo = seqNo;
}
copy.Offset = offset;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/sourceid.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ struct TSourceIdInfo {
EState State = EState::Registered;

TSourceIdInfo() = default;
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs);
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat);
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs);
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat);
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);

TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const;
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const;
Expand Down
Loading

0 comments on commit bb8dce6

Please sign in to comment.