From afb93b0f8b1cf844d6961a5110990c3769032796 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 15 May 2024 17:01:44 +0300 Subject: [PATCH 1/3] async propose phase on CS (config stage on SS) --- .../columnshard/bg_tasks/abstract/adapter.h | 2 + .../tx/columnshard/bg_tasks/manager/actor.h | 6 + .../blobs_action/transaction/tx_write.cpp | 30 +-- .../blobs_action/transaction/tx_write.h | 22 ++- .../columnshard__propose_transaction.cpp | 50 +++-- .../tx/columnshard/columnshard__write.cpp | 12 +- ydb/core/tx/columnshard/columnshard_impl.h | 3 + ydb/core/tx/columnshard/columnshard_schema.h | 6 + .../data_sharing/common/context/context.cpp | 48 +++-- .../data_sharing/common/context/context.h | 9 +- .../data_sharing/common/session/common.h | 6 +- .../destination/session/destination.cpp | 21 ++- .../destination/session/destination.h | 7 +- .../transactions/tx_finish_from_source.cpp | 19 +- .../transactions/tx_finish_from_source.h | 1 + .../transactions/tx_start_from_initiator.cpp | 3 +- .../initiator/controller/schemeshard.cpp | 9 + .../initiator/controller/schemeshard.h | 59 ++++++ .../data_sharing/initiator/controller/ya.make | 1 + .../data_sharing/protos/initiator.proto | 5 + .../data_sharing/protos/sessions.proto | 1 + .../columnshard/export/actor/export_actor.cpp | 29 +++ .../columnshard/export/actor/export_actor.h | 8 +- .../tx/columnshard/export/protos/task.proto | 1 + .../tx/columnshard/export/session/session.h | 3 + .../tx/columnshard/export/session/task.cpp | 12 +- ydb/core/tx/columnshard/export/session/task.h | 7 +- .../columnshard/normalizer/portion/chunks.cpp | 2 +- .../normalizer/portion/normalizer.cpp | 2 +- .../transactions/operators/backup.cpp | 29 +-- .../transactions/operators/backup.h | 70 +++---- .../transactions/operators/ev_write.cpp | 3 +- .../transactions/operators/ev_write.h | 39 ++-- .../transactions/operators/long_tx_write.cpp | 40 +++- .../transactions/operators/long_tx_write.h | 58 ++---- .../transactions/operators/schema.cpp | 127 ++++++++++++- .../transactions/operators/schema.h | 157 +++------------- .../transactions/operators/sharing.cpp | 80 ++++++++ .../transactions/operators/sharing.h | 50 +++++ .../transactions/operators/ss_operation.cpp | 23 +++ .../transactions/operators/ss_operation.h | 17 ++ .../transactions/operators/ya.make | 2 + .../transactions/tx_controller.cpp | 175 +++++++++--------- .../columnshard/transactions/tx_controller.h | 137 +++++++++++++- ydb/core/tx/columnshard/ut_rw/ut_backup.cpp | 7 +- 45 files changed, 969 insertions(+), 429 deletions(-) create mode 100644 ydb/core/tx/columnshard/data_sharing/initiator/controller/schemeshard.cpp create mode 100644 ydb/core/tx/columnshard/data_sharing/initiator/controller/schemeshard.h create mode 100644 ydb/core/tx/columnshard/transactions/operators/sharing.cpp create mode 100644 ydb/core/tx/columnshard/transactions/operators/sharing.h create mode 100644 ydb/core/tx/columnshard/transactions/operators/ss_operation.cpp create mode 100644 ydb/core/tx/columnshard/transactions/operators/ss_operation.h diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h b/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h index 6eaa6df6bfa5..17def00927cc 100644 --- a/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h @@ -1,6 +1,8 @@ #pragma once #include "session.h" #include +#include + #include #include #include diff --git a/ydb/core/tx/columnshard/bg_tasks/manager/actor.h b/ydb/core/tx/columnshard/bg_tasks/manager/actor.h index 8a6c08c0757e..9d350f2e4cda 100644 --- a/ydb/core/tx/columnshard/bg_tasks/manager/actor.h +++ b/ydb/core/tx/columnshard/bg_tasks/manager/actor.h @@ -34,6 +34,12 @@ class TSessionActor: public NActors::TActorBootstrapped { void SaveSessionProgress(); void SaveSessionState(); + + template + T* GetShardVerified() const { + return &Adapter->GetTabletExecutorVerifiedAs(); + } + public: TSessionActor(const std::shared_ptr& session, const std::shared_ptr& adapter) : TabletId(adapter->GetTabletId()) diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 5fc94bb45558..ac498107b8f9 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -75,6 +75,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { for (auto&& i : buffer.GetRemoveActions()) { i->OnExecuteTxAfterRemoving(blobManagerDb, true); } + Results.clear(); for (auto&& aggr : buffer.GetAggregations()) { const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta(); if (!writeMeta.HasLongTxId()) { @@ -87,22 +88,22 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { proto.SetLockId(operation->GetLockId()); TString txBody; Y_ABORT_UNLESS(proto.SerializeToString(&txBody)); - auto result = Self->GetProgressTxController().ProposeTransaction(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId()), txBody, writeMeta.GetSource(), operation->GetCookie(), txc); - AFL_VERIFY(!result.IsError()); - Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared( - Self->TabletID(), result.GetFullTxInfoVerified().TxId, Self->GetProgressTxController().BuildCoordinatorInfo(result.GetFullTxInfoVerified()))); + auto op = Self->GetProgressTxController().StartProposeOnExecute(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId()), txBody, writeMeta.GetSource(), operation->GetCookie(), txc); + AFL_VERIFY(!op->IsFail()); + ResultOperators.emplace_back(op); } else { NKikimrDataEvents::TLock lock; lock.SetLockId(operation->GetLockId()); lock.SetDataShard(Self->TabletID()); lock.SetGeneration(1); lock.SetCounter(1); - - Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock)); + auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock); + Results.emplace_back(std::move(ev), writeMeta.GetSource()); } } else { Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1); - Results.emplace_back(std::make_unique(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS)); + auto ev = std::make_unique(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + Results.emplace_back(std::move(ev), writeMeta.GetSource()); } } return true; @@ -119,16 +120,15 @@ void TTxWrite::Complete(const TActorContext& ctx) { for (auto&& i : buffer.GetRemoveActions()) { i->OnCompleteTxAfterRemoving(true); } - AFL_VERIFY(buffer.GetAggregations().size() == Results.size()); + AFL_VERIFY(buffer.GetAggregations().size() == Results.size() + ResultOperators.size()); + for (auto&& i : ResultOperators) { + Self->GetProgressTxController().FinishProposeOnComplete(i->GetTxId(), ctx); + } + for (auto&& i : Results) { + i.DoSendReply(ctx); + } for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) { const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta(); - auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); - if (operation) { - Self->GetProgressTxController().CompleteTransaction(operation->GetLockId(), ctx); - ctx.Send(writeMeta.GetSource(), Results[i].release(), 0, operation->GetCookie()); - } else { - ctx.Send(writeMeta.GetSource(), Results[i].release()); - } Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant()); Self->CSCounters.OnSuccessWriteResponse(); } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h index fbc15665bdf7..da4567b25a02 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h @@ -16,11 +16,29 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase { void Complete(const TActorContext& ctx) override; TTxType GetTxType() const override { return TXTYPE_WRITE; } - private: TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult; const ui32 TabletTxNo; - std::vector> Results; + + class TReplyInfo { + private: + std::unique_ptr Event; + TActorId DestinationForReply; + public: + TReplyInfo(std::unique_ptr&& ev, const TActorId& destinationForReply) + : Event(std::move(ev)) + , DestinationForReply(destinationForReply) + { + + } + + void DoSendReply(const TActorContext& ctx) { + ctx.Send(DestinationForReply, Event.release()); + } + }; + + std::vector Results; + std::vector> ResultOperators; bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId); diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 0dae70199a9f..696341fb6511 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -15,11 +15,11 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase(Self->TabletID(), txKind, txId, proposeResult.GetStatus(), proposeResult.GetStatusMessage()); + auto reply = std::make_unique(Self->TabletID(), txKind, txId, proposeResult.GetStatus(), proposeResult.GetStatusMessage()); + ctx.Send(Ev->Sender, reply.release()); return true; } @@ -49,36 +50,27 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBaseCurrentSchemeShardId == record.GetSchemeShardId()); } } - auto result = Self->GetProgressTxController().ProposeTransaction(TTxController::TBasicTxInfo(txKind, txId), txBody, Ev->Get()->GetSource(), Ev->Cookie, txc); - const auto& proposeResult = result.GetProposeResult(); - if (result.IsError()) { - const auto& txInfo = result.GetBaseTxInfoVerified(); - Result = std::make_unique(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage()); - Self->IncCounter(COUNTER_PREPARE_ERROR); - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", proposeResult.GetStatusMessage())("tablet_id", Self->TabletID())("tx_id", txInfo.TxId); - } else { - const auto& txInfo = result.GetFullTxInfoVerified(); - AFL_VERIFY(proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED)("tx_id", txInfo.TxId)("details", proposeResult.DebugString()); - Result = std::make_unique(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage()); - Result->Record.SetMinStep(txInfo.MinStep); - Result->Record.SetMaxStep(txInfo.MaxStep); - if (Self->ProcessingParams) { - Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators()); - } - Self->IncCounter(COUNTER_PREPARE_SUCCESS); - } + TxOperator = Self->GetProgressTxController().StartProposeOnExecute(TTxController::TBasicTxInfo(txKind, txId), txBody, Ev->Get()->GetSource(), Ev->Cookie, txc); return true; } virtual void Complete(const TActorContext& ctx) override { - Y_ABORT_UNLESS(Ev); - Y_ABORT_UNLESS(Result); - auto& record = Proto(Ev->Get()); + if (record.GetTxKind() == NKikimrTxColumnShard::TX_KIND_TTL) { + return; + } + AFL_VERIFY(!!TxOperator); const ui64 txId = record.GetTxId(); - Self->GetProgressTxController().CompleteTransaction(txId, ctx); - ctx.Send(Ev->Get()->GetSource(), Result.release()); + if (TxOperator->IsFail()) { + TxOperator->SendReply(*Self, ctx); + } + if (TxOperator->IsAsync()) { + Self->GetProgressTxController().StartProposeOnComplete(txId, ctx); + } else { + Self->GetProgressTxController().FinishProposeOnComplete(txId, ctx); + } + Self->TryRegisterMediatorTimeCast(); } @@ -86,7 +78,7 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase Result; + std::shared_ptr TxOperator; TTxController::TProposeResult ProposeTtlDeprecated(const TString& txBody) { /// @note There's no tx guaranties now. For now TX_KIND_TTL is used to trigger TTL in tests only. diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 752e74158cdd..bea1e96d0381 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -270,7 +270,6 @@ class TProposeWriteTransaction : public NTabletFlatExecutor::TTransactionBase Result; }; bool TProposeWriteTransaction::Execute(TTransactionContext& txc, const TActorContext&) { @@ -278,18 +277,13 @@ bool TProposeWriteTransaction::Execute(TTransactionContext& txc, const TActorCon proto.SetLockId(WriteCommit->GetLockId()); TString txBody; Y_ABORT_UNLESS(proto.SerializeToString(&txBody)); - auto result = Self->GetProgressTxController().ProposeTransaction( - TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, WriteCommit->GetTxId()), txBody, Source, Cookie, txc); - if (result.IsError()) { - Result = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), result.GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, result.GetProposeResult().GetStatusMessage()); - } else { - Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), result.GetTxId(), Self->GetProgressTxController().BuildCoordinatorInfo(result.GetFullTxInfoVerified())); - } + Y_UNUSED(Self->GetProgressTxController().StartProposeOnExecute( + TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, WriteCommit->GetTxId()), txBody, Source, Cookie, txc)); return true; } void TProposeWriteTransaction::Complete(const TActorContext& ctx) { - ctx.Send(Source, Result.release(), 0, Cookie); + Self->GetProgressTxController().FinishProposeOnComplete(WriteCommit->GetTxId(), ctx); } void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index abd72886ec7f..a1dd606a0887 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -183,6 +183,9 @@ class TColumnShard friend class TLongTxTransactionOperator; friend class TEvWriteTransactionOperator; friend class TBackupTransactionOperator; + friend class ISSTransactionOperator; + friend class TSharingTransactionOperator; + class TTxProgressTx; class TTxProposeCancel; diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 605810859932..c83e41b4d0ee 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -579,6 +579,12 @@ struct Schema : NIceDb::Schema { NIceDb::TUpdate(cookie)); } + static void UpdateTxInfoSource(NIceDb::TNiceDb& db, ui64 txId, const TActorId& source, ui64 cookie) { + db.Table().Key(txId).Update( + NIceDb::TUpdate(source), + NIceDb::TUpdate(cookie)); + } + static void UpdateTxInfoPlanStep(NIceDb::TNiceDb& db, ui64 txId, ui64 planStep) { db.Table().Key(txId).Update( NIceDb::TUpdate(planStep)); diff --git a/ydb/core/tx/columnshard/data_sharing/common/context/context.cpp b/ydb/core/tx/columnshard/data_sharing/common/context/context.cpp index 64f32b59b920..e2d856844652 100644 --- a/ydb/core/tx/columnshard/data_sharing/common/context/context.cpp +++ b/ydb/core/tx/columnshard/data_sharing/common/context/context.cpp @@ -6,16 +6,27 @@ namespace NKikimr::NOlap::NDataSharing { NKikimrColumnShardDataSharingProto::TTransferContext TTransferContext::SerializeToProto() const { NKikimrColumnShardDataSharingProto::TTransferContext result; result.SetDestinationTabletId((ui64)DestinationTabletId); + if (TxId) { + result.SetTxId(*TxId); + } for (auto&& i : SourceTabletIds) { result.AddSourceTabletIds((ui64)i); } - SnapshotBarrier.SerializeToProto(*result.MutableSnapshotBarrier()); + if (SnapshotBarrier) { + SnapshotBarrier->SerializeToProto(*result.MutableSnapshotBarrier()); + } result.SetMoving(Moving); return result; } NKikimr::TConclusionStatus TTransferContext::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TTransferContext& proto) { DestinationTabletId = (TTabletId)proto.GetDestinationTabletId(); + if (proto.HasTxId()) { + TxId = proto.GetTxId(); + if (!*TxId) { + return TConclusionStatus::Fail("TxId is incorrect"); + } + } if (!(ui64)DestinationTabletId) { return TConclusionStatus::Fail("incorrect DestinationTabletId in proto"); } @@ -24,15 +35,16 @@ NKikimr::TConclusionStatus TTransferContext::DeserializeFromProto(const NKikimrC } Moving = proto.GetMoving(); { - if (!proto.HasSnapshotBarrier()) { - return TConclusionStatus::Fail("SnapshotBarrier not initialized in proto."); - } - auto snapshotParse = SnapshotBarrier.DeserializeFromProto(proto.GetSnapshotBarrier()); - if (!snapshotParse) { - return snapshotParse; - } - if (!SnapshotBarrier.Valid()) { - return TConclusionStatus::Fail("SnapshotBarrier must be valid in proto."); + if (proto.HasSnapshotBarrier()) { + TSnapshot snapshot = TSnapshot::Zero(); + auto snapshotParse = snapshot.DeserializeFromProto(proto.GetSnapshotBarrier()); + if (!snapshotParse) { + return snapshotParse; + } + if (!snapshot.Valid()) { + return TConclusionStatus::Fail("SnapshotBarrier must be valid in proto."); + } + SnapshotBarrier = snapshot; } } return TConclusionStatus::Success(); @@ -47,16 +59,28 @@ bool TTransferContext::IsEqualTo(const TTransferContext& context) const { } TString TTransferContext::DebugString() const { - return TStringBuilder() << "{from=" << (ui64)DestinationTabletId << ";moving=" << Moving << ";snapshot=" << SnapshotBarrier.DebugString() << "}"; + return TStringBuilder() << "{from=" << (ui64)DestinationTabletId << ";moving=" << Moving << ";snapshot=" << (SnapshotBarrier ? SnapshotBarrier->DebugString() : "NO") << "}"; } -TTransferContext::TTransferContext(const TTabletId destination, const THashSet& sources, const TSnapshot& snapshotBarrier, const bool moving) +TTransferContext::TTransferContext(const TTabletId destination, const THashSet& sources, const TSnapshot& snapshotBarrier, const bool moving, const std::optional txId) : DestinationTabletId(destination) , SourceTabletIds(sources) , Moving(moving) , SnapshotBarrier(snapshotBarrier) + , TxId(txId) { + AFL_VERIFY(!TxId || *TxId); AFL_VERIFY(!sources.contains(destination)); } +const NKikimr::NOlap::TSnapshot& TTransferContext::GetSnapshotBarrierVerified() const { + AFL_VERIFY(!!SnapshotBarrier); + return *SnapshotBarrier; +} + +void TTransferContext::SetSnapshotBarrier(const TSnapshot& snapshot) { + AFL_VERIFY(!SnapshotBarrier || *SnapshotBarrier == snapshot); + SnapshotBarrier = snapshot; +} + } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/data_sharing/common/context/context.h b/ydb/core/tx/columnshard/data_sharing/common/context/context.h index b124bc7a07ea..2369f7639e11 100644 --- a/ydb/core/tx/columnshard/data_sharing/common/context/context.h +++ b/ydb/core/tx/columnshard/data_sharing/common/context/context.h @@ -15,13 +15,18 @@ class TTransferContext { YDB_READONLY(TTabletId, DestinationTabletId, (TTabletId)0); YDB_READONLY_DEF(THashSet, SourceTabletIds); YDB_READONLY(bool, Moving, false); - YDB_READONLY(TSnapshot, SnapshotBarrier, TSnapshot::Zero()); + std::optional SnapshotBarrier; + YDB_READONLY_DEF(std::optional, TxId); public: TTransferContext() = default; bool IsEqualTo(const TTransferContext& context) const; TString DebugString() const; - TTransferContext(const TTabletId destination, const THashSet& sources, const TSnapshot& snapshotBarrier, const bool moving); + const TSnapshot& GetSnapshotBarrierVerified() const; + + void SetSnapshotBarrier(const TSnapshot& snapshot); + + TTransferContext(const TTabletId destination, const THashSet& sources, const TSnapshot& snapshotBarrier, const bool moving, const std::optional txId = {}); NKikimrColumnShardDataSharingProto::TTransferContext SerializeToProto() const; TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TTransferContext& proto); }; diff --git a/ydb/core/tx/columnshard/data_sharing/common/session/common.h b/ydb/core/tx/columnshard/data_sharing/common/session/common.h index c3070b5e7b8e..d2e06fa98894 100644 --- a/ydb/core/tx/columnshard/data_sharing/common/session/common.h +++ b/ydb/core/tx/columnshard/data_sharing/common/session/common.h @@ -53,6 +53,10 @@ class TCommonSession { , TransferContext(transferContext) { } + const TTransferContext& GetTransferContext() const { + return TransferContext; + } + bool IsFinished() const { return IsFinishedFlag; } @@ -73,7 +77,7 @@ class TCommonSession { void Finish(const std::shared_ptr& dataLocksManager); const TSnapshot& GetSnapshotBarrier() const { - return TransferContext.GetSnapshotBarrier(); + return TransferContext.GetSnapshotBarrierVerified(); } TString DebugString() const; diff --git a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp index d70fd05f9849..d0647be552a7 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp @@ -42,14 +42,22 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(THashMap tabletId) { +ui32 TDestinationSession::GetSourcesInProgressCount() const { AFL_VERIFY(IsStarted() || IsStarting()); - bool found = false; - bool allTransfersFinished = true; + AFL_VERIFY(Cursors.size()); + ui32 result = 0; for (auto&& [_, cursor] : Cursors) { if (!cursor.GetDataFinished()) { - allTransfersFinished = false; + ++result; } + } + return result; +} + +void TDestinationSession::SendCurrentCursorAck(const NColumnShard::TColumnShard& shard, const std::optional tabletId) { + AFL_VERIFY(IsStarted() || IsStarting()); + bool found = false; + for (auto&& [_, cursor] : Cursors) { if (tabletId && *tabletId != cursor.GetTabletId()) { continue; } @@ -73,11 +81,6 @@ void TDestinationSession::SendCurrentCursorAck(const NColumnShard::TColumnShard& new TEvPipeCache::TEvForward(ev.release(), (ui64)cursor.GetTabletId(), true), IEventHandle::FlagTrackDelivery, GetRuntimeId()); } } - if (allTransfersFinished && !IsFinished()) { - NYDBTest::TControllers::GetColumnShardController()->OnDataSharingFinished(shard.TabletID(), GetSessionId()); - Finish(shard.GetDataLocksManager()); - InitiatorController.Finished(GetSessionId()); - } AFL_VERIFY(found); } diff --git a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.h b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.h index a2780449d498..1c702e62e1a4 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.h +++ b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.h @@ -87,6 +87,10 @@ class TDestinationSession: public TCommonSession { return result; } public: + void SetBarrierSnapshot(const TSnapshot& value) { + TransferContext.SetSnapshotBarrier(value); + } + TSourceCursorForDestination& GetCursorVerified(const TTabletId& tabletId) { auto it = Cursors.find(tabletId); AFL_VERIFY(it != Cursors.end()); @@ -114,7 +118,8 @@ class TDestinationSession: public TCommonSession { [[nodiscard]] TConclusionStatus DataReceived(THashMap&& data, TColumnEngineForLogs& index, const std::shared_ptr& manager); - void SendCurrentCursorAck(const NColumnShard::TColumnShard& shard, const std::optional tabletId); + ui32 GetSourcesInProgressCount() const; + void SendCurrentCursorAck(const NColumnShard::TColumnShard & shard, const std::optional tabletId); NKikimrColumnShardDataSharingProto::TDestinationSession SerializeDataToProto() const; diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.cpp b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.cpp index 7229dafe789e..94e9ec55d027 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.cpp @@ -1,4 +1,5 @@ #include "tx_finish_from_source.h" +#include namespace NKikimr::NOlap::NDataSharing { @@ -7,11 +8,27 @@ bool TTxFinishFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& tx NIceDb::TNiceDb db(txc.DB); db.Table().Key(Session->GetSessionId()) .Update(NIceDb::TUpdate(Session->SerializeCursorToProto().SerializeAsString())); + if (Session->GetSourcesInProgressCount() == 1) { + Finished = true; + if (Session->GetTransferContext().GetTxId()) { + Self->GetProgressTxController().FinishProposeOnExecute(*Session->GetTransferContext().GetTxId(), txc); + } + } return true; } -void TTxFinishFromSource::DoComplete(const TActorContext& /*ctx*/) { +void TTxFinishFromSource::DoComplete(const TActorContext& ctx) { Session->SendCurrentCursorAck(*Self, SourceTabletId); + + if (Finished) { + AFL_VERIFY(Session->GetSourcesInProgressCount() == 0); + if (Session->GetTransferContext().GetTxId()) { + Self->GetProgressTxController().FinishProposeOnComplete(*Session->GetTransferContext().GetTxId(), ctx); + } + NYDBTest::TControllers::GetColumnShardController()->OnDataSharingFinished(Self->TabletID(), Session->GetSessionId()); + Session->Finish(Self->GetDataLocksManager()); + Session->GetInitiatorController().Finished(Session->GetSessionId()); + } } } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.h b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.h index 92a0bb667988..4e2f107d8a2f 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.h +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.h @@ -10,6 +10,7 @@ class TTxFinishFromSource: public TExtendedTransactionBase; std::shared_ptr Session; const TTabletId SourceTabletId; + bool Finished = false; protected: virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override; virtual void DoComplete(const TActorContext& ctx) override; diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_start_from_initiator.cpp b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_start_from_initiator.cpp index 4de8f6e30882..2c80eca95245 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_start_from_initiator.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_start_from_initiator.cpp @@ -21,7 +21,8 @@ bool TTxConfirmFromInitiator::DoExecute(NTabletFlatExecutor::TTransactionContext NIceDb::TNiceDb db(txc.DB); Session->Confirm(true); db.Table().Key(Session->GetSessionId()) - .Update(NIceDb::TUpdate(Session->SerializeCursorToProto().SerializeAsString())); + .Update(NIceDb::TUpdate(Session->SerializeCursorToProto().SerializeAsString())) + .Update(NIceDb::TUpdate(Session->SerializeDataToProto().SerializeAsString())); return true; } diff --git a/ydb/core/tx/columnshard/data_sharing/initiator/controller/schemeshard.cpp b/ydb/core/tx/columnshard/data_sharing/initiator/controller/schemeshard.cpp new file mode 100644 index 000000000000..212569a00e04 --- /dev/null +++ b/ydb/core/tx/columnshard/data_sharing/initiator/controller/schemeshard.cpp @@ -0,0 +1,9 @@ +#include "schemeshard.h" + +namespace NKikimr::NOlap::NDataSharing { + +void TSSInitiatorController::DoProposeError(const TString& sessionId, const TString& message) const { + AFL_VERIFY(false)("error", "on_propose")("session_id", sessionId)("reason", message); +} + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/data_sharing/initiator/controller/schemeshard.h b/ydb/core/tx/columnshard/data_sharing/initiator/controller/schemeshard.h new file mode 100644 index 000000000000..17ef14c50e20 --- /dev/null +++ b/ydb/core/tx/columnshard/data_sharing/initiator/controller/schemeshard.h @@ -0,0 +1,59 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NDataSharing { + +class TSSInitiatorController: public IInitiatorController { +public: + static TString GetClassNameStatic() { + return "SS"; + } +private: +// ui64 TabletId = 0; +// ui64 ReplyCookie = 0; + static inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); +protected: + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrColumnShardDataSharingProto::TInitiator::TController& /*proto*/) override { +// if (!proto.HasSS()) { +// return TConclusionStatus::Fail("no data about SS initiator"); +// } +// if (!proto.GetSS().GetTabletId()) { +// return TConclusionStatus::Fail("incorrect tabletId for SS initiator"); +// } +// TabletId = proto.GetSS().GetTabletId(); +// ReplyCookie = proto.GetSS().GetReplyCookie(); + return TConclusionStatus::Success(); + } + virtual void DoSerializeToProto(NKikimrColumnShardDataSharingProto::TInitiator::TController& /*proto*/) const override { +// AFL_VERIFY(TabletId); +// proto.MutableSS()->SetTabletId(TabletId); +// proto.MutableSS()->SetReplyCookie(ReplyCookie); + } + + virtual void DoStatus(const TStatusContainer& /*status*/) const override { + + } + virtual void DoProposeError(const TString& sessionId, const TString& message) const override; + virtual void DoProposeSuccess(const TString& /*sessionId*/) const override { + } + virtual void DoConfirmSuccess(const TString& /*sessionId*/) const override { + } + virtual void DoFinished(const TString& /*sessionId*/) const override { +// auto ev = std::make_unique(sessionId); +// NActors::TActivationContext::AsActorContext().Send(MakePipePeNodeCacheID(false), +// new TEvPipeCache::TEvForward(ev.release(), (ui64)TabletId, true), IEventHandle::FlagTrackDelivery, ReplyId); + } + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +public: + TSSInitiatorController() = default; + TSSInitiatorController(const ui64 /*tabletId*/, const ui64 /*replyCookie*/) +// : TabletId(tabletId) +// , ReplyCookie(replyCookie) + { + + } +}; + +} \ No newline at end of file diff --git a/ydb/core/tx/columnshard/data_sharing/initiator/controller/ya.make b/ydb/core/tx/columnshard/data_sharing/initiator/controller/ya.make index 9f2dce70dbd7..11c9794dba93 100644 --- a/ydb/core/tx/columnshard/data_sharing/initiator/controller/ya.make +++ b/ydb/core/tx/columnshard/data_sharing/initiator/controller/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( abstract.cpp GLOBAL test.cpp + GLOBAL schemeshard.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/data_sharing/protos/initiator.proto b/ydb/core/tx/columnshard/data_sharing/protos/initiator.proto index 8f84e125473c..d6552cc3eaae 100644 --- a/ydb/core/tx/columnshard/data_sharing/protos/initiator.proto +++ b/ydb/core/tx/columnshard/data_sharing/protos/initiator.proto @@ -7,8 +7,13 @@ message TInitiator { message TTest { } + message TSchemeShard { + optional uint64 TabletId = 1; + } + oneof Implementation { TTest Test = 40; + TSchemeShard SS = 41; } } diff --git a/ydb/core/tx/columnshard/data_sharing/protos/sessions.proto b/ydb/core/tx/columnshard/data_sharing/protos/sessions.proto index ea01d6fc6f1b..cd0397a9a21d 100644 --- a/ydb/core/tx/columnshard/data_sharing/protos/sessions.proto +++ b/ydb/core/tx/columnshard/data_sharing/protos/sessions.proto @@ -13,6 +13,7 @@ message TTransferContext { repeated uint64 SourceTabletIds = 2; optional bool Moving = 3[default = false]; optional NKikimrColumnShardProto.TSnapshot SnapshotBarrier = 4; + optional uint64 TxId = 5; } message TDestinationSession { diff --git a/ydb/core/tx/columnshard/export/actor/export_actor.cpp b/ydb/core/tx/columnshard/export/actor/export_actor.cpp index be2806b2dd50..060a09e56ef7 100644 --- a/ydb/core/tx/columnshard/export/actor/export_actor.cpp +++ b/ydb/core/tx/columnshard/export/actor/export_actor.cpp @@ -33,4 +33,33 @@ void TActor::HandleExecute(NEvents::TEvExportWritingFailed::TPtr& /*ev*/) { Register(CreateWriteActor((ui64)TabletId, controller, TInstant::Max())); } +class TTxProposeFinish: public NTabletFlatExecutor::TTransactionBase { +private: + using TBase = NTabletFlatExecutor::TTransactionBase; + const ui64 TxId; +protected: + virtual bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) override { + Self->GetProgressTxController().FinishProposeOnExecute(TxId, txc); + return true; + } + virtual void Complete(const TActorContext& ctx) override { + Self->GetProgressTxController().FinishProposeOnComplete(TxId, ctx); + } +public: + TTxProposeFinish(NColumnShard::TColumnShard* self, const ui64 txId) + : TBase(self) + , TxId(txId) { + } +}; + +void TActor::OnSessionStateSaved() { + AFL_VERIFY(ExportSession->IsFinished()); + NYDBTest::TControllers::GetColumnShardController()->OnExportFinished(); + if (ExportSession->GetTxId()) { + ExecuteTransaction(std::make_unique(GetShardVerified(), *ExportSession->GetTxId())); + } else { + Session->FinishActor(); + } +} + } \ No newline at end of file diff --git a/ydb/core/tx/columnshard/export/actor/export_actor.h b/ydb/core/tx/columnshard/export/actor/export_actor.h index d2ad29b65a70..92a470a14ff6 100644 --- a/ydb/core/tx/columnshard/export/actor/export_actor.h +++ b/ydb/core/tx/columnshard/export/actor/export_actor.h @@ -48,14 +48,10 @@ class TActor: public NBackground::TSessionActor { TBase::Send(*ScanActorId, new NKqp::TEvKqpCompute::TEvScanDataAck(FreeSpace, (ui64)TabletId, 1)); } - virtual void OnSessionStateSaved() override { - AFL_VERIFY(ExportSession->IsFinished()); - NYDBTest::TControllers::GetColumnShardController()->OnExportFinished(); - Session->FinishActor(); - } + virtual void OnSessionStateSaved() override; virtual void OnTxCompleted(const ui64 /*txId*/) override { - AFL_VERIFY(false); + Session->FinishActor(); } virtual void OnSessionProgressSaved() override { diff --git a/ydb/core/tx/columnshard/export/protos/task.proto b/ydb/core/tx/columnshard/export/protos/task.proto index 7e5fb9efb152..438e9e934c0f 100644 --- a/ydb/core/tx/columnshard/export/protos/task.proto +++ b/ydb/core/tx/columnshard/export/protos/task.proto @@ -13,6 +13,7 @@ message TExportTask { optional TSelectorContainer Selector = 2; optional TStorageInitializerContainer StorageInitializer = 3; optional NKikimrSchemeOp.TOlapColumn.TSerializer Serializer = 4; + optional uint64 TxId = 5; } message TSessionControlContainer { diff --git a/ydb/core/tx/columnshard/export/session/session.h b/ydb/core/tx/columnshard/export/session/session.h index 84e8f9408949..ce98f56a8dd9 100644 --- a/ydb/core/tx/columnshard/export/session/session.h +++ b/ydb/core/tx/columnshard/export/session/session.h @@ -77,6 +77,9 @@ class TSession: public NBackground::TSessionProtoAdapter Registrator = TFactory::TRegistrator(GetClassNameStatic()); public: + std::optional GetTxId() const { + return Task->GetTxId(); + } virtual bool IsReadyForStart() const override { return Status == EStatus::Confirmed; } diff --git a/ydb/core/tx/columnshard/export/session/task.cpp b/ydb/core/tx/columnshard/export/session/task.cpp index 58e5212fa6a8..7153ddf9420e 100644 --- a/ydb/core/tx/columnshard/export/session/task.cpp +++ b/ydb/core/tx/columnshard/export/session/task.cpp @@ -24,6 +24,9 @@ NKikimr::TConclusionStatus TExportTask::DoDeserializeFromProto(const NKikimrColu Selector = selector.DetachResult(); StorageInitializer = initializer.DetachResult(); Serializer = serializer.DetachResult(); + if (proto.HasTxId()) { + TxId = proto.GetTxId(); + } return TConclusionStatus::Success(); } @@ -33,6 +36,9 @@ NKikimrColumnShardExportProto::TExportTask TExportTask::DoSerializeToProto() con *result.MutableSelector() = Selector.SerializeToProto(); *result.MutableStorageInitializer() = StorageInitializer.SerializeToProto(); *result.MutableSerializer() = Serializer.SerializeToProto(); + if (TxId) { + result.SetTxId(*TxId); + } return result; } @@ -45,7 +51,11 @@ NBackground::TSessionControlContainer TExportTask::BuildAbortControl() const { } std::shared_ptr TExportTask::DoBuildSession() const { - return std::make_shared(std::make_shared(Identifier, Selector, StorageInitializer, Serializer)); + auto result = std::make_shared(std::make_shared(Identifier, Selector, StorageInitializer, Serializer, TxId)); + if (!!TxId) { + result->Confirm(); + } + return result; } } diff --git a/ydb/core/tx/columnshard/export/session/task.h b/ydb/core/tx/columnshard/export/session/task.h index 75de9fcbb886..bb92e03b7ed9 100644 --- a/ydb/core/tx/columnshard/export/session/task.h +++ b/ydb/core/tx/columnshard/export/session/task.h @@ -23,6 +23,7 @@ class TExportTask: public NBackgroundTasks::TInterfaceProtoAdapter, TxId); virtual TConclusionStatus DoDeserializeFromProto(const NKikimrColumnShardExportProto::TExportTask& proto) override; virtual NKikimrColumnShardExportProto::TExportTask DoSerializeToProto() const override; @@ -31,6 +32,8 @@ class TExportTask: public NBackgroundTasks::TInterfaceProtoAdapter txId = {}) : Identifier(id) , Selector(selector) , StorageInitializer(storageInitializer) , Serializer(serializer) + , TxId(txId) { } diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp index aeffea79e3cf..cc741c33a586 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp @@ -131,7 +131,7 @@ TConclusion> TChunksNormalizer::DoInit(const TTablesManager tablesManager(controller.GetStoragesManager(), 0); if (!tablesManager.InitFromDB(db)) { - ACFL_ERROR("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); + ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); } diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index f8ac96ab664e..e706a6fcede4 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -24,7 +24,7 @@ TConclusion> TPortionsNormalizerBase::DoInit( NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), 0); if (!tablesManager.InitFromDB(db)) { - ACFL_ERROR("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); + ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); } diff --git a/ydb/core/tx/columnshard/transactions/operators/backup.cpp b/ydb/core/tx/columnshard/transactions/operators/backup.cpp index b87b38545a44..711256a35f78 100644 --- a/ydb/core/tx/columnshard/transactions/operators/backup.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/backup.cpp @@ -5,7 +5,7 @@ namespace NKikimr::NColumnShard { -bool TBackupTransactionOperator::Parse(TColumnShard& owner, const TString& data) { +bool TBackupTransactionOperator::DoParse(TColumnShard& owner, const TString& data) { NKikimrTxColumnShard::TBackupTxBody txBody; if (!txBody.ParseFromString(data)) { return false; @@ -29,7 +29,7 @@ bool TBackupTransactionOperator::Parse(TColumnShard& owner, const TString& data) return false; } NArrow::NSerialization::TSerializerContainer serializer(std::make_shared()); - ExportTask = std::make_shared(id.DetachResult(), selector.DetachResult(), storeInitializer.DetachResult(), serializer); + ExportTask = std::make_shared(id.DetachResult(), selector.DetachResult(), storeInitializer.DetachResult(), serializer, GetTxId()); NOlap::NBackground::TTask task(::ToString(ExportTask->GetIdentifier().GetPathId()), std::make_shared(), ExportTask); TxAddTask = owner.GetBackgroundSessionsManager()->TxAddTask(task); if (!TxAddTask) { @@ -39,36 +39,23 @@ bool TBackupTransactionOperator::Parse(TColumnShard& owner, const TString& data) return true; } -TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::ExecuteOnPropose(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& txc) const { +TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::DoStartProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& txc) { AFL_VERIFY(!!TxAddTask); AFL_VERIFY(TxAddTask->Execute(txc, NActors::TActivationContext::AsActorContext())); return TProposeResult(); } -bool TBackupTransactionOperator::CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& ctx) const { +void TBackupTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& ctx) { AFL_VERIFY(!!TxAddTask); TxAddTask->Complete(ctx); - return true; + TxAddTask.reset(); } -bool TBackupTransactionOperator::ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& txc) { - AFL_VERIFY(ExportTask); - if (!TxConfirm) { - auto control = ExportTask->BuildConfirmControl(); - TxConfirm = owner.GetBackgroundSessionsManager()->TxApplyControl(control); - } - return TxConfirm->Execute(txc, NActors::TActivationContext::AsActorContext()); +bool TBackupTransactionOperator::ExecuteOnProgress(TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) { + return true; } -bool TBackupTransactionOperator::CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) { - AFL_VERIFY(ExportTask); - AFL_VERIFY(!!TxConfirm); - TxConfirm->Complete(ctx); - - auto result = std::make_unique( - owner.TabletID(), TxInfo.TxKind, GetTxId(), NKikimrTxColumnShard::SUCCESS); - result->Record.SetStep(TxInfo.PlanStep); - ctx.Send(TxInfo.Source, result.release(), 0, TxInfo.Cookie); +bool TBackupTransactionOperator::CompleteOnProgress(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) { return true; } diff --git a/ydb/core/tx/columnshard/transactions/operators/backup.h b/ydb/core/tx/columnshard/transactions/operators/backup.h index f4ea54b482d3..24c41ff9d0e0 100644 --- a/ydb/core/tx/columnshard/transactions/operators/backup.h +++ b/ydb/core/tx/columnshard/transactions/operators/backup.h @@ -1,39 +1,47 @@ #pragma once +#include "ss_operation.h" #include #include namespace NKikimr::NColumnShard { - class TBackupTransactionOperator : public TTxController::ITransactionOperator { - private: - std::shared_ptr ExportTask; - std::unique_ptr TxAddTask; - std::unique_ptr TxConfirm; - std::unique_ptr TxAbort; - using TBase = TTxController::ITransactionOperator; - using TProposeResult = TTxController::TProposeResult; - static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_BACKUP); - public: - using TBase::TBase; - - virtual bool AllowTxDups() const override { - return true; - } - - virtual bool Parse(TColumnShard& owner, const TString& data) override; - - virtual TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const override; - virtual bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override; - - virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override; - - virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override; - - virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; - virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { - return true; - } - }; - +class TBackupTransactionOperator: public ISSTransactionOperator { +private: + using TBase = ISSTransactionOperator; + + std::shared_ptr ExportTask; + std::unique_ptr TxAddTask; + std::unique_ptr TxConfirm; + std::unique_ptr TxAbort; + using TProposeResult = TTxController::TProposeResult; + static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_BACKUP); + + virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override; + virtual void DoFinishProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override { + } + virtual void DoFinishProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + } + virtual bool DoIsAsync() const override { + return true; + } + virtual bool DoParse(TColumnShard& owner, const TString& data) override; +public: + using TBase::TBase; + + virtual bool AllowTxDups() const override { + return true; + } + + virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override; + + virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override; + + virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + return true; + } +}; } + diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp b/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp index 927cfabadc19..3aa27316237f 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp @@ -1,3 +1,4 @@ #include "ev_write.h" -namespace NKikimr::NColumnShard {} +namespace NKikimr::NColumnShard { +} diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write.h b/ydb/core/tx/columnshard/transactions/operators/ev_write.h index 81e2ad35d8a5..b83014fd9083 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write.h @@ -8,10 +8,33 @@ namespace NKikimr::NColumnShard { using TBase = TTxController::ITransactionOperator; using TProposeResult = TTxController::TProposeResult; static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE); - public: - using TBase::TBase; + private: + virtual TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { + owner.OperationsManager->LinkTransaction(LockId, GetTxId(), txc); + return TProposeResult(); + } + virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { - virtual bool Parse(TColumnShard& /*owner*/, const TString& data) override { + } + virtual void DoFinishProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override { + } + virtual void DoFinishProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + } + virtual bool DoIsAsync() const override { + return false; + } + virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) override { + const auto& txInfo = GetTxInfo(); + std::unique_ptr evResult; + if (IsFail()) { + evResult = NEvents::TDataEvents::TEvWriteResult::BuildError(owner.TabletID(), txInfo.GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, GetProposeStartInfoVerified().GetStatusMessage()); + } else { + evResult = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(owner.TabletID(), txInfo.GetTxId(), owner.GetProgressTxController().BuildCoordinatorInfo(txInfo)); + } + ctx.Send(txInfo.Source, evResult.release(), 0, txInfo.Cookie); + } + + virtual bool DoParse(TColumnShard& /*owner*/, const TString& data) override { NKikimrTxColumnShard::TCommitWriteTxBody commitTxBody; if (!commitTxBody.ParseFromString(data)) { return false; @@ -20,14 +43,8 @@ namespace NKikimr::NColumnShard { return !!LockId; } - TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const override { - owner.OperationsManager->LinkTransaction(LockId, GetTxId(), txc); - return TProposeResult(); - } - - bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override { - return true; - } + public: + using TBase::TBase; virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { return owner.OperationsManager->CommitTransaction(owner, GetTxId(), txc, version); diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp index e10c1dc4a71d..784ebc3eda71 100644 --- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp @@ -1,3 +1,41 @@ #include "long_tx_write.h" -namespace NKikimr::NColumnShard {} +namespace NKikimr::NColumnShard { + +TLongTxTransactionOperator::TProposeResult TLongTxTransactionOperator::DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/) { + if (WriteIds.empty()) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, + TStringBuilder() << "Commit TxId# " << GetTxId() << " has an empty list of write ids"); + } + + for (auto&& writeId : WriteIds) { + if (!owner.LongTxWrites.contains(writeId)) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, + TStringBuilder() << "Commit TxId# " << GetTxId() << " references WriteId# " << (ui64)writeId << " that no longer exists"); + } + auto& lw = owner.LongTxWrites[writeId]; + if (lw.PreparedTxId != 0) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, + TStringBuilder() << "Commit TxId# " << GetTxId() << " references WriteId# " << (ui64)writeId << " that is already locked by TxId# " << lw.PreparedTxId); + } + } + + for (auto&& writeId : WriteIds) { + owner.AddLongTxWrite(writeId, GetTxId()); + } + return TProposeResult(); +} + +bool TLongTxTransactionOperator::DoParse(TColumnShard& /*owner*/, const TString& data) { + NKikimrTxColumnShard::TCommitTxBody commitTxBody; + if (!commitTxBody.ParseFromString(data)) { + return false; + } + + for (auto& id : commitTxBody.GetWriteIds()) { + WriteIds.insert(TWriteId{ id }); + } + return true; +} + +} diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h index 69622fc5031a..fca206f161bf 100644 --- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h @@ -1,27 +1,29 @@ #pragma once +#include "ss_operation.h" #include namespace NKikimr::NColumnShard { - class TLongTxTransactionOperator : public TTxController::ITransactionOperator { - using TBase = TTxController::ITransactionOperator; + class TLongTxTransactionOperator : public ISSTransactionOperator { + using TBase = ISSTransactionOperator; using TProposeResult = TTxController::TProposeResult; static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_COMMIT); - public: - using TBase::TBase; - - bool Parse(TColumnShard& /*owner*/, const TString& data) override { - NKikimrTxColumnShard::TCommitTxBody commitTxBody; - if (!commitTxBody.ParseFromString(data)) { - return false; - } + private: + virtual TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { - for (auto& id : commitTxBody.GetWriteIds()) { - WriteIds.insert(TWriteId{id}); - } - return true; } + virtual void DoFinishProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override { + } + virtual void DoFinishProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + } + virtual bool DoIsAsync() const override { + return false; + } + virtual bool DoParse(TColumnShard& owner, const TString& data) override; + public: + using TBase::TBase; void OnTabletInit(TColumnShard& owner) override { for (auto&& writeId : WriteIds) { @@ -31,34 +33,6 @@ namespace NKikimr::NColumnShard { } } - TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/) const override { - if (WriteIds.empty()) { - return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, - TStringBuilder() << "Commit TxId# " << GetTxId() << " has an empty list of write ids"); - } - - for (auto&& writeId : WriteIds) { - if (!owner.LongTxWrites.contains(writeId)) { - return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, - TStringBuilder() << "Commit TxId# " << GetTxId() << " references WriteId# " << (ui64)writeId << " that no longer exists"); - } - auto& lw = owner.LongTxWrites[writeId]; - if (lw.PreparedTxId != 0) { - return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, - TStringBuilder() << "Commit TxId# " << GetTxId() << " references WriteId# " << (ui64)writeId << " that is already locked by TxId# " << lw.PreparedTxId); - } - } - - for (auto&& writeId : WriteIds) { - owner.AddLongTxWrite(writeId, GetTxId()); - } - return TProposeResult();; - } - - bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override { - return true; - } - bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { TBlobGroupSelector dsGroupSelector(owner.Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.cpp b/ydb/core/tx/columnshard/transactions/operators/schema.cpp index b9979fd754a5..bd3d51394dcc 100644 --- a/ydb/core/tx/columnshard/transactions/operators/schema.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/schema.cpp @@ -1,3 +1,128 @@ #include "schema.h" -namespace NKikimr::NColumnShard {} +namespace NKikimr::NColumnShard { + +NKikimr::NColumnShard::TTxController::TProposeResult TSchemaTransactionOperator::DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) { + switch (SchemaTxBody.TxBody_case()) { + case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: + { + auto validationStatus = ValidateTables(SchemaTxBody.GetInitShard().GetTables()); + if (validationStatus.IsFail()) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "Invalid schema: " + validationStatus.GetErrorMessage()); + } + } + break; + case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables: + { + auto validationStatus = ValidateTables(SchemaTxBody.GetEnsureTables().GetTables()); + if (validationStatus.IsFail()) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "Invalid schema: " + validationStatus.GetErrorMessage()); + } + } + break; + case NKikimrTxColumnShard::TSchemaTxBody::kAlterTable: + case NKikimrTxColumnShard::TSchemaTxBody::kAlterStore: + case NKikimrTxColumnShard::TSchemaTxBody::kDropTable: + case NKikimrTxColumnShard::TSchemaTxBody::TXBODY_NOT_SET: + break; + } + + auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo()); + auto lastSeqNo = owner.LastSchemaSeqNo; + + // Check if proposal is outdated + if (seqNo < lastSeqNo) { + auto errorMessage = TStringBuilder() + << "Ignoring outdated schema tx proposal at tablet " + << owner.TabletID() + << " txId " << GetTxId() + << " ssId " << owner.CurrentSchemeShardId + << " seqNo " << seqNo + << " lastSeqNo " << lastSeqNo; + return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage); + } + + owner.UpdateSchemaSeqNo(seqNo, txc); + return TProposeResult(); +} + +NKikimr::TConclusionStatus TSchemaTransactionOperator::ValidateTableSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) const { + namespace NTypeIds = NScheme::NTypeIds; + static const THashSet pkSupportedTypes = { + NTypeIds::Timestamp, + NTypeIds::Date32, + NTypeIds::Datetime64, + NTypeIds::Timestamp64, + NTypeIds::Interval64, + NTypeIds::Int8, + NTypeIds::Int16, + NTypeIds::Int32, + NTypeIds::Int64, + NTypeIds::Uint8, + NTypeIds::Uint16, + NTypeIds::Uint32, + NTypeIds::Uint64, + NTypeIds::Date, + NTypeIds::Datetime, + //NTypeIds::Interval, + //NTypeIds::Float, + //NTypeIds::Double, + NTypeIds::String, + NTypeIds::Utf8 + }; + if (!schema.HasEngine() || + schema.GetEngine() != NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES) { + return TConclusionStatus::Fail("Invalid scheme engine: " + (schema.HasEngine() ? NKikimrSchemeOp::EColumnTableEngine_Name(schema.GetEngine()) : TString("No"))); + } + + if (!schema.KeyColumnNamesSize()) { + return TConclusionStatus::Fail("There is no key columns"); + } + + THashSet keyColumns(schema.GetKeyColumnNames().begin(), schema.GetKeyColumnNames().end()); + TVector columnErrors; + for (const NKikimrSchemeOp::TOlapColumnDescription& column : schema.GetColumns()) { + TString name = column.GetName(); + void* typeDescr = nullptr; + if (column.GetTypeId() == NTypeIds::Pg && column.HasTypeInfo()) { + typeDescr = NPg::TypeDescFromPgTypeId(column.GetTypeInfo().GetPgTypeId()); + } + + NScheme::TTypeInfo schemeType(column.GetTypeId(), typeDescr); + if (keyColumns.contains(name) && !pkSupportedTypes.contains(column.GetTypeId())) { + columnErrors.emplace_back("key column " + name + " has unsupported type " + column.GetTypeName()); + } + auto arrowType = NArrow::GetArrowType(schemeType); + if (!arrowType.ok()) { + columnErrors.emplace_back("column " + name + ": " + arrowType.status().ToString()); + } + keyColumns.erase(name); + } + if (!columnErrors.empty()) { + return TConclusionStatus::Fail("Column errors: " + JoinSeq("; ", columnErrors)); + } + + if (!keyColumns.empty()) { + return TConclusionStatus::Fail("Key columns not in scheme: " + JoinSeq(", ", keyColumns)); + } + return TConclusionStatus::Success(); +} + +NKikimr::TConclusionStatus TSchemaTransactionOperator::ValidateTables(::google::protobuf::RepeatedPtrField<::NKikimrTxColumnShard::TCreateTable> tables) const { + for (auto& table : tables) { + if (table.HasSchemaPreset()) { + const auto validationStatus = ValidateTablePreset(table.GetSchemaPreset()); + if (validationStatus.IsFail()) { + return validationStatus; + } + } + if (table.HasSchema()) { + const auto validationStatus = ValidateTableSchema(table.GetSchema()); + if (validationStatus.IsFail()) { + return validationStatus; + } + } + } return TConclusionStatus::Success(); +} + +} diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.h b/ydb/core/tx/columnshard/transactions/operators/schema.h index 9aecc6f10fe8..a4aa5bc827fe 100644 --- a/ydb/core/tx/columnshard/transactions/operators/schema.h +++ b/ydb/core/tx/columnshard/transactions/operators/schema.h @@ -1,13 +1,32 @@ #pragma once +#include "ss_operation.h" #include namespace NKikimr::NColumnShard { - class TSchemaTransactionOperator : public TTxController::ITransactionOperator { - using TBase = TTxController::ITransactionOperator; + class TSchemaTransactionOperator : public ISSTransactionOperator { + private: + using TBase = ISSTransactionOperator; + using TProposeResult = TTxController::TProposeResult; static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_SCHEMA); + + virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + AFL_VERIFY(false)("error", "method not implemented for non-async operator by default"); + } + virtual void DoFinishProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override { + AFL_VERIFY(false)("error", "method not implemented for non-async operator by default"); + } + virtual void DoFinishProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + } + virtual bool DoIsAsync() const override { + return false; + } + virtual bool DoParse(TColumnShard& /*owner*/, const TString& data) override { + return SchemaTxBody.ParseFromString(data); + } public: using TBase::TBase; @@ -15,65 +34,10 @@ namespace NKikimr::NColumnShard { return true; } - virtual bool Parse(TColumnShard& /*owner*/, const TString& data) override { - if (!SchemaTxBody.ParseFromString(data)) { - return false; - } - return true; - } - bool TxWithDeadline() const override { return false; } - TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const override { - switch (SchemaTxBody.TxBody_case()) { - case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: - { - auto validationStatus = ValidateTables(SchemaTxBody.GetInitShard().GetTables()); - if (validationStatus.IsFail()) { - return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "Invalid schema: " + validationStatus.GetErrorMessage()); - } - } - break; - case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables: - { - auto validationStatus = ValidateTables(SchemaTxBody.GetEnsureTables().GetTables()); - if (validationStatus.IsFail()) { - return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "Invalid schema: " + validationStatus.GetErrorMessage()); - } - } - break; - case NKikimrTxColumnShard::TSchemaTxBody::kAlterTable: - case NKikimrTxColumnShard::TSchemaTxBody::kAlterStore: - case NKikimrTxColumnShard::TSchemaTxBody::kDropTable: - case NKikimrTxColumnShard::TSchemaTxBody::TXBODY_NOT_SET: - break; - } - - auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo()); - auto lastSeqNo = owner.LastSchemaSeqNo; - - // Check if proposal is outdated - if (seqNo < lastSeqNo) { - auto errorMessage = TStringBuilder() - << "Ignoring outdated schema tx proposal at tablet " - << owner.TabletID() - << " txId " << GetTxId() - << " ssId " << owner.CurrentSchemeShardId - << " seqNo " << seqNo - << " lastSeqNo " << lastSeqNo; - return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage); - } - - owner.UpdateSchemaSeqNo(seqNo, txc); - return TProposeResult(); - } - - virtual bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override { - return true; - } - virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { owner.RunSchemaTx(SchemaTxBody, version, txc); owner.ProtectSchemaSeqNo(SchemaTxBody.GetSeqNo(), txc); @@ -105,84 +69,9 @@ namespace NKikimr::NColumnShard { } private: - TConclusionStatus ValidateTables(::google::protobuf::RepeatedPtrField<::NKikimrTxColumnShard::TCreateTable> tables) const { - for (auto& table : tables) { - if (table.HasSchemaPreset()) { - const auto validationStatus = ValidateTablePreset(table.GetSchemaPreset()); - if (validationStatus.IsFail()) { - return validationStatus; - } - } - if (table.HasSchema()) { - const auto validationStatus = ValidateTableSchema(table.GetSchema()); - if (validationStatus.IsFail()) { - return validationStatus; - } - } - } return TConclusionStatus::Success(); - } - - TConclusionStatus ValidateTableSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) const { - namespace NTypeIds = NScheme::NTypeIds; - static const THashSet pkSupportedTypes = { - NTypeIds::Timestamp, - NTypeIds::Date32, - NTypeIds::Datetime64, - NTypeIds::Timestamp64, - NTypeIds::Interval64, - NTypeIds::Int8, - NTypeIds::Int16, - NTypeIds::Int32, - NTypeIds::Int64, - NTypeIds::Uint8, - NTypeIds::Uint16, - NTypeIds::Uint32, - NTypeIds::Uint64, - NTypeIds::Date, - NTypeIds::Datetime, - //NTypeIds::Interval, - //NTypeIds::Float, - //NTypeIds::Double, - NTypeIds::String, - NTypeIds::Utf8 - }; - if (!schema.HasEngine() || - schema.GetEngine() != NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES) { - return TConclusionStatus::Fail("Invalid scheme engine: " + (schema.HasEngine() ? NKikimrSchemeOp::EColumnTableEngine_Name(schema.GetEngine()) : TString("No"))); - } - - if (!schema.KeyColumnNamesSize()) { - return TConclusionStatus::Fail("There is no key columns"); - } - - THashSet keyColumns(schema.GetKeyColumnNames().begin(), schema.GetKeyColumnNames().end()); - TVector columnErrors; - for (const NKikimrSchemeOp::TOlapColumnDescription& column : schema.GetColumns()) { - TString name = column.GetName(); - void* typeDescr = nullptr; - if (column.GetTypeId() == NTypeIds::Pg && column.HasTypeInfo()) { - typeDescr = NPg::TypeDescFromPgTypeId(column.GetTypeInfo().GetPgTypeId()); - } - - NScheme::TTypeInfo schemeType(column.GetTypeId(), typeDescr); - if (keyColumns.contains(name) && !pkSupportedTypes.contains(column.GetTypeId())) { - columnErrors.emplace_back("key column " + name + " has unsupported type " + column.GetTypeName()); - } - auto arrowType = NArrow::GetArrowType(schemeType); - if (!arrowType.ok()) { - columnErrors.emplace_back("column " + name + ": " + arrowType.status().ToString()); - } - keyColumns.erase(name); - } - if (!columnErrors.empty()) { - return TConclusionStatus::Fail("Column errors: " + JoinSeq("; ", columnErrors)); - } + TConclusionStatus ValidateTables(::google::protobuf::RepeatedPtrField<::NKikimrTxColumnShard::TCreateTable> tables) const; - if (!keyColumns.empty()) { - return TConclusionStatus::Fail("Key columns not in scheme: " + JoinSeq(", ", keyColumns)); - } - return TConclusionStatus::Success(); - } + TConclusionStatus ValidateTableSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) const; TConclusionStatus ValidateTablePreset(const NKikimrSchemeOp::TColumnTableSchemaPreset& preset) const { if (preset.HasName() && preset.GetName() != "default") { diff --git a/ydb/core/tx/columnshard/transactions/operators/sharing.cpp b/ydb/core/tx/columnshard/transactions/operators/sharing.cpp new file mode 100644 index 000000000000..936142e11be3 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/sharing.cpp @@ -0,0 +1,80 @@ +#include "sharing.h" +#include +#include +#include + +namespace NKikimr::NColumnShard { + +bool TSharingTransactionOperator::DoParse(TColumnShard& owner, const TString& data) { + NKikimrColumnShardDataSharingProto::TDestinationSession txBody; + SharingSessionsManager = owner.GetSharingSessionsManager(); + + if (!txBody.ParseFromString(data)) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("reason", "cannot parse string as proto"); + return false; + } + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("process", "BlobsSharing")("event", "TEvProposeFromInitiator"); + SharingTask = std::make_shared(); + auto conclusion = SharingTask->DeserializeDataFromProto(txBody, owner.GetIndexAs()); + if (!conclusion) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_start_data_sharing_from_initiator")("error", conclusion.GetErrorMessage()); + return false; + } + + auto currentSession = SharingSessionsManager->GetDestinationSession(SharingTask->GetSessionId()); + if (currentSession) { + SharingTask = currentSession; + } else { + SharingTask->Confirm(); + } + + TxPropose = SharingSessionsManager->ProposeDestSession(&owner, SharingTask); + + return true; +} + +TSharingTransactionOperator::TProposeResult TSharingTransactionOperator::DoStartProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& txc) { + AFL_VERIFY(!!TxPropose); + AFL_VERIFY(TxPropose->Execute(txc, NActors::TActivationContext::AsActorContext())); + return TProposeResult(); +} + +void TSharingTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& ctx) { + AFL_VERIFY(!!TxPropose); + TxPropose->Complete(ctx); + TxPropose.release(); +} + +bool TSharingTransactionOperator::ExecuteOnProgress(TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) { + return true; +} + +bool TSharingTransactionOperator::CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) { + for (TActorId subscriber : NotifySubscribers) { + auto event = MakeHolder(owner.TabletID(), GetTxId()); + ctx.Send(subscriber, event.Release(), 0, 0); + } + return true; +} + +bool TSharingTransactionOperator::ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) { + if (!SharingTask) { + return true; + } + if (!TxAbort) { + TxAbort = SharingTask->AckInitiatorFinished(&owner, SharingTask).DetachResult(); + } + TxAbort->Execute(txc, NActors::TActivationContext::AsActorContext()); + return true; +} + +bool TSharingTransactionOperator::CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& ctx) { + if (!SharingTask) { + return true; + } + AFL_VERIFY(!!TxAbort); + TxAbort->Complete(ctx); + return true; +} + +} diff --git a/ydb/core/tx/columnshard/transactions/operators/sharing.h b/ydb/core/tx/columnshard/transactions/operators/sharing.h new file mode 100644 index 000000000000..4675129f1a1e --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/sharing.h @@ -0,0 +1,50 @@ +#pragma once + +#include "ss_operation.h" +#include +#include + +namespace NKikimr::NColumnShard { + +class TSharingTransactionOperator: public ISSTransactionOperator { +private: + using TBase = ISSTransactionOperator; + + std::shared_ptr SharingSessionsManager; + std::shared_ptr SharingTask; + using TProposeResult = TTxController::TProposeResult; + mutable std::unique_ptr TxPropose; + mutable std::unique_ptr TxConfirm; + mutable std::unique_ptr TxAbort; + static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_SHARING); + THashSet NotifySubscribers; + virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override; + virtual void DoFinishProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override { + } + virtual void DoFinishProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + } + virtual bool DoIsAsync() const override { + return true; + } + virtual bool DoParse(TColumnShard& owner, const TString& data) override; +public: + using TBase::TBase; + virtual void RegisterSubscriber(const TActorId& actorId) override { + NotifySubscribers.insert(actorId); + } + + virtual bool AllowTxDups() const override { + return true; + } + + + virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override; + + virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override; + + virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual bool CompleteOnAbort(TColumnShard& owner, const TActorContext& ctx) override; +}; + +} diff --git a/ydb/core/tx/columnshard/transactions/operators/ss_operation.cpp b/ydb/core/tx/columnshard/transactions/operators/ss_operation.cpp new file mode 100644 index 000000000000..1bf60d44d373 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/ss_operation.cpp @@ -0,0 +1,23 @@ +#include "ss_operation.h" + +namespace NKikimr::NColumnShard { + +void ISSTransactionOperator::DoSendReply(TColumnShard& owner, const TActorContext& ctx) { + const auto& txInfo = GetTxInfo(); + std::unique_ptr evResult = std::make_unique( + owner.TabletID(), txInfo.TxKind, txInfo.TxId, GetProposeStartInfoVerified().GetStatus(), GetProposeStartInfoVerified().GetStatusMessage()); + if (IsFail()) { + owner.IncCounter(COUNTER_PREPARE_ERROR); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())("tx_id", txInfo.TxId); + } else { + evResult->Record.SetMinStep(txInfo.MinStep); + evResult->Record.SetMaxStep(txInfo.MaxStep); + if (owner.ProcessingParams) { + evResult->Record.MutableDomainCoordinators()->CopyFrom(owner.ProcessingParams->GetCoordinators()); + } + owner.IncCounter(COUNTER_PREPARE_SUCCESS); + } + ctx.Send(txInfo.Source, evResult.release()); +} + +} diff --git a/ydb/core/tx/columnshard/transactions/operators/ss_operation.h b/ydb/core/tx/columnshard/transactions/operators/ss_operation.h new file mode 100644 index 000000000000..feff6af77225 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/ss_operation.h @@ -0,0 +1,17 @@ +#pragma once + +#include +#include + +namespace NKikimr::NColumnShard { + +class ISSTransactionOperator: public TTxController::ITransactionOperator { +private: + using TBase = TTxController::ITransactionOperator; +protected: + virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) override; +public: + using TBase::TBase; +}; + +} diff --git a/ydb/core/tx/columnshard/transactions/operators/ya.make b/ydb/core/tx/columnshard/transactions/operators/ya.make index 134bb52136d4..7bb012aad1e1 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ya.make +++ b/ydb/core/tx/columnshard/transactions/operators/ya.make @@ -5,6 +5,8 @@ SRCS( GLOBAL long_tx_write.cpp GLOBAL ev_write.cpp GLOBAL backup.cpp + GLOBAL sharing.cpp + ss_operation.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp index 19edc03b0a80..976713d02c05 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.cpp +++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp @@ -23,7 +23,7 @@ ui64 TTxController::GetAllowedStep() const { } ui64 TTxController::GetMemoryUsage() const { - return BasicTxInfo.size() * sizeof(TTxInfo) + + return Operators.size() * (sizeof(TTxController::ITransactionOperator) + 24) + DeadlineQueue.size() * sizeof(TPlanQueueItem) + (PlanQueue.size() + RunningQueue.size()) * sizeof(TPlanQueueItem); } @@ -47,9 +47,12 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { while (!rowset.EndOfSet()) { const ui64 txId = rowset.GetValue(); const NKikimrTxColumnShard::ETransactionKind txKind = rowset.GetValue(); + ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txKind, TTxInfo(txKind, txId))); + Y_ABORT_UNLESS(!!txOperator); + const TString txBody = rowset.GetValue(); + Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody)); - auto txInfoIt = BasicTxInfo.emplace(txId, TTxInfo(txKind, txId)).first; - auto& txInfo = txInfoIt->second; + auto& txInfo = txOperator->MutableTxInfo(); txInfo.MaxStep = rowset.GetValue(); if (txInfo.MaxStep != Max()) { txInfo.MinStep = txInfo.MaxStep - MaxCommitTxDelay.MilliSeconds(); @@ -63,12 +66,7 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { } else if (txInfo.MaxStep != Max()) { DeadlineQueue.emplace(txInfo.MaxStep, txInfo.TxId); } - - const TString txBody = rowset.GetValue(); - ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo)); - Y_ABORT_UNLESS(!!txOperator); - Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody)); - Operators[txId] = txOperator; + AFL_VERIFY(Operators.emplace(txId, txOperator).second); if (!rowset.Next()) { return false; @@ -91,59 +89,50 @@ TTxController::ITransactionOperator::TPtr TTxController::GetVerifiedTxOperator(c return it->second; } -TTxController::TTxInfo TTxController::RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { - NIceDb::TNiceDb db(txc.DB); - - auto txInfoIt = BasicTxInfo.emplace(txId, TTxInfo(txKind, txId)).first; - auto& txInfo = txInfoIt->second; +std::shared_ptr TTxController::UpdateTxSourceInfo(const ui64 txId, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { + auto op = GetVerifiedTxOperator(txId); + auto& txInfo = op->MutableTxInfo(); txInfo.Source = source; txInfo.Cookie = cookie; - ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo)); - Y_ABORT_UNLESS(!!txOperator); - Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody)); - Operators[txId] = txOperator; + NIceDb::TNiceDb db(txc.DB); + Schema::UpdateTxInfoSource(db, txId, txInfo.Source, txInfo.Cookie); + return op; +} + +TTxController::TTxInfo TTxController::RegisterTx(const std::shared_ptr& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc) { + NIceDb::TNiceDb db(txc.DB); + auto& txInfo = txOperator->GetTxInfo(); + AFL_VERIFY(Operators.emplace(txInfo.TxId, txOperator).second); Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, Max(), txInfo.Source, txInfo.Cookie); return txInfo; } -TTxController::TTxInfo TTxController::RegisterTxWithDeadline(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { +TTxController::TTxInfo TTxController::RegisterTxWithDeadline(const std::shared_ptr& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); - auto txInfoIt = BasicTxInfo.emplace(txId, TTxInfo(txKind, txId)).first; - auto& txInfo = txInfoIt->second; - txInfo.Source = source; - txInfo.Cookie = cookie; + auto& txInfo = txOperator->MutableTxInfo(); txInfo.MinStep = GetAllowedStep(); txInfo.MaxStep = txInfo.MinStep + MaxCommitTxDelay.MilliSeconds(); - ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo)); - Y_ABORT_UNLESS(!!txOperator); - Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody)); - Operators[txId] = txOperator; + AFL_VERIFY(Operators.emplace(txOperator->GetTxId(), txOperator).second); Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, txInfo.MaxStep, txInfo.Source, txInfo.Cookie); - DeadlineQueue.emplace(txInfo.MaxStep, txId); + DeadlineQueue.emplace(txInfo.MaxStep, txOperator->GetTxId()); return txInfo; } bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { - auto it = BasicTxInfo.find(txId); - if (it == BasicTxInfo.end()) { - return true; - } - Y_ABORT_UNLESS(it->second.PlanStep == 0); - auto opIt = Operators.find(txId); Y_ABORT_UNLESS(opIt != Operators.end()); + Y_ABORT_UNLESS(opIt->second->GetTxInfo().PlanStep == 0); opIt->second->ExecuteOnAbort(Owner, txc); opIt->second->CompleteOnAbort(Owner, NActors::TActivationContext::AsActorContext()); - if (it->second.MaxStep != Max()) { - DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId)); + if (opIt->second->GetTxInfo().MaxStep != Max()) { + DeadlineQueue.erase(TPlanQueueItem(opIt->second->GetTxInfo().MaxStep, txId)); } - BasicTxInfo.erase(it); Operators.erase(txId); NIceDb::TNiceDb db(txc.DB); Schema::EraseTxInfo(db, txId); @@ -151,38 +140,33 @@ bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionCo } bool TTxController::CompleteOnCancel(const ui64 txId, const TActorContext& ctx) { - auto it = BasicTxInfo.find(txId); - if (it == BasicTxInfo.end()) { + auto opIt = Operators.find(txId); + if (opIt == Operators.end()) { return true; } - if (it->second.PlanStep != 0) { + Y_ABORT_UNLESS(opIt != Operators.end()); + if (opIt->second->GetTxInfo().PlanStep != 0) { return false; } - - auto opIt = Operators.find(txId); - Y_ABORT_UNLESS(opIt != Operators.end()); opIt->second->CompleteOnAbort(Owner, ctx); - if (it->second.MaxStep != Max()) { - DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId)); + if (opIt->second->GetTxInfo().MaxStep != Max()) { + DeadlineQueue.erase(TPlanQueueItem(opIt->second->GetTxInfo().MaxStep, txId)); } - BasicTxInfo.erase(it); Operators.erase(txId); return true; } bool TTxController::ExecuteOnCancel(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { - auto it = BasicTxInfo.find(txId); - if (it == BasicTxInfo.end()) { + auto opIt = Operators.find(txId); + if (opIt == Operators.end()) { return true; } - if (it->second.PlanStep != 0) { - // Cannot cancel planned transaction + Y_ABORT_UNLESS(opIt != Operators.end()); + if (opIt->second->GetTxInfo().PlanStep != 0) { return false; } - auto opIt = Operators.find(txId); - Y_ABORT_UNLESS(opIt != Operators.end()); opIt->second->ExecuteOnAbort(Owner, txc); NIceDb::TNiceDb db(txc.DB); @@ -195,10 +179,8 @@ std::optional TTxController::StartPlannedTx() { auto node = PlanQueue.extract(PlanQueue.begin()); auto& item = node.value(); TPlanQueueItem tx(item.Step, item.TxId); - auto it = BasicTxInfo.find(item.TxId); - Y_ABORT_UNLESS(it != BasicTxInfo.end()); RunningQueue.emplace(std::move(item)); - return it->second; + return GetTxInfoVerified(item.TxId); } return std::nullopt; } @@ -209,7 +191,6 @@ void TTxController::FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTrans } void TTxController::CompleteRunningTx(const TPlanQueueItem& txItem) { - AFL_VERIFY(BasicTxInfo.erase(txItem.TxId)); AFL_VERIFY(Operators.erase(txItem.TxId)); AFL_VERIFY(RunningQueue.erase(txItem))("info", txItem.DebugString()); } @@ -222,13 +203,19 @@ std::optional TTxController::GetPlannedTx() const } std::optional TTxController::GetTxInfo(const ui64 txId) const { - auto txPtr = BasicTxInfo.FindPtr(txId); - if (txPtr) { - return *txPtr; + auto it = Operators.find(txId); + if (it != Operators.end()) { + return it->second->GetTxInfo(); } return std::nullopt; } +TTxController::TTxInfo TTxController::GetTxInfoVerified(const ui64 txId) const { + auto it = Operators.find(txId); + AFL_VERIFY(it != Operators.end()); + return it->second->GetTxInfo(); +} + NEvents::TDataEvents::TCoordinatorInfo TTxController::BuildCoordinatorInfo(const TTxInfo& txInfo) const { if (Owner.ProcessingParams) { return NEvents::TDataEvents::TCoordinatorInfo(txInfo.MinStep, txInfo.MaxStep, Owner.ProcessingParams->GetCoordinators()); @@ -270,17 +257,18 @@ TDuration TTxController::GetTxCompleteLag(ui64 timecastStep) const { } TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { - auto it = BasicTxInfo.find(txId); - if (it == BasicTxInfo.end()) { + auto it = Operators.find(txId); + if (it == Operators.end()) { return EPlanResult::Skipped; } - if (it->second.PlanStep == 0) { - it->second.PlanStep = planStep; + auto& txInfo = it->second->MutableTxInfo(); + if (txInfo.PlanStep == 0) { + txInfo.PlanStep = planStep; NIceDb::TNiceDb db(txc.DB); Schema::UpdateTxInfoPlanStep(db, txId, planStep); PlanQueue.emplace(planStep, txId); - if (it->second.MaxStep != Max()) { - DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId)); + if (txInfo.MaxStep != Max()) { + DeadlineQueue.erase(TPlanQueueItem(txInfo.MaxStep, txId)); } return EPlanResult::Planned; } @@ -293,42 +281,63 @@ void TTxController::OnTabletInit() { } } -TTxProposeResult TTxController::ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { - auto txOperator = TTxController::ITransactionOperator::TFactory::MakeHolder(txInfo.TxKind, TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId)); - if (!txOperator || !txOperator->Parse(Owner, txBody)) { - TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txInfo.TxId - << (txOperator ? ". Parsing error " : ". Unknown operator for txKind")); - return TTxProposeResult(txInfo, std::move(proposeResult)); +std::shared_ptr TTxController::StartProposeOnExecute(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { + std::shared_ptr txOperator(TTxController::ITransactionOperator::TFactory::Construct(txInfo.TxKind, + TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId, source, cookie))); + AFL_VERIFY(!!txOperator); + if (!txOperator->Parse(Owner, txBody)) { + return txOperator; } auto txInfoPtr = GetTxInfo(txInfo.TxId); if (!!txInfoPtr) { if (!txOperator->AllowTxDups() && (txInfoPtr->Source != source || txInfoPtr->Cookie != cookie)) { TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txInfo.TxId << " has already been proposed"); - return TTxProposeResult(txInfo, std::move(proposeResult)); + txOperator->SetProposeStartInfo(proposeResult); + return txOperator; } else { - return TTxProposeResult(*txInfoPtr, TTxController::TProposeResult()); + return UpdateTxSourceInfo(txInfo.GetTxId(), source, cookie, txc); } } else { - auto proposeResult = txOperator->ExecuteOnPropose(Owner, txc); - if (!proposeResult.IsFail()) { - const auto fullTxInfo = txOperator->TxWithDeadline() ? RegisterTxWithDeadline(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc) - : RegisterTx(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc); - - return TTxProposeResult(fullTxInfo, std::move(proposeResult)); - } else { - return TTxProposeResult(txInfo, std::move(proposeResult)); + if (txOperator->StartProposeOnExecute(Owner, txc)) { + if (txOperator->TxWithDeadline()) { + RegisterTxWithDeadline(txOperator, txBody, txc); + } else { + RegisterTx(txOperator, txBody, txc); + } } + return txOperator; } } -void TTxController::CompleteTransaction(const ui64 txId, const TActorContext& ctx) { +void TTxController::StartProposeOnComplete(const ui64 txId, const TActorContext& ctx) { auto txOperator = GetTxOperator(txId); if (!txOperator) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId); } else { - txOperator->CompleteOnPropose(Owner, ctx); + txOperator->StartProposeOnComplete(Owner, ctx); + } +} + +void TTxController::FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { + auto txOperator = GetTxOperator(txId); + if (!txOperator) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId); + } else { + txOperator->FinishProposeOnExecute(Owner, txc); + } +} + +void TTxController::FinishProposeOnComplete(const ui64 txId, const TActorContext& ctx) { + auto txOperator = GetTxOperator(txId); + if (!txOperator) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction finish")("tx_id", txId); + return; } + TTxController::TProposeResult proposeResult = txOperator->GetProposeStartInfoVerified(); + AFL_VERIFY(!txOperator->IsFail()); + txOperator->FinishProposeOnComplete(Owner, ctx); + txOperator->SendReply(Owner, ctx); } } diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.h b/ydb/core/tx/columnshard/transactions/tx_controller.h index 7894584dbe78..fa840152aef8 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.h +++ b/ydb/core/tx/columnshard/transactions/tx_controller.h @@ -18,6 +18,10 @@ struct TBasicTxInfo { : TxKind(txKind) , TxId(txId) { } + + ui64 GetTxId() const { + return TxId; + } }; struct TFullTxInfo: public TBasicTxInfo { @@ -30,6 +34,13 @@ struct TFullTxInfo: public TBasicTxInfo { TFullTxInfo(const NKikimrTxColumnShard::ETransactionKind& txKind, const ui64 txId) : TBasicTxInfo(txKind, txId) { } + + TFullTxInfo(const NKikimrTxColumnShard::ETransactionKind& txKind, const ui64 txId, const TActorId& source, const ui64 cookie) + : TBasicTxInfo(txKind, txId) + , Source(source) + , Cookie(cookie) + { + } }; class TTxProposeResult { @@ -117,12 +128,63 @@ class TTxController { using TProposeResult = TTxProposeResult::TProposeResult; class ITransactionOperator { + private: + enum class EStatus { + Created, + Parsed, + ProposeStartedOnExecute, + ProposeStartedOnComplete, + ProposeFinishedOnExecute, + ProposeFinishedOnComplete, + ReplySent, + Failed + }; protected: TTxInfo TxInfo; + YDB_READONLY_DEF(std::optional, ProposeStartInfo); + EStatus Status = EStatus::Created; + private: + virtual bool DoParse(TColumnShard& owner, const TString& data) = 0; + virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard & owner, NTabletFlatExecutor::TTransactionContext & txc) = 0; + virtual void DoStartProposeOnComplete(TColumnShard& owner, const TActorContext& ctx) = 0; + virtual void DoFinishProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) = 0; + virtual void DoFinishProposeOnComplete(TColumnShard& owner, const TActorContext& ctx) = 0; + virtual bool DoIsAsync() const = 0; + virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) = 0; + + [[nodiscard]] bool SwitchState(const EStatus from, const EStatus to) { + if (Status == from) { + Status = to; + return true; + } + return false; + } public: using TPtr = std::shared_ptr; using TFactory = NObjectFactory::TParametrizedObjectFactory; + bool IsFail() const { + return ProposeStartInfo && ProposeStartInfo->IsFail(); + } + + const TTxController::TProposeResult& GetProposeStartInfoVerified() const { + AFL_VERIFY(!!ProposeStartInfo); + return *ProposeStartInfo; + } + + void SetProposeStartInfo(const TTxController::TProposeResult& info) { + AFL_VERIFY(!ProposeStartInfo); + ProposeStartInfo = info; + } + + const TTxInfo& GetTxInfo() const { + return TxInfo; + } + + TTxInfo& MutableTxInfo() { + return TxInfo; + } + ITransactionOperator(const TTxInfo& txInfo) : TxInfo(txInfo) {} @@ -135,15 +197,68 @@ class TTxController { return false; } + bool IsAsync() const { + return DoIsAsync() && Status != EStatus::Failed && Status != EStatus::ReplySent; + } + virtual ~ITransactionOperator() {} virtual bool TxWithDeadline() const { return true; } - virtual bool Parse(TColumnShard& owner, const TString& data) = 0; - virtual TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const = 0; - virtual bool CompleteOnPropose(TColumnShard& owner, const TActorContext& ctx) const = 0; + bool Parse(TColumnShard& owner, const TString& data) { + const bool result = DoParse(owner, data); + if (!result) { + ProposeStartInfo = TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << TxInfo.TxId + << ". Parsing error"); + AFL_VERIFY(SwitchState(EStatus::Created, EStatus::Failed)); + } else { + AFL_VERIFY(SwitchState(EStatus::Created, EStatus::Parsed)); + } + return result; + } + + void SendReply(TColumnShard& owner, const TActorContext& ctx) { + AFL_VERIFY(!!ProposeStartInfo); + if (ProposeStartInfo->IsFail()) { + AFL_VERIFY(SwitchState(EStatus::Failed, EStatus::ReplySent)); + } else { + AFL_VERIFY(SwitchState(EStatus::ProposeFinishedOnComplete, EStatus::ReplySent)); + } + return DoSendReply(owner, ctx); + } + + bool StartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) { + AFL_VERIFY(!ProposeStartInfo); + ProposeStartInfo = DoStartProposeOnExecute(owner, txc); + if (ProposeStartInfo->IsFail()) { + AFL_VERIFY(SwitchState(EStatus::Parsed, EStatus::Failed)); + } else { + AFL_VERIFY(SwitchState(EStatus::Parsed, EStatus::ProposeStartedOnExecute)); + } + return !GetProposeStartInfoVerified().IsFail(); + } + void StartProposeOnComplete(TColumnShard& owner, const TActorContext& ctx) { + AFL_VERIFY(SwitchState(EStatus::ProposeStartedOnExecute, EStatus::ProposeStartedOnComplete)); + AFL_VERIFY(IsAsync()); + return DoStartProposeOnComplete(owner, ctx); + } + void FinishProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) { + AFL_VERIFY(SwitchState(EStatus::ProposeStartedOnComplete, EStatus::ProposeFinishedOnExecute)); + AFL_VERIFY(IsAsync()); + return DoFinishProposeOnExecute(owner, txc); + } + void FinishProposeOnComplete(TColumnShard& owner, const TActorContext& ctx) { + if (IsFail()) { + AFL_VERIFY(Status == EStatus::Failed); + } else if (DoIsAsync()) { + AFL_VERIFY(SwitchState(EStatus::ProposeFinishedOnExecute, EStatus::ProposeFinishedOnComplete)); + } else { + AFL_VERIFY(SwitchState(EStatus::ProposeStartedOnExecute, EStatus::ProposeFinishedOnComplete)); + } + return DoFinishProposeOnComplete(owner, ctx); + } virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) = 0; virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) = 0; @@ -160,7 +275,6 @@ class TTxController { private: const TDuration MaxCommitTxDelay = TDuration::Seconds(30); TColumnShard& Owner; - THashMap BasicTxInfo; std::set DeadlineQueue; std::set PlanQueue; std::set RunningQueue; @@ -171,8 +285,8 @@ class TTxController { ui64 GetAllowedStep() const; bool AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); - TTxInfo RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); - TTxInfo RegisterTxWithDeadline(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); + TTxInfo RegisterTx(const std::shared_ptr& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc); + TTxInfo RegisterTxWithDeadline(const std::shared_ptr& txOperator, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc); public: TTxController(TColumnShard& owner); @@ -185,8 +299,14 @@ class TTxController { bool Load(NTabletFlatExecutor::TTransactionContext& txc); - TTxProposeResult ProposeTransaction(const TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); - void CompleteTransaction(const ui64 txId, const TActorContext& ctx); + [[nodiscard]] std::shared_ptr UpdateTxSourceInfo(const ui64 txId, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); + + [[nodiscard]] std::shared_ptr StartProposeOnExecute(const TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); + void StartProposeOnComplete(const ui64 txId, const TActorContext& ctx); + + void FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); + + void FinishProposeOnComplete(const ui64 txId, const TActorContext& ctx); bool ExecuteOnCancel(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); bool CompleteOnCancel(const ui64 txId, const TActorContext& ctx); @@ -198,6 +318,7 @@ class TTxController { std::optional GetPlannedTx() const; TPlanQueueItem GetFrontTx() const; std::optional GetTxInfo(const ui64 txId) const; + TTxInfo GetTxInfoVerified(const ui64 txId) const; NEvents::TDataEvents::TCoordinatorInfo BuildCoordinatorInfo(const TTxInfo& txInfo) const; size_t CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext& txc); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp index a2f5b502d445..6463e4a0a266 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp @@ -101,13 +101,12 @@ Y_UNIT_TEST_SUITE(Backup) { txBody.MutableBackupTask()->SetSnapshotTxId(backupSnapshot.GetTxId()); txBody.MutableBackupTask()->MutableS3Settings()->SetEndpoint("fake"); txBody.MutableBackupTask()->MutableS3Settings()->SetSecretKey("fakeSecret"); - UNIT_ASSERT(ProposeTx(runtime, sender, NKikimrTxColumnShard::TX_KIND_BACKUP, txBody.SerializeAsString(), ++txId)); AFL_VERIFY(csControllerGuard->GetFinishedExportsCount() == 0); - PlanTx(runtime, sender, NKikimrTxColumnShard::TX_KIND_BACKUP, NOlap::TSnapshot(++planStep, txId)); + UNIT_ASSERT(ProposeTx(runtime, sender, NKikimrTxColumnShard::TX_KIND_BACKUP, txBody.SerializeAsString(), ++txId)); + AFL_VERIFY(csControllerGuard->GetFinishedExportsCount() == 1); + PlanTx(runtime, sender, NKikimrTxColumnShard::TX_KIND_BACKUP, NOlap::TSnapshot(++planStep, txId), false); TestWaitCondition(runtime, "export", []() {return Singleton()->GetSize(); }); - TestWaitCondition(runtime, "finish", - [&]() {return csControllerGuard->GetFinishedExportsCount() == 1; }); } } From cfcaf47448adaec2039d6d5a765e6f737facebe8 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 15 May 2024 18:06:12 +0300 Subject: [PATCH 2/3] fix build --- ydb/core/protos/tx_columnshard.proto | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 5a2c712523ee..3da14c4444bf 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -131,6 +131,7 @@ enum ETransactionKind { TX_KIND_DATA = 4; TX_KIND_COMMIT_WRITE = 5; TX_KIND_BACKUP = 6; + TX_KIND_SHARING = 7; } enum ETransactionFlag { From bbb92147b0859115f2ddff76cf225789127c523a Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 15 May 2024 18:56:46 +0300 Subject: [PATCH 3/3] fixes --- .../data_sharing/destination/session/destination.cpp | 5 ++--- .../destination/transactions/tx_finish_from_source.cpp | 2 +- .../destination/transactions/tx_finish_from_source.h | 1 + 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp index d0647be552a7..e09e894f8f92 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp @@ -95,9 +95,8 @@ NKikimr::TConclusion> TDestin } NKikimr::TConclusion> TDestinationSession::ReceiveFinished(NColumnShard::TColumnShard* self, const TTabletId sourceTabletId, const std::shared_ptr& selfPtr) { - auto result = GetCursorVerified(sourceTabletId).ReceiveFinished(); - if (!result) { - return result; + if (GetCursorVerified(sourceTabletId).GetDataFinished()) { + return TConclusionStatus::Fail("session finished already"); } return std::unique_ptr(new TTxFinishFromSource(self, sourceTabletId, selfPtr)); } diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.cpp b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.cpp index 94e9ec55d027..f09641f6696c 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.cpp @@ -8,7 +8,7 @@ bool TTxFinishFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& tx NIceDb::TNiceDb db(txc.DB); db.Table().Key(Session->GetSessionId()) .Update(NIceDb::TUpdate(Session->SerializeCursorToProto().SerializeAsString())); - if (Session->GetSourcesInProgressCount() == 1) { + if (Session->GetSourcesInProgressCount() == 0) { Finished = true; if (Session->GetTransferContext().GetTxId()) { Self->GetProgressTxController().FinishProposeOnExecute(*Session->GetTransferContext().GetTxId(), txc); diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.h b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.h index 4e2f107d8a2f..982029b2c928 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.h +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_finish_from_source.h @@ -20,6 +20,7 @@ class TTxFinishFromSource: public TExtendedTransactionBaseGetCursorVerified(SourceTabletId).ReceiveFinished().Validate(); } TTxType GetTxType() const override { return NColumnShard::TXTYPE_DATA_SHARING_FINISH_FROM_SOURCE; }