Skip to content

Commit

Permalink
Merge 022bb1e into ec22f24
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored May 20, 2024
2 parents ec22f24 + 022bb1e commit ff6f1d0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 24 deletions.
4 changes: 2 additions & 2 deletions ydb/core/protos/pqconfig.proto
Original file line number Diff line number Diff line change
Expand Up @@ -620,11 +620,11 @@ message TReleasePartition {
optional uint64 Generation = 2;
optional string Session = 3;
optional string ClientId = 4;
optional uint32 Count = 5;
optional uint32 Count = 5 [deprecated = true]; // Remove at 2025-1
optional NActorsProto.TActorId PipeClient = 6;
optional uint32 Group = 7;
optional string Path = 8;
repeated uint32 Partition = 9;
reserved 9; // repeated uint32 Partition = 9
}

message TPartitionReleased {
Expand Down
32 changes: 10 additions & 22 deletions ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1445,15 +1445,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvReleasePar
}
};

std::unordered_set<ui32> partitionsForRealese;
for (ui32 p : record.GetPartition()) {
partitionsForRealese.insert(p);
}
if (group) {
partitionsForRealese.insert(group - 1);
}

if (partitionsForRealese.empty()) {
if (!group) {
// Release partitions by count
for (ui32 c = 0; c < record.GetCount(); ++c) {
if (Partitions.empty()) {
Expand Down Expand Up @@ -1484,32 +1476,28 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvReleasePar
doRelease(jt);
}
} else {
ui32 partitionId = group - 1;
bool found = false;

// Release partitions by partition id
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " gone release"
<< ": partitions# " << JoinRange(", ", partitionsForRealese.begin(), partitionsForRealese.end()));
<< ": partition# " << partitionId);

for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
auto& partitionInfo = it->second;
if (partitionInfo.Topic->GetInternalName() == converter->GetInternalName()) {
auto pt = partitionsForRealese.find(partitionInfo.Partition.Partition);
if (pt == partitionsForRealese.end()) {
continue;
}

if (partitionInfo.Topic->GetInternalName() == converter->GetInternalName() && partitionId == partitionInfo.Partition.Partition) {
if (!partitionInfo.Releasing) {
doRelease(it);
}

partitionsForRealese.erase(pt);
if (partitionsForRealese.empty()) {
break;
}
found = true;
break;
}
}

if (!partitionsForRealese.empty()) {
if (!found) {
return CloseSession(PersQueue::ErrorCode::ErrorCode::ERROR,
TStringBuilder() << "internal error: releasing unknown partitions " << JoinRange(", ", partitionsForRealese.begin(), partitionsForRealese.end()),
TStringBuilder() << "internal error: releasing unknown partition " << partitionId,
ctx);
}
}
Expand Down

0 comments on commit ff6f1d0

Please sign in to comment.