From fccde7284d777fe6d79156f22b702c7ab34c9340 Mon Sep 17 00:00:00 2001
From: Aleksei Borzenkov
Date: Tue, 26 Mar 2024 13:14:58 +0000
Subject: [PATCH] Fix race between table merge and borrowed GC compaction.

Fixes #3154.
---
 ydb/core/tx/datashard/datashard_split_dst.cpp |  81 +++++++-----
 .../tx/datashard/datashard_ut_common_kqp.h    |  28 ++++
 .../tx/datashard/datashard_ut_snapshot.cpp    | 124 ++++++++++++++++++
 3 files changed, 200 insertions(+), 33 deletions(-)

diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp
index d78ca11ccf2c..c55038394253 100644
--- a/ydb/core/tx/datashard/datashard_split_dst.cpp
+++ b/ydb/core/tx/datashard/datashard_split_dst.cpp
@@ -120,13 +120,11 @@ class TDataShard::TTxInitSplitMergeDestination : public NTabletFlatExecutor::TTr
 class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
 private:
     TEvDataShard::TEvSplitTransferSnapshot::TPtr Ev;
-    bool LastSnapshotReceived;
 
 public:
     TTxSplitTransferSnapshot(TDataShard* ds, TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev)
         : NTabletFlatExecutor::TTransactionBase<TDataShard>(ds)
         , Ev(ev)
-        , LastSnapshotReceived(false)
     {}
 
     TTxType GetTxType() const override { return TXTYPE_SPLIT_TRANSFER_SNAPSHOT; }
@@ -257,8 +255,6 @@
         }
 
         if (Self->ReceiveSnapshotsFrom.empty()) {
-            LastSnapshotReceived = true;
-
             const auto minVersion = mvcc
                 ? Self->GetSnapshotManager().GetLowWatermark()
                 : Self->GetSnapshotManager().GetMinWriteVersion();
@@ -295,6 +291,12 @@
             // Note: we persist Ready, but keep current state in memory until Complete
             Self->SetPersistState(TShardState::Ready, txc);
             Self->State = TShardState::SplitDstReceivingSnapshot;
+
+            // Schedule a new transaction that will move the shard to the Ready state
+            // and finish initialization. This new transaction is guaranteed to
+            // wait until the async LoanTable above is complete and the new parts
+            // are fully merged into the table.
+            Self->Execute(new TTxLastSnapshotReceived(Self));
         }
 
         return true;
@@ -307,39 +309,52 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
         LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack snapshot OpId " << opId);
 
         ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID()));
+    }
 
-        // Note: we skip init in an unlikely event of state resetting between Execute and Complete
-        if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) {
-            // We have received all the data, finish shard initialization
-            // Note: previously we used TxInit, however received system tables
-            // have been empty for years now, and since pipes are still open we
-            // may receive requests between TxInit loading the Ready state and
-            // its Complete method initializing everything properly. Instead
-            // necessary steps are repeated here.
-            Self->State = TShardState::Ready;
-
-            // We are already in StateWork, but we need to repeat many steps now that we are Ready
-            Self->SwitchToWork(ctx);
-
-            // We can send the registration request now that we are ready
-            Self->SendRegistrationRequestTimeCast(ctx);
-
-            // Initialize snapshot expiration queue with current context time
-            Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
-            if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
-                Self->PlanCleanup(ctx);
-            }
+    class TTxLastSnapshotReceived : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+    public:
+        TTxLastSnapshotReceived(TDataShard* self)
+            : TTransactionBase(self)
+        {}
 
-            // Initialize change senders
-            Self->KillChangeSender(ctx);
-            Self->CreateChangeSender(ctx);
-            Self->MaybeActivateChangeSender(ctx);
-            Self->EmitHeartbeats();
+        bool Execute(TTransactionContext&, const TActorContext&) override {
+            return true;
+        }
 
-            // Switch mvcc state if needed
-            Self->CheckMvccStateChangeCanStart(ctx);
+        void Complete(const TActorContext& ctx) override {
+            // Note: we skip init in the unlikely event of the state being reset before reaching Complete
+            if (Self->State == TShardState::SplitDstReceivingSnapshot) {
+                // We have received all the data, finish shard initialization
+                // Note: previously we used TxInit, however received system tables
+                // have been empty for years now, and since pipes are still open we
+                // may receive requests between TxInit loading the Ready state and
+                // its Complete method initializing everything properly. Instead
+                // necessary steps are repeated here.
+                Self->State = TShardState::Ready;
+
+                // We are already in StateWork, but we need to repeat many steps now that we are Ready
+                Self->SwitchToWork(ctx);
+
+                // We can send the registration request now that we are ready
+                Self->SendRegistrationRequestTimeCast(ctx);
+
+                // Initialize snapshot expiration queue with current context time
+                Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
+                if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
+                    Self->PlanCleanup(ctx);
+                }
+
+                // Initialize change senders
+                Self->KillChangeSender(ctx);
+                Self->CreateChangeSender(ctx);
+                Self->MaybeActivateChangeSender(ctx);
+                Self->EmitHeartbeats();
+
+                // Switch mvcc state if needed
+                Self->CheckMvccStateChangeCanStart(ctx);
+            }
         }
-    }
+    };
 };
 
 class TDataShard::TTxSplitReplicationSourceOffsets : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index 1ca7c1396ebe..61b189a860c4 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -11,6 +11,9 @@ namespace NKqpHelpers {
     using TEvExecuteDataQueryRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
         Ydb::Table::ExecuteDataQueryResponse>;
 
+    using TEvExecuteSchemeQueryRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest,
+        Ydb::Table::ExecuteSchemeQueryResponse>;
+
     using TEvCreateSessionRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest,
         Ydb::Table::CreateSessionResponse>;
 
@@ -224,6 +227,31 @@
         return FormatResult(result);
     }
 
+    inline Ydb::Table::ExecuteSchemeQueryRequest MakeSchemeRequestRPC(
+        const TString& sql, const TString& sessionId)
+    {
+        Ydb::Table::ExecuteSchemeQueryRequest request;
+        request.set_session_id(sessionId);
+        request.set_yql_text(sql);
+        return request;
+    }
+
+    inline NThreading::TFuture<Ydb::Table::ExecuteSchemeQueryResponse> SendRequest(
+        TTestActorRuntime& runtime, Ydb::Table::ExecuteSchemeQueryRequest&& request, const TString& database = {})
+    {
+        return NRpcService::DoLocalRpc<TEvExecuteSchemeQueryRequest>(
+            std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
+    }
+
+    inline TString KqpSchemeExec(TTestActorRuntime& runtime, const TString& query) {
+        TString sessionId = CreateSessionRPC(runtime);
+        auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSchemeRequestRPC(query, sessionId)));
+        if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
+            return TStringBuilder() << "ERROR: " << response.operation().status();
+        }
+        return "SUCCESS";
+    }
+
 } // namespace NKqpHelpers
 
 } // namespace NDataShard
 } // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index e3af16937812..b9b30ba89247 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -4540,6 +4540,130 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
         }
     }
 
+    void CompactBorrowed(TTestActorRuntime& runtime, ui64 shardId, const TTableId& tableId) {
+        auto msg = MakeHolder<TEvDataShard::TEvCompactBorrowed>(tableId.PathId);
+        auto sender = runtime.AllocateEdgeActor();
+        runtime.SendToPipe(shardId, sender, msg.Release(), 0, GetPipeConfigWithRetries());
+        runtime.GrabEdgeEventRethrow<TEvDataShard::TEvCompactBorrowedResult>(sender);
+    }
+
+    Y_UNIT_TEST(PostMergeNotCompactedTooEarly) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetDomainPlanResolution(100);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+
+        KqpSchemeExec(runtime, R"(
+            CREATE TABLE `/Root/table` (key int, value bytes, PRIMARY KEY (key))
+            WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
+                  PARTITION_AT_KEYS = (5));
+        )");
+
+        const auto shards = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+        const auto tableId = ResolveTableId(server, sender, "/Root/table");
+
+        for (int i = 0; i < 20; ++i) {
+            Cerr << "... upserting key " << i << Endl;
+            auto query = Sprintf(R"(
+                UPSERT INTO `/Root/table` (key, value) VALUES (%d, '%s');
+            )", i, TString(128 * 1024, 'x').c_str());
+            ExecSQL(server, sender, query);
+            if (i >= 5) {
+                Cerr << "... compacting shard " << shards.at(1) << Endl;
+                CompactTable(runtime, shards.at(1), tableId, false);
+            } else if (i == 4) {
+                Cerr << "... compacting shard " << shards.at(0) << Endl;
+                CompactTable(runtime, shards.at(0), tableId, false);
+            }
+        }
+
+        // Read (and snapshot) the current data, so it doesn't go away on compaction
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, "SELECT COUNT(*) FROM `/Root/table`;"),
+            "{ items { uint64_value: 20 } }");
+
+        // Delete all the data in shard 0; the deletion is small and will stay in the memtable,
+        // but the borrowed dst compaction will then have pressure to compact it all
+        ExecSQL(server, sender, "DELETE FROM `/Root/table` WHERE key < 5");
+
+        std::vector<TEvDataShard::TEvSplitTransferSnapshot::TPtr> snapshots;
+        auto captureSnapshots = runtime.AddObserver<TEvDataShard::TEvSplitTransferSnapshot>(
+            [&](TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev) {
+                auto* msg = ev->Get();
+                Cerr << "... captured snapshot from " << msg->Record.GetSrcTabletId() << Endl;
+                snapshots.emplace_back(ev.Release());
+            });
+
+        Cerr << "... merging table" << Endl;
+        SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+        ui64 txId = AsyncMergeTable(server, sender, "/Root/table", shards);
+        Cerr << "... started merge " << txId << Endl;
+        WaitFor(runtime, [&]{ return snapshots.size() >= 2; }, "both src tablet snapshots");
+
+        std::vector<TEvBlobStorage::TEvGet::TPtr> gets;
+        auto captureGets = runtime.AddObserver<TEvBlobStorage::TEvGet>(
+            [&](TEvBlobStorage::TEvGet::TPtr& ev) {
+                auto* msg = ev->Get();
+                if (msg->Queries[0].Id.TabletID() == shards.at(1)) {
+                    Cerr << "... blocking blob get of " << msg->Queries[0].Id << Endl;
+                    gets.emplace_back(ev.Release());
+                }
+            });
+
+        // Release the snapshot for shard 0, then shard 1
+        captureSnapshots.Remove();
+        Cerr << "... unblocking snapshots from tablet " << shards.at(0) << Endl;
+        for (auto& ev : snapshots) {
+            if (ev && ev->Get()->Record.GetSrcTabletId() == shards.at(0)) {
+                runtime.Send(ev.Release(), 0, true);
+            }
+        }
+        Cerr << "... unblocking snapshots from tablet " << shards.at(1) << Endl;
+        for (auto& ev : snapshots) {
+            if (ev && ev->Get()->Record.GetSrcTabletId() == shards.at(1)) {
+                runtime.Send(ev.Release(), 0, true);
+            }
+        }
+
+        // Let it commit the above snapshots and incorrectly compact after the first one is loaded and merged
+        runtime.SimulateSleep(TDuration::Seconds(1));
+        UNIT_ASSERT(gets.size() > 0);
+
+        Cerr << "... unblocking blob gets" << Endl;
+        captureGets.Remove();
+        for (auto& ev : gets) {
+            runtime.Send(ev.Release(), 0, true);
+        }
+
+        // Let it finish loading the second snapshot
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        // Wait for the merge to complete, then start a borrowed compaction
+        // When the bug is present it will cause the newly compacted part to have an epoch larger than the previously compacted one
+        WaitTxNotification(server, sender, txId);
+        const auto merged = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(merged.size(), 1u);
+        Cerr << "... compacting borrowed parts in shard " << merged.at(0) << Endl;
+        CompactBorrowed(runtime, merged.at(0), tableId);
+
+        // Validate we have the expected number of rows
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, "SELECT COUNT(*) FROM `/Root/table`;"),
+            "{ items { uint64_value: 15 } }");
+    }
+
 }
 
 } // namespace NKikimr
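
The core of the fix is an ordering guarantee: TTxSplitTransferSnapshot::Complete used to finish
shard initialization as soon as the last snapshot transaction committed, even though the LoanTable
call was still merging the borrowed parts asynchronously, so a borrowed compaction could start
against a half-merged table. Scheduling the no-op TTxLastSnapshotReceived moves initialization into
a Complete that the executor runs only after the pending merge. The sketch below illustrates that
sequencing idea in isolation; it is not YDB code: TMiniExecutor, TTxBase, EnqueueAsync and the
single-queue model are hypothetical stand-ins for the tablet executor's Execute/Complete contract.

// A minimal, self-contained sketch (assumed simplifications, not YDB APIs):
// Execute() runs synchronously, while Complete() is deferred behind all work
// queued before it, such as the asynchronous part merge in the real patch.
#include <cstdio>
#include <functional>
#include <memory>
#include <queue>

struct TTxBase {
    virtual ~TTxBase() = default;
    virtual bool Execute() = 0;   // may enqueue async work or follow-up txs
    virtual void Complete() = 0;  // sequenced after everything queued earlier
};

class TMiniExecutor {
public:
    void EnqueueAsync(std::function<void()> work) {
        Queue.push(std::move(work));
    }

    void Execute(std::unique_ptr<TTxBase> tx) {
        tx->Execute();
        // Defer Complete() behind all previously queued work.
        TTxBase* raw = tx.release();
        Queue.push([raw] { raw->Complete(); delete raw; });
    }

    void RunAll() {
        while (!Queue.empty()) {
            auto work = std::move(Queue.front());
            Queue.pop();
            work();
        }
    }

private:
    std::queue<std::function<void()>> Queue;
};

enum class EState { SplitDstReceivingSnapshot, Ready };

int main() {
    TMiniExecutor executor;
    EState state = EState::SplitDstReceivingSnapshot;
    bool partsMerged = false;

    // Analog of TTxLastSnapshotReceived: Execute() is a no-op, and Complete()
    // finishes initialization only after the async merge has already run.
    struct TTxLastSnapshotReceived : TTxBase {
        EState* State;
        bool* PartsMerged;
        TTxLastSnapshotReceived(EState* state, bool* merged)
            : State(state), PartsMerged(merged) {}
        bool Execute() override { return true; }
        void Complete() override {
            std::printf("follow-up Complete: partsMerged=%d\n", *PartsMerged);
            if (*State == EState::SplitDstReceivingSnapshot) {
                *State = EState::Ready;  // safe: loaned parts are merged by now
            }
        }
    };

    // Analog of TTxSplitTransferSnapshot: instead of finishing initialization
    // in its own Complete() (which raced with the merge), it schedules the
    // follow-up transaction from Execute().
    struct TTxSplitTransferSnapshot : TTxBase {
        TMiniExecutor* Executor;
        EState* State;
        bool* PartsMerged;
        TTxSplitTransferSnapshot(TMiniExecutor* executor, EState* state, bool* merged)
            : Executor(executor), State(state), PartsMerged(merged) {}
        bool Execute() override {
            // Stand-in for the asynchronous LoanTable + part merge.
            Executor->EnqueueAsync([merged = PartsMerged] { *merged = true; });
            Executor->Execute(std::make_unique<TTxLastSnapshotReceived>(State, PartsMerged));
            return true;
        }
        void Complete() override {
            std::printf("snapshot Complete: ack sent\n");
        }
    };

    executor.Execute(std::make_unique<TTxSplitTransferSnapshot>(&executor, &state, &partsMerged));
    executor.RunAll();
    std::printf("final state ready=%d\n", state == EState::Ready);
    return 0;
}

Running the sketch shows the async merge finishing before the follow-up Complete flips the state
to Ready, which is exactly the ordering the patch relies on for borrowed compaction safety.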