Skip to content

Commit

Permalink
Merge 03cbedf into cc67495
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored May 23, 2024
2 parents cc67495 + 03cbedf commit 03f6cbd
Show file tree
Hide file tree
Showing 13 changed files with 494 additions and 36 deletions.
10 changes: 8 additions & 2 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,14 @@ void TKqpQueryState::AddOffsetsToTransaction() {
const auto& operations = GetTopicOperations();

TMaybe<TString> consumer;
if (operations.HasConsumer())
if (operations.HasConsumer()) {
consumer = operations.GetConsumer();
}

TMaybe<ui32> supportivePartition;
if (operations.HasSupportivePartition()) {
supportivePartition = operations.GetSupportivePartition();
}

TopicOperations = NTopic::TTopicOperations();

Expand All @@ -334,7 +340,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {

for (auto& partition : topic.partitions()) {
if (partition.partition_offsets().empty()) {
TopicOperations.AddOperation(path, partition.partition_id());
TopicOperations.AddOperation(path, partition.partition_id(), supportivePartition);
} else {
for (auto& range : partition.partition_offsets()) {
YQL_ENSURE(consumer.Defined());
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
Operations_[consumer].AddOperation(consumer, range);
}

void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition)
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition)
{
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
Expand All @@ -88,6 +89,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
Partition_ = partition;
}

SupportivePartition_ = SupportivePartition_ ? Max<ui32>() : supportivePartition;

HasWriteOperations_ = true;
}

Expand All @@ -112,6 +115,9 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
o->SetPartitionId(*Partition_);
o->SetPath(*Topic_);
if (SupportivePartition_.Defined()) {
o->SetSupportivePartition(*SupportivePartition_);
}
}
}

Expand All @@ -127,6 +133,8 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
TabletId_ = rhs.TabletId_;
}

SupportivePartition_ = SupportivePartition_ ? Max<ui32>() : rhs.SupportivePartition_;

for (auto& [key, value] : rhs.Operations_) {
Operations_[key].Merge(value);
}
Expand Down Expand Up @@ -240,10 +248,11 @@ void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
HasReadOperations_ = true;
}

void TTopicOperations::AddOperation(const TString& topic, ui32 partition)
void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition)
{
TTopicPartition key{topic, partition};
Operations_[key].AddOperation(topic, partition);
Operations_[key].AddOperation(topic, partition, supportivePartition);
HasWriteOperations_ = true;
}

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/topics/kqp_topics.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class TTopicPartitionOperations {
void AddOperation(const TString& topic, ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
void AddOperation(const TString& topic, ui32 partition);
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);

Expand All @@ -67,6 +68,7 @@ class TTopicPartitionOperations {
THashMap<TString, TConsumerOperations> Operations_;
bool HasWriteOperations_ = false;
TMaybe<ui64> TabletId_;
TMaybe<ui32> SupportivePartition_;
};

struct TTopicPartition {
Expand Down Expand Up @@ -98,7 +100,8 @@ class TTopicOperations {
void AddOperation(const TString& topic, ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
void AddOperation(const TString& topic, ui32 partition);
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate,
TMaybe<TString> consumer);
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3040,13 +3040,19 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
Send(ev->Sender, response.Release());
}

void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx)
{
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
"Delete supportive partition " << Partition);
Y_ABORT_UNLESS(IsSupportive());

PendingEvents.emplace_back(ev->ReleaseBase().Release());
}

void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
{
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
"Delete supportive partition " << Partition);
Y_ABORT_UNLESS(IsSupportive());
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/partition_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ class TPartitionId {

size_t GetHash() const
{
return MultiHash(OriginalPartitionId, WriteId);
return MultiHash(MultiHash(OriginalPartitionId, WriteId), InternalPartitionId);
}

bool IsEqual(const TPartitionId& rhs) const
{
return
(OriginalPartitionId == rhs.OriginalPartitionId) &&
(WriteId == rhs.WriteId);
(WriteId == rhs.WriteId) &&
(InternalPartitionId == rhs.InternalPartitionId);
}

void ToStream(IOutputStream& s) const
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS
r->SetOwnerCookie(cookie);
r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
r->SetSeqNo(seqNo);
if (IsSupportive()) {
r->SetSupportivePartition(Partition.InternalPartitionId);
}

ctx.Send(Tablet, response.Release());
}
Expand Down
Loading

0 comments on commit 03f6cbd

Please sign in to comment.