Skip to content

Commit

Permalink
type TPartitionId (ydb-platform#1472)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Feb 1, 2024
1 parent ec89d22 commit 922d72d
Show file tree
Hide file tree
Showing 36 changed files with 353 additions and 204 deletions.
2 changes: 1 addition & 1 deletion ydb/core/persqueue/account_read_quoter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ TAccountReadQuoter::TAccountReadQuoter(
TActorId recepient,
ui64 tabletId,
const NPersQueue::TTopicConverterPtr& topicConverter,
ui32 partition,
const TPartitionId& partition,
const TString& user,
const TTabletCountersBase& counters
)
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/account_read_quoter.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TAccountReadQuoter : public TActorBootstrapped<TAccountReadQuoter> {
TActorId recepient,
ui64 tabletId,
const NPersQueue::TTopicConverterPtr& topicConverter,
ui32 partition,
const TPartitionId& partition,
const TString& user,
const TTabletCountersBase& counters
);
Expand All @@ -112,7 +112,7 @@ class TAccountReadQuoter : public TActorBootstrapped<TAccountReadQuoter> {
const TActorId Recepient;
const ui64 TabletId;
const NPersQueue::TTopicConverterPtr TopicConverter;
const ui32 Partition;
const TPartitionId Partition;
const TString User;
const TString ConsumerPath;
const ui64 ReadCreditBytes;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionedBlob& x)
, MaxBlobSize(x.MaxBlobSize)
{}

TPartitionedBlob::TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize)
: Partition(partition)
, Offset(offset)
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ class TPartitionedBlob {

TPartitionedBlob(const TPartitionedBlob& x);

TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize);

std::optional<std::pair<TKey, TString>> Add(TClientBlob&& blob);
Expand All @@ -287,7 +287,7 @@ class TPartitionedBlob {
TString CompactHead(bool glueHead, THead& head, bool glueNewHead, THead& newHead, ui32 estimatedSize);

private:
ui32 Partition;
TPartitionId Partition;
ui64 Offset;
ui16 InternalPartsCount;
ui64 StartOffset;
Expand Down
22 changes: 11 additions & 11 deletions ydb/core/persqueue/cache_eviction.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,19 @@ namespace NPQ {
ERequestType Type;
TActorId Sender;
ui64 CookiePQ;
ui32 Partition;
TPartitionId Partition;
ui32 MetadataWritesCount;
TVector<TRequestedBlob> Blobs;

TKvRequest(ERequestType type, TActorId sender, ui64 cookie, ui32 partition)
TKvRequest(ERequestType type, TActorId sender, ui64 cookie, const TPartitionId& partition)
: Type(type)
, Sender(sender)
, CookiePQ(cookie)
, Partition(partition)
, MetadataWritesCount(0)
{}

TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); }
TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition.InternalPartitionId, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); }

THolder<TEvKeyValue::TEvRequest> MakeKvRequest() const
{
Expand Down Expand Up @@ -110,7 +110,7 @@ namespace NPQ {
}

void Verify(const TRequestedBlob& blob) const {
TKey key(TKeyPrefix::TypeData, 0, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false);
TKey key(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false);
Y_ABORT_UNLESS(blob.Value.size() == blob.Size);
TClientBlob::CheckBlob(key, blob.Value);
}
Expand Down Expand Up @@ -258,7 +258,7 @@ namespace NPQ {

for (const auto& blob : kvReq.Blobs) {
// Touching blobs in L2. We don't need data here
TCacheBlobL2 key = {kvReq.Partition, blob.Offset, blob.PartNo, nullptr};
TCacheBlobL2 key = {kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, nullptr};
if (blob.Cached)
reqData->RequestedBlobs.push_back(key);
else
Expand All @@ -275,10 +275,10 @@ namespace NPQ {
THolder<TCacheL2Request> reqData = MakeHolder<TCacheL2Request>(TabletId);

for (const TRequestedBlob& reqBlob : kvReq.Blobs) {
TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
{ // there could be a new blob with same id (for big messages)
if (RemoveExists(ctx, blob)) {
TCacheBlobL2 removed = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, nullptr};
TCacheBlobL2 removed = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, nullptr};
reqData->RemovedBlobs.push_back(removed);
}
}
Expand All @@ -290,7 +290,7 @@ namespace NPQ {
if (L1Strategy)
L1Strategy->SaveHeadBlob(blob);

TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached};
TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached};
reqData->StoredBlobs.push_back(blobL2);

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Caching head blob in L1. Partition "
Expand All @@ -314,7 +314,7 @@ namespace NPQ {
continue;

const TRequestedBlob& reqBlob = kvReq.Blobs[i];
TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
{
TValueL1 value;
if (CheckExists(ctx, blob, value)) {
Expand All @@ -328,7 +328,7 @@ namespace NPQ {
Cache[blob] = valL1; // weak
Counters.Inc(valL1);

TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached};
TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached};
reqData->StoredBlobs.push_back(blobL2);

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Prefetched blob in L1. Partition "
Expand Down Expand Up @@ -428,7 +428,7 @@ namespace NPQ {
++numCached;
continue;
}
TBlobId blobId(kvReq.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
TBlobId blobId(kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
TCacheValue::TPtr cached = GetValue(ctx, blobId);
if (cached) {
++numCached;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/event_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void ReplyPersQueueError(
const TActorContext& ctx,
ui64 tabletId,
const TString& topicName,
TMaybe<ui32> partition,
TMaybe<TPartitionId> partition,
NKikimr::TTabletCountersBase& counters,
NKikimrServices::EServiceKikimr service,
const ui64 responseCookie,
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/persqueue/event_helpers.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "partition_id.h"

#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h>
Expand All @@ -14,7 +16,7 @@ void ReplyPersQueueError(
const TActorContext& ctx,
ui64 tabletId,
const TString& topicName,
TMaybe<ui32> partition,
TMaybe<TPartitionId> partition,
NKikimr::TTabletCountersBase& counters,
NKikimrServices::EServiceKikimr service,
const ui64 responseCookie,
Expand Down
55 changes: 31 additions & 24 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ struct TEvPQ {
EvQuotaCountersUpdated,
EvConsumerRemoved,
EvFetchResponse,
EvSourceIdRequest,
EvSourceIdResponse,
EvPublishRead,
EvForgetRead,
EvRegisterDirectReadSession,
Expand Down Expand Up @@ -313,13 +311,18 @@ struct TEvPQ {
};

struct TEvMonResponse : public TEventLocal<TEvMonResponse, EvMonResponse> {
TEvMonResponse(ui32 partition, const TVector<TString>& res, const TString& str)
TEvMonResponse(const NPQ::TPartitionId& partition, const TVector<TString>& res, const TString& str)
: Partition(partition)
, Res(res)
, Str(str)
{}

ui32 Partition;
TEvMonResponse(const TVector<TString>& res, const TString& str)
: Res(res)
, Str(str)
{}

TMaybe<NPQ::TPartitionId> Partition;
TVector<TString> Res;
TString Str;
};
Expand Down Expand Up @@ -398,11 +401,13 @@ struct TEvPQ {
};

struct TEvPartitionOffsetsResponse : public TEventLocal<TEvPartitionOffsetsResponse, EvPartitionOffsetsResponse> {
explicit TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult)
TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult, const NPQ::TPartitionId& partition)
: PartResult(partResult)
, Partition(partition)
{}

NKikimrPQ::TOffsetsResponse::TPartResult PartResult;
NPQ::TPartitionId Partition;
};

struct TEvPartitionStatus : public TEventLocal<TEvPartitionStatus, EvPartitionStatus> {
Expand All @@ -424,11 +429,13 @@ struct TEvPQ {
};

struct TEvPartitionStatusResponse : public TEventLocal<TEvPartitionStatusResponse, EvPartitionStatusResponse> {
explicit TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult)
TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult, const NPQ::TPartitionId& partition)
: PartResult(partResult)
, Partition(partition)
{}

NKikimrPQ::TStatusResponse::TPartResult PartResult;
NPQ::TPartitionId Partition;
};


Expand All @@ -442,11 +449,11 @@ struct TEvPQ {
};

struct TEvInitComplete : public TEventLocal<TEvInitComplete, EvInitComplete> {
explicit TEvInitComplete(const ui32 partition)
explicit TEvInitComplete(const NPQ::TPartitionId& partition)
: Partition(partition)
{}

ui32 Partition;
NPQ::TPartitionId Partition;
};

struct TEvError : public TEventLocal<TEvError, EvError> {
Expand All @@ -462,7 +469,7 @@ struct TEvPQ {
};

struct TEvBlobRequest : public TEventLocal<TEvBlobRequest, EvBlobRequest> {
TEvBlobRequest(const TString& user, const ui64 cookie, const ui32 partition, const ui64 readOffset,
TEvBlobRequest(const TString& user, const ui64 cookie, const NPQ::TPartitionId& partition, const ui64 readOffset,
TVector<NPQ::TRequestedBlob>&& blobs)
: User(user)
, Cookie(cookie)
Expand All @@ -473,7 +480,7 @@ struct TEvPQ {

TString User;
ui64 Cookie;
ui32 Partition;
NPQ::TPartitionId Partition;
ui64 ReadOffset;
TVector<NPQ::TRequestedBlob> Blobs;
};
Expand Down Expand Up @@ -565,12 +572,12 @@ struct TEvPQ {
};

struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> {
explicit TEvPartitionConfigChanged(ui32 partition) :
explicit TEvPartitionConfigChanged(const NPQ::TPartitionId& partition) :
Partition(partition)
{
}

ui32 Partition;
NPQ::TPartitionId Partition;
};

struct TEvChangeCacheConfig : public TEventLocal<TEvChangeCacheConfig, EvChangeCacheConfig> {
Expand All @@ -588,35 +595,35 @@ struct TEvPQ {
};

struct TEvPartitionCounters : public TEventLocal<TEvPartitionCounters, EvPartitionCounters> {
TEvPartitionCounters(const ui32 partition, const TTabletCountersBase& counters)
TEvPartitionCounters(const NPQ::TPartitionId& partition, const TTabletCountersBase& counters)
: Partition(partition)
{
Counters.Populate(counters);
}

const ui32 Partition;
const NPQ::TPartitionId Partition;
TTabletCountersBase Counters;
};

struct TEvPartitionLabeledCounters : public TEventLocal<TEvPartitionLabeledCounters, EvPartitionLabeledCounters> {
TEvPartitionLabeledCounters(const ui32 partition, const TTabletLabeledCountersBase& labeledCounters)
TEvPartitionLabeledCounters(const NPQ::TPartitionId& partition, const TTabletLabeledCountersBase& labeledCounters)
: Partition(partition)
, LabeledCounters(labeledCounters)
{
}

const ui32 Partition;
const NPQ::TPartitionId Partition;
TTabletLabeledCountersBase LabeledCounters;
};

struct TEvPartitionLabeledCountersDrop : public TEventLocal<TEvPartitionLabeledCountersDrop, EvPartitionLabeledCountersDrop> {
TEvPartitionLabeledCountersDrop(const ui32 partition, const TString& group)
TEvPartitionLabeledCountersDrop(const NPQ::TPartitionId& partition, const TString& group)
: Partition(partition)
, Group(group)
{
}

const ui32 Partition;
const NPQ::TPartitionId Partition;
TString Group;
};

Expand Down Expand Up @@ -798,7 +805,7 @@ struct TEvPQ {
};

struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
TEvTxCalcPredicateResult(ui64 step, ui64 txId, ui32 partition, bool predicate) :
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) :
Step(step),
TxId(txId),
Partition(partition),
Expand All @@ -808,7 +815,7 @@ struct TEvPQ {

ui64 Step;
ui64 TxId;
ui32 Partition;
NPQ::TPartitionId Partition;
bool Predicate = false;
};

Expand All @@ -826,7 +833,7 @@ struct TEvPQ {
};

struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> {
TEvProposePartitionConfigResult(ui64 step, ui64 txId, ui32 partition) :
TEvProposePartitionConfigResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) :
Step(step),
TxId(txId),
Partition(partition)
Expand All @@ -835,7 +842,7 @@ struct TEvPQ {

ui64 Step;
ui64 TxId;
ui32 Partition;
NPQ::TPartitionId Partition;
};

struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> {
Expand All @@ -850,7 +857,7 @@ struct TEvPQ {
};

struct TEvTxCommitDone : public TEventLocal<TEvTxCommitDone, EvTxCommitDone> {
TEvTxCommitDone(ui64 step, ui64 txId, ui32 partition) :
TEvTxCommitDone(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) :
Step(step),
TxId(txId),
Partition(partition)
Expand All @@ -859,7 +866,7 @@ struct TEvPQ {

ui64 Step;
ui64 TxId;
ui32 Partition;
NPQ::TPartitionId Partition;
};

struct TEvTxRollback : public TEventLocal<TEvTxRollback, EvTxRollback> {
Expand Down
Loading

0 comments on commit 922d72d

Please sign in to comment.