Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed May 16, 2024
1 parent 11aa167 commit 87bf797
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 4 deletions.
3 changes: 3 additions & 0 deletions ydb/core/kqp/ut/query/kqp_query_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>

#include <ydb/core/tx/datashard/datashard_failpoints.h>
#include <ydb/core/testlib/common_helper.h>
#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
Expand Down Expand Up @@ -1639,6 +1640,8 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
)";

Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").Initialize();

auto client = kikimr.GetQueryClient();
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
lock.SetGeneration(1);
lock.SetCounter(1);
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID(), operation->GetLockId(), lock);
Results.emplace_back(std::move(ev), writeMeta.GetSource());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
}
} else {
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
auto ev = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
Results.emplace_back(std::move(ev), writeMeta.GetSource());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), 0);
}
}
return true;
Expand All @@ -120,6 +120,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
for (auto&& i : buffer.GetRemoveActions()) {
i->OnCompleteTxAfterRemoving(true);
}

AFL_VERIFY(buffer.GetAggregations().size() == Results.size() + ResultOperators.size());
for (auto&& i : ResultOperators) {
Self->GetProgressTxController().FinishProposeOnComplete(i->GetTxId(), ctx);
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
private:
std::unique_ptr<NActors::IEventBase> Event;
TActorId DestinationForReply;
const ui64 Cookie;
public:
TReplyInfo(std::unique_ptr<NActors::IEventBase>&& ev, const TActorId& destinationForReply)
TReplyInfo(std::unique_ptr<NActors::IEventBase>&& ev, const TActorId& destinationForReply, const ui64 cookie)
: Event(std::move(ev))
, DestinationForReply(destinationForReply)
, Cookie(cookie)
{

}

void DoSendReply(const TActorContext& ctx) {
ctx.Send(DestinationForReply, Event.release());
ctx.Send(DestinationForReply, Event.release(), 0, Cookie);
}
};

Expand Down
14 changes: 14 additions & 0 deletions ydb/core/tx/columnshard/transactions/tx_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,20 +282,25 @@ void TTxController::OnTabletInit() {
}

std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartProposeOnExecute(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnExecute")("tx_info", txInfo.DebugString());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
std::shared_ptr<TTxController::ITransactionOperator> txOperator(TTxController::ITransactionOperator::TFactory::Construct(txInfo.TxKind,
TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId, source, cookie)));
AFL_VERIFY(!!txOperator);
if (!txOperator->Parse(Owner, txBody)) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse txOperator");
return txOperator;
}

auto txInfoPtr = GetTxInfo(txInfo.TxId);
if (!!txInfoPtr) {
if (!txOperator->AllowTxDups() && (txInfoPtr->Source != source || txInfoPtr->Cookie != cookie)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "incorrect duplication");
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txInfo.TxId << " has already been proposed");
txOperator->SetProposeStartInfo(proposeResult);
return txOperator;
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "update duplication data");
return UpdateTxSourceInfo(txInfo.GetTxId(), source, cookie, txc);
}
} else {
Expand All @@ -305,35 +310,44 @@ std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartPropose
} else {
RegisterTx(txOperator, txBody, txc);
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "registered");
} else {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "problem on start")("message", txOperator->GetProposeStartInfoVerified().GetStatusMessage());
}
return txOperator;
}
}

void TTxController::StartProposeOnComplete(const ui64 txId, const TActorContext& ctx) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnComplete")("tx_id", txId);
auto txOperator = GetTxOperator(txId);
if (!txOperator) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId);
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
txOperator->StartProposeOnComplete(Owner, ctx);
}
}

void TTxController::FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnExecute")("tx_id", txId);
auto txOperator = GetTxOperator(txId);
if (!txOperator) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId);
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
txOperator->FinishProposeOnExecute(Owner, txc);
}
}

void TTxController::FinishProposeOnComplete(const ui64 txId, const TActorContext& ctx) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnComplete")("tx_id", txId);
auto txOperator = GetTxOperator(txId);
if (!txOperator) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction finish")("tx_id", txId);
return;
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
TTxController::TProposeResult proposeResult = txOperator->GetProposeStartInfoVerified();
AFL_VERIFY(!txOperator->IsFail());
txOperator->FinishProposeOnComplete(Owner, ctx);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/transactions/tx_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ struct TBasicTxInfo {
ui64 GetTxId() const {
return TxId;
}

TString DebugString() const {
return TStringBuilder() << TxId << ":" << NKikimrTxColumnShard::ETransactionKind_Name(TxKind);
}
};

struct TFullTxInfo: public TBasicTxInfo {
Expand Down

0 comments on commit 87bf797

Please sign in to comment.