Skip to content

Commit

Permalink
Merge 045f5a5 into cc67495
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored May 23, 2024
2 parents cc67495 + 045f5a5 commit c13b904
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 34 deletions.
10 changes: 8 additions & 2 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,14 @@ void TKqpQueryState::AddOffsetsToTransaction() {
const auto& operations = GetTopicOperations();

TMaybe<TString> consumer;
if (operations.HasConsumer())
if (operations.HasConsumer()) {
consumer = operations.GetConsumer();
}

TMaybe<ui32> supportivePartition;
if (operations.HasSupportivePartition()) {
supportivePartition = operations.GetSupportivePartition();
}

TopicOperations = NTopic::TTopicOperations();

Expand All @@ -334,7 +340,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {

for (auto& partition : topic.partitions()) {
if (partition.partition_offsets().empty()) {
TopicOperations.AddOperation(path, partition.partition_id());
TopicOperations.AddOperation(path, partition.partition_id(), supportivePartition);
} else {
for (auto& range : partition.partition_offsets()) {
YQL_ENSURE(consumer.Defined());
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
Operations_[consumer].AddOperation(consumer, range);
}

void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition)
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition)
{
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
Expand All @@ -88,6 +89,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
Partition_ = partition;
}

SupportivePartition_ = SupportivePartition_ ? Max<ui32>() : supportivePartition;

HasWriteOperations_ = true;
}

Expand All @@ -112,6 +115,9 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
o->SetPartitionId(*Partition_);
o->SetPath(*Topic_);
if (SupportivePartition_.Defined()) {
o->SetSupportivePartition(*SupportivePartition_);
}
}
}

Expand All @@ -127,6 +133,8 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
TabletId_ = rhs.TabletId_;
}

SupportivePartition_ = SupportivePartition_ ? Max<ui32>() : rhs.SupportivePartition_;

for (auto& [key, value] : rhs.Operations_) {
Operations_[key].Merge(value);
}
Expand Down Expand Up @@ -240,10 +248,11 @@ void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
HasReadOperations_ = true;
}

void TTopicOperations::AddOperation(const TString& topic, ui32 partition)
void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition)
{
TTopicPartition key{topic, partition};
Operations_[key].AddOperation(topic, partition);
Operations_[key].AddOperation(topic, partition, supportivePartition);
HasWriteOperations_ = true;
}

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/topics/kqp_topics.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class TTopicPartitionOperations {
void AddOperation(const TString& topic, ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
void AddOperation(const TString& topic, ui32 partition);
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);

Expand All @@ -67,6 +68,7 @@ class TTopicPartitionOperations {
THashMap<TString, TConsumerOperations> Operations_;
bool HasWriteOperations_ = false;
TMaybe<ui64> TabletId_;
TMaybe<ui32> SupportivePartition_;
};

struct TTopicPartition {
Expand Down Expand Up @@ -98,7 +100,8 @@ class TTopicOperations {
void AddOperation(const TString& topic, ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
void AddOperation(const TString& topic, ui32 partition);
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate,
TMaybe<TString> consumer);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3042,6 +3042,8 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T

void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
{
Y_ABORT_UNLESS(IsSupportive());

PendingEvents.emplace_back(ev->ReleaseBase().Release());
}

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/partition_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ class TPartitionId {

size_t GetHash() const
{
return MultiHash(OriginalPartitionId, WriteId);
return MultiHash(MultiHash(OriginalPartitionId, WriteId), InternalPartitionId);
}

bool IsEqual(const TPartitionId& rhs) const
{
return
(OriginalPartitionId == rhs.OriginalPartitionId) &&
(WriteId == rhs.WriteId);
(WriteId == rhs.WriteId) &&
(InternalPartitionId == rhs.InternalPartitionId);
}

void ToStream(IOutputStream& s) const
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS
r->SetOwnerCookie(cookie);
r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
r->SetSeqNo(seqNo);
if (IsSupportive()) {
r->SetSupportivePartition(Partition.InternalPartitionId);
}

ctx.Send(Tablet, response.Release());
}
Expand Down
82 changes: 61 additions & 21 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2001,7 +2001,8 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
"Tablet " << TabletID() <<
" Write in transaction." <<
" Partition: " << req.GetPartition() <<
", WriteId: " << req.GetWriteId());
", WriteId: " << req.GetWriteId() <<
", FirstWrite: " << req.GetFirstWrite());
}

for (ui32 i = 0; i < req.CmdWriteSize(); ++i) {
Expand Down Expand Up @@ -2198,7 +2199,8 @@ void TPersQueue::HandleReserveBytesRequest(const ui64 responseCookie, const TAct
"Tablet " << TabletID() <<
" Reserve bytes in transaction." <<
" Partition: " << req.GetPartition() <<
", WriteId: " << req.GetWriteId());
", WriteId: " << req.GetWriteId() <<
", FirstWrite: " << req.GetFirstWrite());
}

InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_RESERVE_BYTES);
Expand Down Expand Up @@ -2574,6 +2576,8 @@ void TPersQueue::HandleWriteRequestForSupportivePartition(const ui64 responseCoo
const NKikimrClient::TPersQueuePartitionRequest& req,
const TActorContext& ctx)
{
Y_ABORT_UNLESS(req.HasWriteId());

const TPartitionInfo& partition = GetPartitionInfo(req);
const TActorId& actorId = partition.Actor;

Expand Down Expand Up @@ -2618,6 +2622,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,

ui64 writeId = req.GetWriteId();
ui32 originalPartitionId = req.GetPartition();
bool firstWrite = req.GetFirstWrite();

if (TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId)) {
//
Expand Down Expand Up @@ -2648,6 +2653,14 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
sender);
}
} else {
if (!firstWrite) {
ReplyError(ctx,
responseCookie,
NPersQueue::NErrorCode::BAD_REQUEST,
"lost messages");
return;
}

//
// этап 1:
// - создать запись в TxWrites
Expand All @@ -2665,6 +2678,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
TPartitionId partitionId(originalPartitionId, writeId, NextSupportivePartitionId++);

writeInfo.Partitions.emplace(originalPartitionId, partitionId);
TxWritesChanged = true;
AddSupportivePartition(partitionId);

Y_ABORT_UNLESS(Partitions.contains(partitionId));
Expand Down Expand Up @@ -3115,6 +3129,38 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc

}

bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
ui64 writeId) const
{
TPartitionId partitionId(operation.GetPartitionId(),
writeId,
operation.GetSupportivePartition());
return Partitions.contains(partitionId);
}

bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const
{
if (!txBody.HasWriteId()) {
return true;
}

ui64 writeId = txBody.GetWriteId();

for (auto& operation : txBody.GetOperations()) {
auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) {
return !o.HasBegin();
};

if (isWrite(operation)) {
if (!CheckTxWriteOperation(operation, writeId)) {
return false;
}
}
}

return true;
}

void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
const TActorContext& ctx)
{
Expand All @@ -3123,23 +3169,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
Y_ABORT_UNLESS(event.HasData());
const NKikimrPQ::TDataTransaction& txBody = event.GetData();

for (auto& operation : txBody.GetOperations()) {
Y_ABORT_UNLESS(!operation.HasPath() || (operation.GetPath() == TopicPath));

bool isWriteOperation = !operation.HasBegin();

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
"Tablet " << TabletID() <<
" tx=" << event.GetTxId() <<
", write_id=" << txBody.GetWriteId() <<
", path=" << operation.GetPath() <<
", partition=" << operation.GetPartitionId() <<
", consumer=" << operation.GetConsumer() <<
", begin=" << operation.GetBegin() <<
", end=" << operation.GetEnd() <<
", is_write=" << isWriteOperation);
}

if (TabletState != NKikimrPQ::ENormal) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
Expand All @@ -3158,6 +3187,13 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
return;
}

if (!CheckTxWriteOperations(txBody)) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
ctx);
return;
}

TMaybe<TPartitionId> partitionId = FindPartitionId(txBody);
if (!partitionId.Defined()) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
Expand Down Expand Up @@ -3392,7 +3428,8 @@ void TPersQueue::BeginWriteTxs(const TActorContext& ctx)
CanProcessPlanStepQueue() ||
CanProcessWriteTxs() ||
CanProcessDeleteTxs() ||
CanProcessTxWrites()
CanProcessTxWrites() ||
TxWritesChanged
;
if (!canProcess) {
return;
Expand Down Expand Up @@ -3441,6 +3478,8 @@ void TPersQueue::EndWriteTxs(const NKikimrClient::TResponse& resp,
return;
}

TxWritesChanged = false;

SendReplies(ctx);
CheckChangedTxStates(ctx);
CreateSupportivePartitionActors(ctx);
Expand Down Expand Up @@ -4411,7 +4450,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
}
}

void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext&)
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx)
{
auto& record = ev->Get()->Record;
ui64 writeId = record.GetLockId();
Expand Down Expand Up @@ -4473,6 +4512,7 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
}
TxWrites.erase(writeId);
}
TxWritesChanged = true;

TryWriteTxs(ctx);
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
};

THashMap<ui64, TTxWriteInfo> TxWrites;
bool TxWritesChanged = false;
ui32 NextSupportivePartitionId = 100'000;

TActorId CacheActor;
Expand Down Expand Up @@ -494,6 +495,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {

void BeginDeleteTx(const TDistributedTransaction& tx);
void BeginDeletePartitions(TTxWriteInfo& writeInfo);

bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
ui64 writeId) const;
bool CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const;
};


Expand Down
Loading

0 comments on commit c13b904

Please sign in to comment.