Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async propose phase on CS (config stage on SS) #4560

Merged
merged 6 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/kqp/ut/query/kqp_query_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>

#include <ydb/core/tx/datashard/datashard_failpoints.h>
#include <ydb/core/testlib/common_helper.h>
#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
Expand Down Expand Up @@ -1639,6 +1640,8 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
)";

Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").Initialize();

auto client = kikimr.GetQueryClient();
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/tx_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ enum ETransactionKind {
TX_KIND_DATA = 4;
TX_KIND_COMMIT_WRITE = 5;
TX_KIND_BACKUP = 6;
TX_KIND_SHARING = 7;
}

enum ETransactionFlag {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once
#include "session.h"
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tx/columnshard/common/tablet_id.h>

#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/status.h>
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/manager/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ class TSessionActor: public NActors::TActorBootstrapped<TSessionActor> {
void SaveSessionProgress();

void SaveSessionState();

template <class T>
T* GetShardVerified() const {
return &Adapter->GetTabletExecutorVerifiedAs<T>();
}

public:
TSessionActor(const std::shared_ptr<TSession>& session, const std::shared_ptr<ITabletAdapter>& adapter)
: TabletId(adapter->GetTabletId())
Expand Down
31 changes: 16 additions & 15 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
for (auto&& i : buffer.GetRemoveActions()) {
i->OnExecuteTxAfterRemoving(blobManagerDb, true);
}
Results.clear();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
Expand All @@ -87,22 +88,22 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
proto.SetLockId(operation->GetLockId());
TString txBody;
Y_ABORT_UNLESS(proto.SerializeToString(&txBody));
auto result = Self->GetProgressTxController().ProposeTransaction(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId()), txBody, writeMeta.GetSource(), operation->GetCookie(), txc);
AFL_VERIFY(!result.IsError());
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(
Self->TabletID(), result.GetFullTxInfoVerified().TxId, Self->GetProgressTxController().BuildCoordinatorInfo(result.GetFullTxInfoVerified())));
auto op = Self->GetProgressTxController().StartProposeOnExecute(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId()), txBody, writeMeta.GetSource(), operation->GetCookie(), txc);
AFL_VERIFY(!op->IsFail());
ResultOperators.emplace_back(op);
} else {
NKikimrDataEvents::TLock lock;
lock.SetLockId(operation->GetLockId());
lock.SetDataShard(Self->TabletID());
lock.SetGeneration(1);
lock.SetCounter(1);

Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock));
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock);
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
}
} else {
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
Results.emplace_back(std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS));
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
Results.emplace_back(std::move(ev), writeMeta.GetSource(), 0);
}
}
return true;
Expand All @@ -119,16 +120,16 @@ void TTxWrite::Complete(const TActorContext& ctx) {
for (auto&& i : buffer.GetRemoveActions()) {
i->OnCompleteTxAfterRemoving(true);
}
AFL_VERIFY(buffer.GetAggregations().size() == Results.size());

AFL_VERIFY(buffer.GetAggregations().size() == Results.size() + ResultOperators.size());
for (auto&& i : ResultOperators) {
Self->GetProgressTxController().FinishProposeOnComplete(i->GetTxId(), ctx);
}
for (auto&& i : Results) {
i.DoSendReply(ctx);
}
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
if (operation) {
Self->GetProgressTxController().CompleteTransaction(operation->GetLockId(), ctx);
ctx.Send(writeMeta.GetSource(), Results[i].release(), 0, operation->GetCookie());
} else {
ctx.Send(writeMeta.GetSource(), Results[i].release());
}
Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->CSCounters.OnSuccessWriteResponse();
}
Expand Down
24 changes: 22 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,31 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_WRITE; }


private:
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
const ui32 TabletTxNo;
std::vector<std::unique_ptr<NActors::IEventBase>> Results;

class TReplyInfo {
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
private:
std::unique_ptr<NActors::IEventBase> Event;
TActorId DestinationForReply;
const ui64 Cookie;
public:
TReplyInfo(std::unique_ptr<NActors::IEventBase>&& ev, const TActorId& destinationForReply, const ui64 cookie)
: Event(std::move(ev))
, DestinationForReply(destinationForReply)
, Cookie(cookie)
{

}

void DoSendReply(const TActorContext& ctx) {
ctx.Send(DestinationForReply, Event.release(), 0, Cookie);
}
};

std::vector<TReplyInfo> Results;
std::vector<std::shared_ptr<TTxController::ITransactionOperator>> ResultOperators;


bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);
Expand Down
51 changes: 22 additions & 29 deletions ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu
TTxProposeTransaction(TColumnShard* self, TEvColumnShard::TEvProposeTransaction::TPtr& ev)
: TBase(self)
, Ev(ev)
{}

virtual bool Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) override {
Y_ABORT_UNLESS(Ev);
{
AFL_VERIFY(!!Ev);
}

virtual bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
txc.DB.NoMoreReadsForTx();
NIceDb::TNiceDb db(txc.DB);

Expand All @@ -32,7 +32,8 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu

if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) {
auto proposeResult = ProposeTtlDeprecated(txBody);
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
auto reply = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
ctx.Send(Ev->Sender, reply.release());
ivanmorozov333 marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

Expand All @@ -49,44 +50,36 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu
Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId());
}
}
auto result = Self->GetProgressTxController().ProposeTransaction(TTxController::TBasicTxInfo(txKind, txId), txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
const auto& proposeResult = result.GetProposeResult();
if (result.IsError()) {
const auto& txInfo = result.GetBaseTxInfoVerified();
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
Self->IncCounter(COUNTER_PREPARE_ERROR);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", proposeResult.GetStatusMessage())("tablet_id", Self->TabletID())("tx_id", txInfo.TxId);
} else {
const auto& txInfo = result.GetFullTxInfoVerified();
AFL_VERIFY(proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED)("tx_id", txInfo.TxId)("details", proposeResult.DebugString());
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
Result->Record.SetMinStep(txInfo.MinStep);
Result->Record.SetMaxStep(txInfo.MaxStep);
if (Self->ProcessingParams) {
Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators());
}
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
}
TxOperator = Self->GetProgressTxController().StartProposeOnExecute(TTxController::TBasicTxInfo(txKind, txId), txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
return true;
}

virtual void Complete(const TActorContext& ctx) override {
Y_ABORT_UNLESS(Ev);
Y_ABORT_UNLESS(Result);

auto& record = Proto(Ev->Get());
if (record.GetTxKind() == NKikimrTxColumnShard::TX_KIND_TTL) {
return;
}
AFL_VERIFY(!!TxOperator);
const ui64 txId = record.GetTxId();
if (Ev->Sender != TxOperator->GetTxInfo().Source) {
return;
}
if (TxOperator->IsFail()) {
TxOperator->SendReply(*Self, ctx);
} else if (TxOperator->IsAsync()) {
Self->GetProgressTxController().StartProposeOnComplete(txId, ctx);
} else {
Self->GetProgressTxController().FinishProposeOnComplete(txId, ctx);
}

Self->GetProgressTxController().CompleteTransaction(txId, ctx);
ctx.Send(Ev->Get()->GetSource(), Result.release());
Self->TryRegisterMediatorTimeCast();
}

TTxType GetTxType() const override { return TXTYPE_PROPOSE; }

private:
TEvColumnShard::TEvProposeTransaction::TPtr Ev;
std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> Result;
std::shared_ptr<TTxController::ITransactionOperator> TxOperator;

TTxController::TProposeResult ProposeTtlDeprecated(const TString& txBody) {
/// @note There's no tx guaranties now. For now TX_KIND_TTL is used to trigger TTL in tests only.
Expand Down
12 changes: 3 additions & 9 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,26 +270,20 @@ class TProposeWriteTransaction : public NTabletFlatExecutor::TTransactionBase<TC
TCommitOperation::TPtr WriteCommit;
TActorId Source;
ui64 Cookie;
std::unique_ptr<NActors::IEventBase> Result;
};

bool TProposeWriteTransaction::Execute(TTransactionContext& txc, const TActorContext&) {
NKikimrTxColumnShard::TCommitWriteTxBody proto;
proto.SetLockId(WriteCommit->GetLockId());
TString txBody;
Y_ABORT_UNLESS(proto.SerializeToString(&txBody));
auto result = Self->GetProgressTxController().ProposeTransaction(
TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, WriteCommit->GetTxId()), txBody, Source, Cookie, txc);
if (result.IsError()) {
Result = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), result.GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, result.GetProposeResult().GetStatusMessage());
} else {
Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), result.GetTxId(), Self->GetProgressTxController().BuildCoordinatorInfo(result.GetFullTxInfoVerified()));
}
Y_UNUSED(Self->GetProgressTxController().StartProposeOnExecute(
TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, WriteCommit->GetTxId()), txBody, Source, Cookie, txc));
return true;
}

void TProposeWriteTransaction::Complete(const TActorContext& ctx) {
ctx.Send(Source, Result.release(), 0, Cookie);
Self->GetProgressTxController().FinishProposeOnComplete(WriteCommit->GetTxId(), ctx);
}

void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class TColumnShard
friend class TLongTxTransactionOperator;
friend class TEvWriteTransactionOperator;
friend class TBackupTransactionOperator;
friend class ISSTransactionOperator;
friend class TSharingTransactionOperator;


class TTxProgressTx;
class TTxProposeCancel;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,12 @@ struct Schema : NIceDb::Schema {
NIceDb::TUpdate<TxInfo::Cookie>(cookie));
}

static void UpdateTxInfoSource(NIceDb::TNiceDb& db, ui64 txId, const TActorId& source, ui64 cookie) {
db.Table<TxInfo>().Key(txId).Update(
NIceDb::TUpdate<TxInfo::Source>(source),
NIceDb::TUpdate<TxInfo::Cookie>(cookie));
}

static void UpdateTxInfoPlanStep(NIceDb::TNiceDb& db, ui64 txId, ui64 planStep) {
db.Table<TxInfo>().Key(txId).Update(
NIceDb::TUpdate<TxInfo::PlanStep>(planStep));
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/data_locks/manager/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ namespace NKikimr::NOlap::NDataLocks {
std::shared_ptr<TManager::TGuard> TManager::RegisterLock(const std::shared_ptr<ILock>& lock) {
AFL_VERIFY(lock);
AFL_VERIFY(ProcessLocks.emplace(lock->GetLockName(), lock).second)("process_id", lock->GetLockName());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "lock")("process_id", lock->GetLockName());
return std::make_shared<TGuard>(lock->GetLockName(), StopFlag);
}

void TManager::UnregisterLock(const TString& processId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "unlock")("process_id", processId);
AFL_VERIFY(ProcessLocks.erase(processId))("process_id", processId);
}

Expand Down
48 changes: 36 additions & 12 deletions ydb/core/tx/columnshard/data_sharing/common/context/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@ namespace NKikimr::NOlap::NDataSharing {
NKikimrColumnShardDataSharingProto::TTransferContext TTransferContext::SerializeToProto() const {
NKikimrColumnShardDataSharingProto::TTransferContext result;
result.SetDestinationTabletId((ui64)DestinationTabletId);
if (TxId) {
result.SetTxId(*TxId);
}
for (auto&& i : SourceTabletIds) {
result.AddSourceTabletIds((ui64)i);
}
SnapshotBarrier.SerializeToProto(*result.MutableSnapshotBarrier());
if (SnapshotBarrier) {
SnapshotBarrier->SerializeToProto(*result.MutableSnapshotBarrier());
}
result.SetMoving(Moving);
return result;
}

NKikimr::TConclusionStatus TTransferContext::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TTransferContext& proto) {
DestinationTabletId = (TTabletId)proto.GetDestinationTabletId();
if (proto.HasTxId()) {
TxId = proto.GetTxId();
if (!*TxId) {
return TConclusionStatus::Fail("TxId is incorrect");
}
}
if (!(ui64)DestinationTabletId) {
return TConclusionStatus::Fail("incorrect DestinationTabletId in proto");
}
Expand All @@ -24,15 +35,16 @@ NKikimr::TConclusionStatus TTransferContext::DeserializeFromProto(const NKikimrC
}
Moving = proto.GetMoving();
{
if (!proto.HasSnapshotBarrier()) {
return TConclusionStatus::Fail("SnapshotBarrier not initialized in proto.");
}
auto snapshotParse = SnapshotBarrier.DeserializeFromProto(proto.GetSnapshotBarrier());
if (!snapshotParse) {
return snapshotParse;
}
if (!SnapshotBarrier.Valid()) {
return TConclusionStatus::Fail("SnapshotBarrier must be valid in proto.");
if (proto.HasSnapshotBarrier()) {
TSnapshot snapshot = TSnapshot::Zero();
auto snapshotParse = snapshot.DeserializeFromProto(proto.GetSnapshotBarrier());
if (!snapshotParse) {
return snapshotParse;
}
if (!snapshot.Valid()) {
return TConclusionStatus::Fail("SnapshotBarrier must be valid in proto.");
}
SnapshotBarrier = snapshot;
}
}
return TConclusionStatus::Success();
Expand All @@ -47,16 +59,28 @@ bool TTransferContext::IsEqualTo(const TTransferContext& context) const {
}

TString TTransferContext::DebugString() const {
return TStringBuilder() << "{from=" << (ui64)DestinationTabletId << ";moving=" << Moving << ";snapshot=" << SnapshotBarrier.DebugString() << "}";
return TStringBuilder() << "{from=" << (ui64)DestinationTabletId << ";moving=" << Moving << ";snapshot=" << (SnapshotBarrier ? SnapshotBarrier->DebugString() : "NO") << "}";
}

TTransferContext::TTransferContext(const TTabletId destination, const THashSet<TTabletId>& sources, const TSnapshot& snapshotBarrier, const bool moving)
TTransferContext::TTransferContext(const TTabletId destination, const THashSet<TTabletId>& sources, const TSnapshot& snapshotBarrier, const bool moving, const std::optional<ui64> txId)
: DestinationTabletId(destination)
, SourceTabletIds(sources)
, Moving(moving)
, SnapshotBarrier(snapshotBarrier)
, TxId(txId)
{
AFL_VERIFY(!TxId || *TxId);
AFL_VERIFY(!sources.contains(destination));
}

const NKikimr::NOlap::TSnapshot& TTransferContext::GetSnapshotBarrierVerified() const {
AFL_VERIFY(!!SnapshotBarrier);
return *SnapshotBarrier;
}

void TTransferContext::SetSnapshotBarrier(const TSnapshot& snapshot) {
AFL_VERIFY(!SnapshotBarrier || *SnapshotBarrier == snapshot);
SnapshotBarrier = snapshot;
}

}
Loading
Loading