Commit

Fix race between table merge and borrowed gc compaction. Fixes #3154.
snaury authored Mar 26, 2024
1 parent bfd2866 commit 7d3baf9
Showing 3 changed files with 200 additions and 33 deletions.
81 changes: 48 additions & 33 deletions ydb/core/tx/datashard/datashard_split_dst.cpp
@@ -120,13 +120,11 @@ class TDataShard::TTxInitSplitMergeDestination : public NTabletFlatExecutor::TTr
 class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
 private:
     TEvDataShard::TEvSplitTransferSnapshot::TPtr Ev;
-    bool LastSnapshotReceived;
 
 public:
     TTxSplitTransferSnapshot(TDataShard* ds, TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev)
         : NTabletFlatExecutor::TTransactionBase<TDataShard>(ds)
         , Ev(ev)
-        , LastSnapshotReceived(false)
     {}
 
     TTxType GetTxType() const override { return TXTYPE_SPLIT_TRANSFER_SNAPSHOT; }
@@ -257,8 +255,6 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
     }
 
     if (Self->ReceiveSnapshotsFrom.empty()) {
-        LastSnapshotReceived = true;
-
         const auto minVersion = mvcc ? Self->GetSnapshotManager().GetLowWatermark()
                                      : Self->GetSnapshotManager().GetMinWriteVersion();
 
@@ -295,6 +291,12 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
         // Note: we persist Ready, but keep current state in memory until Complete
         Self->SetPersistState(TShardState::Ready, txc);
         Self->State = TShardState::SplitDstReceivingSnapshot;
+
+        // Schedule a new transaction that will move shard to the Ready state
+        // and finish initialization. This new transaction is guaranteed to
+        // wait until async LoanTable above is complete and new parts are
+        // fully merged into the table.
+        Self->Execute(new TTxLastSnapshotReceived(Self));
     }
 
     return true;
@@ -307,39 +309,52 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
         LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ack snapshot OpId " << opId);
 
         ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID()));
-
-        // Note: we skip init in an unlikely event of state resetting between Execute and Complete
-        if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) {
-            // We have received all the data, finish shard initialization
-            // Note: previously we used TxInit, however received system tables
-            // have been empty for years now, and since pipes are still open we
-            // may receive requests between TxInit loading the Ready state and
-            // its Complete method initializing everything properly. Instead
-            // necessary steps are repeated here.
-            Self->State = TShardState::Ready;
-
-            // We are already in StateWork, but we need to repeat many steps now that we are Ready
-            Self->SwitchToWork(ctx);
-
-            // We can send the registration request now that we are ready
-            Self->SendRegistrationRequestTimeCast(ctx);
-
-            // Initialize snapshot expiration queue with current context time
-            Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
-            if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
-                Self->PlanCleanup(ctx);
-            }
-
-            // Initialize change senders
-            Self->KillChangeSender(ctx);
-            Self->CreateChangeSender(ctx);
-            Self->MaybeActivateChangeSender(ctx);
-            Self->EmitHeartbeats();
-
-            // Switch mvcc state if needed
-            Self->CheckMvccStateChangeCanStart(ctx);
-        }
     }
+
+    class TTxLastSnapshotReceived : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+    public:
+        TTxLastSnapshotReceived(TDataShard* self)
+            : TTransactionBase(self)
+        {}
+
+        bool Execute(TTransactionContext&, const TActorContext&) override {
+            return true;
+        }
+
+        void Complete(const TActorContext& ctx) override {
+            // Note: we skip init in an unlikely event of state resetting before reaching Complete
+            if (Self->State == TShardState::SplitDstReceivingSnapshot) {
+                // We have received all the data, finish shard initialization
+                // Note: previously we used TxInit, however received system tables
+                // have been empty for years now, and since pipes are still open we
+                // may receive requests between TxInit loading the Ready state and
+                // its Complete method initializing everything properly. Instead
+                // necessary steps are repeated here.
+                Self->State = TShardState::Ready;
+
+                // We are already in StateWork, but we need to repeat many steps now that we are Ready
+                Self->SwitchToWork(ctx);
+
+                // We can send the registration request now that we are ready
+                Self->SendRegistrationRequestTimeCast(ctx);
+
+                // Initialize snapshot expiration queue with current context time
+                Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
+                if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
+                    Self->PlanCleanup(ctx);
+                }
+
+                // Initialize change senders
+                Self->KillChangeSender(ctx);
+                Self->CreateChangeSender(ctx);
+                Self->MaybeActivateChangeSender(ctx);
+                Self->EmitHeartbeats();
+
+                // Switch mvcc state if needed
+                Self->CheckMvccStateChangeCanStart(ctx);
+            }
+        }
+    };
 };
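For readers unfamiliar with the tablet flat executor, here is a minimal standalone sketch (plain C++ with invented names, not YDB executor code) of the ordering idea the comment above relies on: work enqueued behind the asynchronous merge necessarily observes its result, whereas code running in the snapshot transaction's own Complete may not.

// Toy model: a FIFO queue stands in for the executor's pipeline of async
// part loads and follow-up transactions. Everything here is illustrative.
#include <cstdio>
#include <deque>
#include <functional>

struct ToyExecutor {
    std::deque<std::function<void()>> Queue;
    void Enqueue(std::function<void()> fn) { Queue.push_back(std::move(fn)); }
    void Drain() {
        while (!Queue.empty()) {
            auto fn = std::move(Queue.front());
            Queue.pop_front();
            fn();
        }
    }
};

int main() {
    ToyExecutor exec;
    bool merged = false;

    // The snapshot transaction starts an async merge (cf. LoanTable above)...
    exec.Enqueue([&] { merged = true; });

    // Racy variant: acting right here, as the old Complete did, runs
    // before the queued merge work.
    std::printf("in Complete: merged=%d\n", merged); // prints 0

    // Fixed variant: a follow-up transaction queued behind the merge sees it
    // done (cf. TTxLastSnapshotReceived, whose Complete finishes init).
    exec.Enqueue([&] { std::printf("follow-up tx: merged=%d\n", merged); }); // prints 1

    exec.Drain();
}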

28 changes: 28 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -11,6 +11,9 @@ namespace NKqpHelpers {
using TEvExecuteDataQueryRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
Ydb::Table::ExecuteDataQueryResponse>;

using TEvExecuteSchemeQueryRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest,
Ydb::Table::ExecuteSchemeQueryResponse>;

using TEvCreateSessionRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest,
Ydb::Table::CreateSessionResponse>;

@@ -224,6 +227,31 @@ namespace NKqpHelpers {
return FormatResult(result);
}

inline Ydb::Table::ExecuteSchemeQueryRequest MakeSchemeRequestRPC(
const TString& sql, const TString& sessionId)
{
Ydb::Table::ExecuteSchemeQueryRequest request;
request.set_session_id(sessionId);
request.set_yql_text(sql);
return request;
}

inline NThreading::TFuture<Ydb::Table::ExecuteSchemeQueryResponse> SendRequest(
TTestActorRuntime& runtime, Ydb::Table::ExecuteSchemeQueryRequest&& request, const TString& database = {})
{
return NRpcService::DoLocalRpc<TEvExecuteSchemeQueryRequest>(
std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
}

inline TString KqpSchemeExec(TTestActorRuntime& runtime, const TString& query) {
TString sessionId = CreateSessionRPC(runtime);
auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSchemeRequestRPC(query, sessionId)));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
return "SUCCESS";
}

} // namespace NKqpHelpers
} // namespace NDataShard
} // namespace NKikimr
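A hedged usage sketch of the new KqpSchemeExec helper, mirroring how the test below calls it (the table name and schema here are illustrative, not from the commit): it creates a session, runs one DDL statement through local RPC, and reduces the response to a status string.

// Inside a datashard unit test; runtime comes from the test server.
TString status = KqpSchemeExec(runtime, R"(
    CREATE TABLE `/Root/example` (key int, value bytes, PRIMARY KEY (key));
)");
UNIT_ASSERT_VALUES_EQUAL(status, "SUCCESS");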
124 changes: 124 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -4540,6 +4540,130 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
}

void CompactBorrowed(TTestActorRuntime& runtime, ui64 shardId, const TTableId& tableId) {
auto msg = MakeHolder<TEvDataShard::TEvCompactBorrowed>(tableId.PathId);
auto sender = runtime.AllocateEdgeActor();
runtime.SendToPipe(shardId, sender, msg.Release(), 0, GetPipeConfigWithRetries());
runtime.GrabEdgeEventRethrow<TEvDataShard::TEvCompactBorrowedResult>(sender);
}
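For context, a hedged sketch of how this helper is meant to be used; it mirrors the calls near the end of the test below, where server, sender, and runtime come from the test fixture:

// Force compaction of parts borrowed from the source shards after a merge;
// blocks until the shard reports TEvCompactBorrowedResult.
const auto merged = GetTableShards(server, sender, "/Root/table");
const auto tableId = ResolveTableId(server, sender, "/Root/table");
CompactBorrowed(runtime, merged.at(0), tableId);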

Y_UNIT_TEST(PostMergeNotCompactedTooEarly) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key int, value bytes, PRIMARY KEY (key))
WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
PARTITION_AT_KEYS = (5));
)");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
const auto tableId = ResolveTableId(server, sender, "/Root/table");

for (int i = 0; i < 20; ++i) {
Cerr << "... upserting key " << i << Endl;
auto query = Sprintf(R"(
UPSERT INTO `/Root/table` (key, value) VALUES (%d, '%s');
)", i, TString(128 * 1024, 'x').c_str());
ExecSQL(server, sender, query);
if (i >= 5) {
Cerr << "... compacting shard " << shards.at(1) << Endl;
CompactTable(runtime, shards.at(1), tableId, false);
} else if (i == 4) {
Cerr << "... compacting shard " << shards.at(0) << Endl;
CompactTable(runtime, shards.at(0), tableId, false);
}
}

// Read (and snapshot) current data, so it doesn't go away on compaction
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, "SELECT COUNT(*) FROM `/Root/table`;"),
"{ items { uint64_value: 20 } }");

// Delete all the data in shard 0; this is small and will stay in the memtable,
// but borrowed dst compaction will have pressure to compact it all
ExecSQL(server, sender, "DELETE FROM `/Root/table` WHERE key < 5");

std::vector<TEvDataShard::TEvSplitTransferSnapshot::TPtr> snapshots;
auto captureSnapshots = runtime.AddObserver<TEvDataShard::TEvSplitTransferSnapshot>(
[&](TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev) {
auto* msg = ev->Get();
Cerr << "... captured snapshot from " << msg->Record.GetSrcTabletId() << Endl;
snapshots.emplace_back(ev.Release());
});

Cerr << "... merging table" << Endl;
SetSplitMergePartCountLimit(server->GetRuntime(), -1);
ui64 txId = AsyncMergeTable(server, sender, "/Root/table", shards);
Cerr << "... started merge " << txId << Endl;
WaitFor(runtime, [&]{ return snapshots.size() >= 2; }, "both src tablet snapshots");

std::vector<TEvBlobStorage::TEvGet::TPtr> gets;
auto captureGets = runtime.AddObserver<TEvBlobStorage::TEvGet>(
[&](TEvBlobStorage::TEvGet::TPtr& ev) {
auto* msg = ev->Get();
if (msg->Queries[0].Id.TabletID() == shards.at(1)) {
Cerr << "... blocking blob get of " << msg->Queries[0].Id << Endl;
gets.emplace_back(ev.Release());
}
});

// Release snapshot for shard 0 then shard 1
captureSnapshots.Remove();
Cerr << "... unlocking snapshots from tablet " << shards.at(0) << Endl;
for (auto& ev : snapshots) {
if (ev && ev->Get()->Record.GetSrcTabletId() == shards.at(0)) {
runtime.Send(ev.Release(), 0, true);
}
}
Cerr << "... unblocking snapshots from tablet " << shards.at(1) << Endl;
for (auto& ev : snapshots) {
if (ev && ev->Get()->Record.GetSrcTabletId() == shards.at(1)) {
runtime.Send(ev.Release(), 0, true);
}
}

// Let it commit the snapshots above and incorrectly compact after the first one is loaded and merged
runtime.SimulateSleep(TDuration::Seconds(1));
UNIT_ASSERT(gets.size() > 0);

Cerr << "... unblocking blob gets" << Endl;
captureGets.Remove();
for (auto& ev : gets) {
runtime.Send(ev.Release(), 0, true);
}

// Let it finish loading the second snapshot
runtime.SimulateSleep(TDuration::Seconds(1));

// Wait for the merge to complete and start a borrowed compaction.
// When the bug is present it will cause the newly compacted part to have an epoch larger than the previously compacted one
WaitTxNotification(server, sender, txId);
const auto merged = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(merged.size(), 1u);
Cerr << "... compacting borrowed parts in shard " << merged.at(0) << Endl;
CompactBorrowed(runtime, merged.at(0), tableId);

// Validate we have the expected number of rows (20 upserted - 5 deleted = 15)
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, "SELECT COUNT(*) FROM `/Root/table`;"),
"{ items { uint64_value: 15 } }");
}

}

} // namespace NKikimr
