Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

the race between TEvLockStatus and TEvProposeTransaction #4758

Merged
merged 20 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
95bedbb
[+] the response of the supportive partition has 'MinSeqNo' and its i…
Alek5andr-Kotov May 15, 2024
8bf3678
[+] debug TPartitionWriter
Alek5andr-Kotov May 16, 2024
88dcf08
[+] TPartitionWriter sends the supportive partition ID to KQP
Alek5andr-Kotov May 16, 2024
3746399
[+] PQ tablet is waiting for the service partition ID in the TEvPropo…
Alek5andr-Kotov May 16, 2024
eeb75db
[+] FirstWrite flag
Alek5andr-Kotov May 21, 2024
4510f8d
[*] TPartitionWriter sends the supportive partition ID to KQP
Alek5andr-Kotov May 22, 2024
846cfbf
[-] The tablet does not delete WriteId entries
Alek5andr-Kotov May 22, 2024
ded3a4e
[+] Test
Alek5andr-Kotov May 22, 2024
4421081
[+] tests
Alek5andr-Kotov May 22, 2024
03cbedf
[-] comparison operation for partition IDs
Alek5andr-Kotov May 23, 2024
045f5a5
[/] tracing
Alek5andr-Kotov May 23, 2024
2080e17
[-] the function does not return a result
Alek5andr-Kotov May 23, 2024
23e0048
[-] compilation errors
Alek5andr-Kotov May 23, 2024
0c2f6a4
[-] tests TPartitionWriterCacheActorTests::*
Alek5andr-Kotov May 24, 2024
f0b9bed
[-] test TPQTabletTests::ProposeTx_Command_After_Propose
Alek5andr-Kotov May 24, 2024
32cbf0b
[-] transaction and two writing sessions
Alek5andr-Kotov May 27, 2024
7b56087
[*] FirstWrite -> NeedSupportivePartition
Alek5andr-Kotov May 27, 2024
083ea25
[*] PQ returns code PRECONDITION_FAILED
Alek5andr-Kotov May 27, 2024
c1a221c
[*] FirstWrite -> NeedSupportivePartition
Alek5andr-Kotov May 27, 2024
65b9396
[*] FirstWrite -> NeedSupportivePartition
Alek5andr-Kotov May 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,14 @@ void TKqpQueryState::AddOffsetsToTransaction() {
const auto& operations = GetTopicOperations();

TMaybe<TString> consumer;
if (operations.HasConsumer())
if (operations.HasConsumer()) {
consumer = operations.GetConsumer();
}

TMaybe<ui32> supportivePartition;
if (operations.HasSupportivePartition()) {
supportivePartition = operations.GetSupportivePartition();
}

TopicOperations = NTopic::TTopicOperations();

Expand All @@ -334,7 +340,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {

for (auto& partition : topic.partitions()) {
if (partition.partition_offsets().empty()) {
TopicOperations.AddOperation(path, partition.partition_id());
TopicOperations.AddOperation(path, partition.partition_id(), supportivePartition);
} else {
for (auto& range : partition.partition_offsets()) {
YQL_ENSURE(consumer.Defined());
Expand Down
26 changes: 23 additions & 3 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@

namespace NKikimr::NKqp::NTopic {

// Folds `rhs` into the accumulated supportive partition id `lhs`.
//
//  - `lhs` is empty: it takes whatever `rhs` holds (possibly still empty).
//  - `lhs` is set and `rhs` is set to a different value: `lhs` is overwritten
//    with Max<ui32>().
//    NOTE(review): Max<ui32>() looks like an "ids disagree" marker that no real
//    supportive partition is expected to have -- confirm with the consumer.
//  - otherwise `lhs` is left as is.
static void UpdateSupportivePartition(TMaybe<ui32>& lhs, const TMaybe<ui32>& rhs)
{
    if (!lhs.Defined()) {
        lhs = rhs;
        return;
    }

    const bool conflicting = rhs.Defined() && (*rhs != *lhs);
    if (conflicting) {
        lhs = Max<ui32>();
    }
}

//
// TConsumerOperations
//
Expand Down Expand Up @@ -78,7 +89,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
Operations_[consumer].AddOperation(consumer, range);
}

void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition)
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition)
{
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
Expand All @@ -88,6 +100,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
Partition_ = partition;
}

UpdateSupportivePartition(SupportivePartition_, supportivePartition);

HasWriteOperations_ = true;
}

Expand All @@ -112,6 +126,9 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
o->SetPartitionId(*Partition_);
o->SetPath(*Topic_);
if (SupportivePartition_.Defined()) {
o->SetSupportivePartition(*SupportivePartition_);
}
}
}

Expand All @@ -127,6 +144,8 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
TabletId_ = rhs.TabletId_;
}

UpdateSupportivePartition(SupportivePartition_, rhs.SupportivePartition_);

for (auto& [key, value] : rhs.Operations_) {
Operations_[key].Merge(value);
}
Expand Down Expand Up @@ -240,10 +259,11 @@ void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
HasReadOperations_ = true;
}

void TTopicOperations::AddOperation(const TString& topic, ui32 partition)
void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition)
{
TTopicPartition key{topic, partition};
Operations_[key].AddOperation(topic, partition);
Operations_[key].AddOperation(topic, partition, supportivePartition);
HasWriteOperations_ = true;
}

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/topics/kqp_topics.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class TTopicPartitionOperations {
void AddOperation(const TString& topic, ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
void AddOperation(const TString& topic, ui32 partition);
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);

Expand All @@ -67,6 +68,7 @@ class TTopicPartitionOperations {
THashMap<TString, TConsumerOperations> Operations_;
bool HasWriteOperations_ = false;
TMaybe<ui64> TabletId_;
TMaybe<ui32> SupportivePartition_;
};

struct TTopicPartition {
Expand Down Expand Up @@ -98,7 +100,8 @@ class TTopicOperations {
void AddOperation(const TString& topic, ui32 partition,
const TString& consumer,
const Ydb::Topic::OffsetsRange& range);
void AddOperation(const TString& topic, ui32 partition);
void AddOperation(const TString& topic, ui32 partition,
TMaybe<ui32> supportivePartition);

void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate,
TMaybe<TString> consumer);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3042,6 +3042,8 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T

// Handles TEvDeletePartition received while the partition is in its init state.
// The request is only accepted by a supportive partition (aborts otherwise);
// the event is detached from its handle and appended to PendingEvents.
// NOTE(review): presumably PendingEvents is replayed once initialization
// finishes -- confirm against the init-done path.
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
{
    Y_ABORT_UNLESS(IsSupportive());

    auto detached = ev->ReleaseBase();
    PendingEvents.emplace_back(detached.Release());
}

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/partition_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ class TPartitionId {

size_t GetHash() const
{
return MultiHash(OriginalPartitionId, WriteId);
return MultiHash(MultiHash(OriginalPartitionId, WriteId), InternalPartitionId);
}

bool IsEqual(const TPartitionId& rhs) const
{
return
(OriginalPartitionId == rhs.OriginalPartitionId) &&
(WriteId == rhs.WriteId);
(WriteId == rhs.WriteId) &&
(InternalPartitionId == rhs.InternalPartitionId);
}

void ToStream(IOutputStream& s) const
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS
r->SetOwnerCookie(cookie);
r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
r->SetSeqNo(seqNo);
if (IsSupportive()) {
r->SetSupportivePartition(Partition.InternalPartitionId);
}

ctx.Send(Tablet, response.Release());
}
Expand Down
79 changes: 59 additions & 20 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2001,7 +2001,8 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
"Tablet " << TabletID() <<
" Write in transaction." <<
" Partition: " << req.GetPartition() <<
", WriteId: " << req.GetWriteId());
", WriteId: " << req.GetWriteId() <<
", NeedSupportivePartition: " << req.GetNeedSupportivePartition());
}

for (ui32 i = 0; i < req.CmdWriteSize(); ++i) {
Expand Down Expand Up @@ -2198,7 +2199,8 @@ void TPersQueue::HandleReserveBytesRequest(const ui64 responseCookie, const TAct
"Tablet " << TabletID() <<
" Reserve bytes in transaction." <<
" Partition: " << req.GetPartition() <<
", WriteId: " << req.GetWriteId());
", WriteId: " << req.GetWriteId() <<
", NeedSupportivePartition: " << req.GetNeedSupportivePartition());
}

InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_RESERVE_BYTES);
Expand Down Expand Up @@ -2574,6 +2576,8 @@ void TPersQueue::HandleWriteRequestForSupportivePartition(const ui64 responseCoo
const NKikimrClient::TPersQueuePartitionRequest& req,
const TActorContext& ctx)
{
Y_ABORT_UNLESS(req.HasWriteId());

const TPartitionInfo& partition = GetPartitionInfo(req);
const TActorId& actorId = partition.Actor;

Expand Down Expand Up @@ -2648,6 +2652,14 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
sender);
}
} else {
if (!req.GetNeedSupportivePartition()) {
ReplyError(ctx,
responseCookie,
NPersQueue::NErrorCode::PRECONDITION_FAILED,
"lost messages");
alexnick88 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

//
// stage 1:
// - create an entry in TxWrites
Expand All @@ -2665,6 +2677,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
TPartitionId partitionId(originalPartitionId, writeId, NextSupportivePartitionId++);

writeInfo.Partitions.emplace(originalPartitionId, partitionId);
TxWritesChanged = true;
AddSupportivePartition(partitionId);

Y_ABORT_UNLESS(Partitions.contains(partitionId));
Expand Down Expand Up @@ -3115,6 +3128,38 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc

}

// Returns true when the partition addressed by a write operation is known to
// this tablet. The lookup key is built from the operation's partition id, the
// transaction's `writeId` and the operation's supportive partition id.
bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
                                       ui64 writeId) const
{
    const TPartitionId key(operation.GetPartitionId(),
                           writeId,
                           operation.GetSupportivePartition());

    return Partitions.find(key) != Partitions.end();
}

// Validates every write operation of a data transaction.
//
// A transaction without a WriteId passes trivially. Otherwise each operation
// that has no `Begin` field (treated here as a write) must pass
// CheckTxWriteOperation; the first one that does not makes the result false.
bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const
{
    if (!txBody.HasWriteId()) {
        return true;
    }

    const ui64 writeId = txBody.GetWriteId();

    for (const auto& op : txBody.GetOperations()) {
        // Operations carrying `Begin` are not writes; nothing to verify.
        if (op.HasBegin()) {
            continue;
        }

        if (!CheckTxWriteOperation(op, writeId)) {
            return false;
        }
    }

    return true;
}

void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
const TActorContext& ctx)
{
Expand All @@ -3123,23 +3168,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
Y_ABORT_UNLESS(event.HasData());
const NKikimrPQ::TDataTransaction& txBody = event.GetData();

for (auto& operation : txBody.GetOperations()) {
Y_ABORT_UNLESS(!operation.HasPath() || (operation.GetPath() == TopicPath));

bool isWriteOperation = !operation.HasBegin();

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
"Tablet " << TabletID() <<
" tx=" << event.GetTxId() <<
", write_id=" << txBody.GetWriteId() <<
", path=" << operation.GetPath() <<
", partition=" << operation.GetPartitionId() <<
", consumer=" << operation.GetConsumer() <<
", begin=" << operation.GetBegin() <<
", end=" << operation.GetEnd() <<
", is_write=" << isWriteOperation);
}

if (TabletState != NKikimrPQ::ENormal) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
Expand All @@ -3158,6 +3186,13 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
return;
}

if (!CheckTxWriteOperations(txBody)) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
ctx);
return;
}

TMaybe<TPartitionId> partitionId = FindPartitionId(txBody);
if (!partitionId.Defined()) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
Expand Down Expand Up @@ -3392,7 +3427,8 @@ void TPersQueue::BeginWriteTxs(const TActorContext& ctx)
CanProcessPlanStepQueue() ||
CanProcessWriteTxs() ||
CanProcessDeleteTxs() ||
CanProcessTxWrites()
CanProcessTxWrites() ||
TxWritesChanged
;
if (!canProcess) {
return;
Expand Down Expand Up @@ -3441,6 +3477,8 @@ void TPersQueue::EndWriteTxs(const NKikimrClient::TResponse& resp,
return;
}

TxWritesChanged = false;

SendReplies(ctx);
CheckChangedTxStates(ctx);
CreateSupportivePartitionActors(ctx);
Expand Down Expand Up @@ -4473,6 +4511,7 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
}
TxWrites.erase(writeId);
}
TxWritesChanged = true;

TryWriteTxs(ctx);
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
};

THashMap<ui64, TTxWriteInfo> TxWrites;
bool TxWritesChanged = false;
ui32 NextSupportivePartitionId = 100'000;

TActorId CacheActor;
Expand Down Expand Up @@ -494,6 +495,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {

void BeginDeleteTx(const TDistributedTransaction& tx);
void BeginDeletePartitions(TTxWriteInfo& writeInfo);

bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
ui64 writeId) const;
bool CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const;
};


Expand Down
5 changes: 5 additions & 0 deletions ydb/core/persqueue/ut/pqtablet_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct TGetOwnershipRequestParams {
TMaybe<ui32> Partition;
TMaybe<ui64> MsgNo;
TMaybe<ui64> WriteId;
TMaybe<bool> NeedSupportivePartition;
TMaybe<TString> Owner; // o
TMaybe<ui64> Cookie;
};
Expand Down Expand Up @@ -550,6 +551,9 @@ std::unique_ptr<TEvPersQueue::TEvRequest> TPQTabletFixture::MakeGetOwnershipRequ
if (params.WriteId.Defined()) {
request->SetWriteId(*params.WriteId);
}
if (params.NeedSupportivePartition.Defined()) {
request->SetNeedSupportivePartition(*params.NeedSupportivePartition);
}
if (params.Cookie.Defined()) {
request->SetCookie(*params.Cookie);
}
Expand Down Expand Up @@ -1281,6 +1285,7 @@ Y_UNIT_TEST_F(ProposeTx_Command_After_Propose, TPQTabletFixture)

SyncGetOwnership({.Partition=partitionId,
.WriteId=writeId,
.NeedSupportivePartition=true,
.Owner="-=[ 0wn3r ]=-",
.Cookie=4},
{.Cookie=4,
Expand Down
Loading
Loading