Skip to content

Commit

Permalink
Merge bbb9214 into 7156a4e
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored May 15, 2024
2 parents 7156a4e + bbb9214 commit e1be878
Show file tree
Hide file tree
Showing 46 changed files with 973 additions and 432 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/tx_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ enum ETransactionKind {
TX_KIND_DATA = 4;
TX_KIND_COMMIT_WRITE = 5;
TX_KIND_BACKUP = 6;
TX_KIND_SHARING = 7;
}

enum ETransactionFlag {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once
#include "session.h"
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tx/columnshard/common/tablet_id.h>

#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/status.h>
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/manager/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ class TSessionActor: public NActors::TActorBootstrapped<TSessionActor> {
void SaveSessionProgress();

void SaveSessionState();

template <class T>
T* GetShardVerified() const {
return &Adapter->GetTabletExecutorVerifiedAs<T>();
}

public:
TSessionActor(const std::shared_ptr<TSession>& session, const std::shared_ptr<ITabletAdapter>& adapter)
: TabletId(adapter->GetTabletId())
Expand Down
30 changes: 15 additions & 15 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
for (auto&& i : buffer.GetRemoveActions()) {
i->OnExecuteTxAfterRemoving(blobManagerDb, true);
}
Results.clear();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
Expand All @@ -87,22 +88,22 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
proto.SetLockId(operation->GetLockId());
TString txBody;
Y_ABORT_UNLESS(proto.SerializeToString(&txBody));
auto result = Self->GetProgressTxController().ProposeTransaction(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId()), txBody, writeMeta.GetSource(), operation->GetCookie(), txc);
AFL_VERIFY(!result.IsError());
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(
Self->TabletID(), result.GetFullTxInfoVerified().TxId, Self->GetProgressTxController().BuildCoordinatorInfo(result.GetFullTxInfoVerified())));
auto op = Self->GetProgressTxController().StartProposeOnExecute(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId()), txBody, writeMeta.GetSource(), operation->GetCookie(), txc);
AFL_VERIFY(!op->IsFail());
ResultOperators.emplace_back(op);
} else {
NKikimrDataEvents::TLock lock;
lock.SetLockId(operation->GetLockId());
lock.SetDataShard(Self->TabletID());
lock.SetGeneration(1);
lock.SetCounter(1);

Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock));
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock);
Results.emplace_back(std::move(ev), writeMeta.GetSource());
}
} else {
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
Results.emplace_back(std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS));
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
Results.emplace_back(std::move(ev), writeMeta.GetSource());
}
}
return true;
Expand All @@ -119,16 +120,15 @@ void TTxWrite::Complete(const TActorContext& ctx) {
for (auto&& i : buffer.GetRemoveActions()) {
i->OnCompleteTxAfterRemoving(true);
}
AFL_VERIFY(buffer.GetAggregations().size() == Results.size());
AFL_VERIFY(buffer.GetAggregations().size() == Results.size() + ResultOperators.size());
for (auto&& i : ResultOperators) {
Self->GetProgressTxController().FinishProposeOnComplete(i->GetTxId(), ctx);
}
for (auto&& i : Results) {
i.DoSendReply(ctx);
}
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
if (operation) {
Self->GetProgressTxController().CompleteTransaction(operation->GetLockId(), ctx);
ctx.Send(writeMeta.GetSource(), Results[i].release(), 0, operation->GetCookie());
} else {
ctx.Send(writeMeta.GetSource(), Results[i].release());
}
Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->CSCounters.OnSuccessWriteResponse();
}
Expand Down
22 changes: 20 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,29 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_WRITE; }


private:
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
const ui32 TabletTxNo;
std::vector<std::unique_ptr<NActors::IEventBase>> Results;

class TReplyInfo {
private:
std::unique_ptr<NActors::IEventBase> Event;
TActorId DestinationForReply;
public:
TReplyInfo(std::unique_ptr<NActors::IEventBase>&& ev, const TActorId& destinationForReply)
: Event(std::move(ev))
, DestinationForReply(destinationForReply)
{

}

void DoSendReply(const TActorContext& ctx) {
ctx.Send(DestinationForReply, Event.release());
}
};

std::vector<TReplyInfo> Results;
std::vector<std::shared_ptr<TTxController::ITransactionOperator>> ResultOperators;


bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);
Expand Down
50 changes: 21 additions & 29 deletions ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu
TTxProposeTransaction(TColumnShard* self, TEvColumnShard::TEvProposeTransaction::TPtr& ev)
: TBase(self)
, Ev(ev)
{}

virtual bool Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) override {
Y_ABORT_UNLESS(Ev);
{
AFL_VERIFY(!!Ev);
}

virtual bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
txc.DB.NoMoreReadsForTx();
NIceDb::TNiceDb db(txc.DB);

Expand All @@ -32,7 +32,8 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu

if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) {
auto proposeResult = ProposeTtlDeprecated(txBody);
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
auto reply = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
ctx.Send(Ev->Sender, reply.release());
return true;
}

Expand All @@ -49,44 +50,35 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu
Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId());
}
}
auto result = Self->GetProgressTxController().ProposeTransaction(TTxController::TBasicTxInfo(txKind, txId), txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
const auto& proposeResult = result.GetProposeResult();
if (result.IsError()) {
const auto& txInfo = result.GetBaseTxInfoVerified();
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
Self->IncCounter(COUNTER_PREPARE_ERROR);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", proposeResult.GetStatusMessage())("tablet_id", Self->TabletID())("tx_id", txInfo.TxId);
} else {
const auto& txInfo = result.GetFullTxInfoVerified();
AFL_VERIFY(proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED)("tx_id", txInfo.TxId)("details", proposeResult.DebugString());
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
Result->Record.SetMinStep(txInfo.MinStep);
Result->Record.SetMaxStep(txInfo.MaxStep);
if (Self->ProcessingParams) {
Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators());
}
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
}
TxOperator = Self->GetProgressTxController().StartProposeOnExecute(TTxController::TBasicTxInfo(txKind, txId), txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
return true;
}

virtual void Complete(const TActorContext& ctx) override {
Y_ABORT_UNLESS(Ev);
Y_ABORT_UNLESS(Result);

auto& record = Proto(Ev->Get());
if (record.GetTxKind() == NKikimrTxColumnShard::TX_KIND_TTL) {
return;
}
AFL_VERIFY(!!TxOperator);
const ui64 txId = record.GetTxId();

Self->GetProgressTxController().CompleteTransaction(txId, ctx);
ctx.Send(Ev->Get()->GetSource(), Result.release());
if (TxOperator->IsFail()) {
TxOperator->SendReply(*Self, ctx);
}
if (TxOperator->IsAsync()) {
Self->GetProgressTxController().StartProposeOnComplete(txId, ctx);
} else {
Self->GetProgressTxController().FinishProposeOnComplete(txId, ctx);
}

Self->TryRegisterMediatorTimeCast();
}

TTxType GetTxType() const override { return TXTYPE_PROPOSE; }

private:
TEvColumnShard::TEvProposeTransaction::TPtr Ev;
std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> Result;
std::shared_ptr<TTxController::ITransactionOperator> TxOperator;

TTxController::TProposeResult ProposeTtlDeprecated(const TString& txBody) {
/// @note There's no tx guaranties now. For now TX_KIND_TTL is used to trigger TTL in tests only.
Expand Down
12 changes: 3 additions & 9 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,26 +270,20 @@ class TProposeWriteTransaction : public NTabletFlatExecutor::TTransactionBase<TC
TCommitOperation::TPtr WriteCommit;
TActorId Source;
ui64 Cookie;
std::unique_ptr<NActors::IEventBase> Result;
};

bool TProposeWriteTransaction::Execute(TTransactionContext& txc, const TActorContext&) {
NKikimrTxColumnShard::TCommitWriteTxBody proto;
proto.SetLockId(WriteCommit->GetLockId());
TString txBody;
Y_ABORT_UNLESS(proto.SerializeToString(&txBody));
auto result = Self->GetProgressTxController().ProposeTransaction(
TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, WriteCommit->GetTxId()), txBody, Source, Cookie, txc);
if (result.IsError()) {
Result = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), result.GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, result.GetProposeResult().GetStatusMessage());
} else {
Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), result.GetTxId(), Self->GetProgressTxController().BuildCoordinatorInfo(result.GetFullTxInfoVerified()));
}
Y_UNUSED(Self->GetProgressTxController().StartProposeOnExecute(
TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, WriteCommit->GetTxId()), txBody, Source, Cookie, txc));
return true;
}

void TProposeWriteTransaction::Complete(const TActorContext& ctx) {
ctx.Send(Source, Result.release(), 0, Cookie);
Self->GetProgressTxController().FinishProposeOnComplete(WriteCommit->GetTxId(), ctx);
}

void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class TColumnShard
friend class TLongTxTransactionOperator;
friend class TEvWriteTransactionOperator;
friend class TBackupTransactionOperator;
friend class ISSTransactionOperator;
friend class TSharingTransactionOperator;


class TTxProgressTx;
class TTxProposeCancel;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,12 @@ struct Schema : NIceDb::Schema {
NIceDb::TUpdate<TxInfo::Cookie>(cookie));
}

static void UpdateTxInfoSource(NIceDb::TNiceDb& db, ui64 txId, const TActorId& source, ui64 cookie) {
db.Table<TxInfo>().Key(txId).Update(
NIceDb::TUpdate<TxInfo::Source>(source),
NIceDb::TUpdate<TxInfo::Cookie>(cookie));
}

static void UpdateTxInfoPlanStep(NIceDb::TNiceDb& db, ui64 txId, ui64 planStep) {
db.Table<TxInfo>().Key(txId).Update(
NIceDb::TUpdate<TxInfo::PlanStep>(planStep));
Expand Down
48 changes: 36 additions & 12 deletions ydb/core/tx/columnshard/data_sharing/common/context/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@ namespace NKikimr::NOlap::NDataSharing {
NKikimrColumnShardDataSharingProto::TTransferContext TTransferContext::SerializeToProto() const {
NKikimrColumnShardDataSharingProto::TTransferContext result;
result.SetDestinationTabletId((ui64)DestinationTabletId);
if (TxId) {
result.SetTxId(*TxId);
}
for (auto&& i : SourceTabletIds) {
result.AddSourceTabletIds((ui64)i);
}
SnapshotBarrier.SerializeToProto(*result.MutableSnapshotBarrier());
if (SnapshotBarrier) {
SnapshotBarrier->SerializeToProto(*result.MutableSnapshotBarrier());
}
result.SetMoving(Moving);
return result;
}

NKikimr::TConclusionStatus TTransferContext::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TTransferContext& proto) {
DestinationTabletId = (TTabletId)proto.GetDestinationTabletId();
if (proto.HasTxId()) {
TxId = proto.GetTxId();
if (!*TxId) {
return TConclusionStatus::Fail("TxId is incorrect");
}
}
if (!(ui64)DestinationTabletId) {
return TConclusionStatus::Fail("incorrect DestinationTabletId in proto");
}
Expand All @@ -24,15 +35,16 @@ NKikimr::TConclusionStatus TTransferContext::DeserializeFromProto(const NKikimrC
}
Moving = proto.GetMoving();
{
if (!proto.HasSnapshotBarrier()) {
return TConclusionStatus::Fail("SnapshotBarrier not initialized in proto.");
}
auto snapshotParse = SnapshotBarrier.DeserializeFromProto(proto.GetSnapshotBarrier());
if (!snapshotParse) {
return snapshotParse;
}
if (!SnapshotBarrier.Valid()) {
return TConclusionStatus::Fail("SnapshotBarrier must be valid in proto.");
if (proto.HasSnapshotBarrier()) {
TSnapshot snapshot = TSnapshot::Zero();
auto snapshotParse = snapshot.DeserializeFromProto(proto.GetSnapshotBarrier());
if (!snapshotParse) {
return snapshotParse;
}
if (!snapshot.Valid()) {
return TConclusionStatus::Fail("SnapshotBarrier must be valid in proto.");
}
SnapshotBarrier = snapshot;
}
}
return TConclusionStatus::Success();
Expand All @@ -47,16 +59,28 @@ bool TTransferContext::IsEqualTo(const TTransferContext& context) const {
}

TString TTransferContext::DebugString() const {
return TStringBuilder() << "{from=" << (ui64)DestinationTabletId << ";moving=" << Moving << ";snapshot=" << SnapshotBarrier.DebugString() << "}";
return TStringBuilder() << "{from=" << (ui64)DestinationTabletId << ";moving=" << Moving << ";snapshot=" << (SnapshotBarrier ? SnapshotBarrier->DebugString() : "NO") << "}";
}

TTransferContext::TTransferContext(const TTabletId destination, const THashSet<TTabletId>& sources, const TSnapshot& snapshotBarrier, const bool moving)
TTransferContext::TTransferContext(const TTabletId destination, const THashSet<TTabletId>& sources, const TSnapshot& snapshotBarrier, const bool moving, const std::optional<ui64> txId)
: DestinationTabletId(destination)
, SourceTabletIds(sources)
, Moving(moving)
, SnapshotBarrier(snapshotBarrier)
, TxId(txId)
{
AFL_VERIFY(!TxId || *TxId);
AFL_VERIFY(!sources.contains(destination));
}

const NKikimr::NOlap::TSnapshot& TTransferContext::GetSnapshotBarrierVerified() const {
AFL_VERIFY(!!SnapshotBarrier);
return *SnapshotBarrier;
}

void TTransferContext::SetSnapshotBarrier(const TSnapshot& snapshot) {
AFL_VERIFY(!SnapshotBarrier || *SnapshotBarrier == snapshot);
SnapshotBarrier = snapshot;
}

}
9 changes: 7 additions & 2 deletions ydb/core/tx/columnshard/data_sharing/common/context/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@ class TTransferContext {
YDB_READONLY(TTabletId, DestinationTabletId, (TTabletId)0);
YDB_READONLY_DEF(THashSet<TTabletId>, SourceTabletIds);
YDB_READONLY(bool, Moving, false);
YDB_READONLY(TSnapshot, SnapshotBarrier, TSnapshot::Zero());
std::optional<TSnapshot> SnapshotBarrier;
YDB_READONLY_DEF(std::optional<ui64>, TxId);
public:
TTransferContext() = default;
bool IsEqualTo(const TTransferContext& context) const;
TString DebugString() const;

TTransferContext(const TTabletId destination, const THashSet<TTabletId>& sources, const TSnapshot& snapshotBarrier, const bool moving);
const TSnapshot& GetSnapshotBarrierVerified() const;

void SetSnapshotBarrier(const TSnapshot& snapshot);

TTransferContext(const TTabletId destination, const THashSet<TTabletId>& sources, const TSnapshot& snapshotBarrier, const bool moving, const std::optional<ui64> txId = {});
NKikimrColumnShardDataSharingProto::TTransferContext SerializeToProto() const;
TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TTransferContext& proto);
};
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/columnshard/data_sharing/common/session/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class TCommonSession {
, TransferContext(transferContext) {
}

const TTransferContext& GetTransferContext() const {
return TransferContext;
}

bool IsFinished() const {
return IsFinishedFlag;
}
Expand All @@ -73,7 +77,7 @@ class TCommonSession {
void Finish(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager);

const TSnapshot& GetSnapshotBarrier() const {
return TransferContext.GetSnapshotBarrier();
return TransferContext.GetSnapshotBarrierVerified();
}

TString DebugString() const;
Expand Down
Loading

0 comments on commit e1be878

Please sign in to comment.