Skip to content

Commit

Permalink
24-1 Correctly trigger borrow compaction for shadow data (#4978)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga authored Jun 3, 2024
1 parent 178cf6e commit 4d22497
Show file tree
Hide file tree
Showing 19 changed files with 289 additions and 180 deletions.
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ class TBuildIndexScan : public TActor<TBuildIndexScan>, public NTable::IScan {

EScan Seek(TLead& lead, ui64 seq) noexcept override {
auto ctx = TActivationContext::AsActorContext().MakeFor(SelfId());
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
"Seek no " << seq << " " << Debug());
if (seq) {
if (!WriteBuf.IsEmpty()) {
Expand Down Expand Up @@ -367,7 +367,7 @@ class TBuildIndexScan : public TActor<TBuildIndexScan>, public NTable::IScan {

EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept override {
auto ctx = TActivationContext::AsActorContext().MakeFor(SelfId());
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
"Feed key " << DebugPrintPoint(KeyTypes, key, *AppData()->TypeRegistry)
<< " " << Debug());

Expand Down
64 changes: 39 additions & 25 deletions ydb/core/tx/datashard/datashard__compact_borrowed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,59 @@ class TDataShard::TTxCompactBorrowed : public NTabletFlatExecutor::TTransactionB
<< " for table " << pathId
<< " at tablet " << Self->TabletID());

auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(Self->TabletID(), pathId);
auto nothingToCompactResult = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(Self->TabletID(), pathId);

if (pathId.OwnerId != Self->GetPathOwnerId()) {
// Ignore unexpected owner
ctx.Send(Ev->Sender, std::move(response));
if (pathId.OwnerId != Self->GetPathOwnerId()) { // ignore unexpected owner
ctx.Send(Ev->Sender, std::move(nothingToCompactResult));
return true;
}

auto it = Self->TableInfos.find(pathId.LocalPathId);
if (it == Self->TableInfos.end()) {
// Ignore unexpected table (may normally happen with races)
ctx.Send(Ev->Sender, std::move(response));
if (it == Self->TableInfos.end()) { // ignore unexpected table (may normally happen with races)
ctx.Send(Ev->Sender, std::move(nothingToCompactResult));
return true;
}

const TUserTable& tableInfo = *it->second;

THashSet<ui32> tablesToCompact;
if (txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID())) {
tablesToCompact.insert(tableInfo.LocalTid);
}
if (tableInfo.ShadowTid && txc.DB.HasBorrowed(tableInfo.ShadowTid, Self->TabletID())) {
tablesToCompact.insert(tableInfo.ShadowTid);
}

auto waiter = MakeIntrusive<TCompactBorrowedWaiter>(Ev->Sender, pathId.LocalPathId);

bool hasBorrowed = txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID());
if (!hasBorrowed) {
for (auto tableToCompact : tablesToCompact) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"TEvCompactBorrowed request from " << Ev->Sender
<< " for table " << pathId
<< " has no borrowed parts"
<< " starting compaction for local table " << tableToCompact
<< " at tablet " << Self->TabletID());
ctx.Send(Ev->Sender, std::move(response));
return true;
}

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"TEvCompactBorrowed request from " << Ev->Sender
<< " for table " << pathId
<< " starting compaction for local table " << tableInfo.LocalTid
<< " at tablet " << Self->TabletID());

Self->Executor()->CompactBorrowed(tableInfo.LocalTid);
Self->IncCounter(COUNTER_TX_COMPACT_BORROWED);
++tableInfo.Stats.CompactBorrowedCount;
if (Self->Executor()->CompactBorrowed(tableToCompact)) {
Self->IncCounter(COUNTER_TX_COMPACT_BORROWED);
++tableInfo.Stats.CompactBorrowedCount;

waiter->CompactingTables.insert(tableToCompact);
Self->CompactBorrowedWaiters[tableToCompact].push_back(waiter);
} else {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"TEvCompactBorrowed request from " << Ev->Sender
<< " for table " << pathId
<< " can not be compacted"
<< " at tablet " << Self->TabletID());
}
}

Self->CompactBorrowedWaiters[tableInfo.LocalTid].emplace_back(Ev->Sender);
if (waiter->CompactingTables.empty()) { // none has been triggered
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"TEvCompactBorrowed request from " << Ev->Sender
<< " for table " << pathId
<< " has no parts for borrowed compaction"
<< " at tablet " << Self->TabletID());
ctx.Send(Ev->Sender, std::move(nothingToCompactResult));
}

return true;
}
Expand Down
52 changes: 31 additions & 21 deletions ydb/core/tx/datashard/datashard__compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ void TDataShard::Handle(TEvDataShard::TEvCompactTable::TPtr& ev, const TActorCon
void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) {
auto finishedInfo = Executor()->GetFinishedCompactionInfo(tableId);

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"CompactionComplete of tablet# "<< TabletID()
<< ", table# " << tableId
<< ", finished edge# " << finishedInfo.Edge
<< ", ts " << finishedInfo.FullCompactionTs);

TLocalPathId localPathId = InvalidLocalPathId;
if (tableId >= Schema::MinLocalTid) {
for (auto& ti : TableInfos) {
Expand Down Expand Up @@ -234,11 +240,12 @@ void TDataShard::ReplyCompactionWaiters(
<< ", finished edge# " << compactionInfo.Edge
<< ", front# " << (CompactionWaiters[tableId].empty() ? 0UL : std::get<0>(CompactionWaiters[tableId].front())));

auto& fullCompactionQueue = CompactionWaiters[tableId];
while (!fullCompactionQueue.empty()) {
const auto& waiter = CompactionWaiters[tableId].front();
if (std::get<0>(waiter) > compactionInfo.Edge)
auto fullCompactionQueue = CompactionWaiters.FindPtr(tableId);
while (fullCompactionQueue && !fullCompactionQueue->empty()) {
const auto& waiter = fullCompactionQueue->front();
if (std::get<0>(waiter) > compactionInfo.Edge) {
break;
}

const auto& sender = std::get<1>(waiter);
auto response = MakeHolder<TEvDataShard::TEvCompactTableResult>(
Expand All @@ -252,27 +259,30 @@ void TDataShard::ReplyCompactionWaiters(
"Sending TEvCompactTableResult to# " << sender
<< "pathId# " << TPathId(GetPathOwnerId(), localPathId));

fullCompactionQueue.pop_front();
fullCompactionQueue->pop_front();
}

auto& compactBorrowedQueue = CompactBorrowedWaiters[tableId];
if (!compactBorrowedQueue.empty()) {
auto compactBorrowedQueue = CompactBorrowedWaiters.FindPtr(tableId);
if (compactBorrowedQueue && !compactBorrowedQueue->empty()) {
const bool hasBorrowed = Executor()->HasBorrowed(tableId, TabletID());
if (!hasBorrowed) {
while (!compactBorrowedQueue.empty()) {
const auto& waiter = compactBorrowedQueue.front();

auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(
TabletID(),
GetPathOwnerId(),
localPathId);
ctx.Send(waiter, std::move(response));

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Sending TEvCompactBorrowedResult to# " << waiter
<< "pathId# " << TPathId(GetPathOwnerId(), localPathId));

compactBorrowedQueue.pop_front();
while (!compactBorrowedQueue->empty()) {
const auto& waiter = compactBorrowedQueue->front();
waiter->CompactingTables.erase(tableId);

if (waiter->CompactingTables.empty()) { // all requested tables have been compacted
auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(
TabletID(),
GetPathOwnerId(),
waiter->RequestedTable);
ctx.Send(waiter->ActorId, std::move(response));

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Sending TEvCompactBorrowedResult to# " << waiter->ActorId
<< "pathId# " << TPathId(GetPathOwnerId(), waiter->RequestedTable));
}

compactBorrowedQueue->pop_front();
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/datashard__stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,9 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorCo
Actors.erase(ev->Sender);

ui64 tableId = ev->Get()->TableId;
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Stats rebuilt at datashard " << TabletID() << ", for tableId " << tableId);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Stats rebuilt at datashard " << TabletID() << ", for tableId " << tableId
<< ": RowCount " << ev->Get()->Stats.RowCount << ", DataSize " << ev->Get()->Stats.DataSize.Size
<< (ev->Get()->PartOwners.size() > 1 || ev->Get()->PartOwners.size() == 1 && *ev->Get()->PartOwners.begin() != TabletID() ? ", with borrowed parts" : ""));

i64 dataSize = 0;
if (TableInfos.contains(tableId)) {
Expand Down
13 changes: 11 additions & 2 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2822,10 +2822,19 @@ class TDataShard
// from the front
THashMap<ui32, TCompactionWaiterList> CompactionWaiters;

using TCompactBorrowedWaiterList = TList<TActorId>;
// State of a single TEvCompactBorrowed request that is waiting for borrowed
// compaction to finish. One waiter object is shared (through an intrusive
// pointer) between the per-table waiter lists of every local table whose
// compaction was started on behalf of the request.
struct TCompactBorrowedWaiter : public TThrRefBase {
TCompactBorrowedWaiter(TActorId actorId, TLocalPathId requestedTable)
: ActorId(actorId)
, RequestedTable(requestedTable)
{ }

// Actor that receives TEvCompactBorrowedResult when the request is done
TActorId ActorId;
// Local path id of the requested table; it is echoed back in the result
TLocalPathId RequestedTable;
// Local table ids whose compaction is still pending for this request;
// the result is sent once this set becomes empty
THashSet<ui32> CompactingTables;
};

// tableLocalTid -> waiters, similar to CompactionWaiters
THashMap<ui32, TCompactBorrowedWaiterList> CompactBorrowedWaiters;
THashMap<ui32, TList<TIntrusivePtr<TCompactBorrowedWaiter>>> CompactBorrowedWaiters;

struct TReplicationSourceOffsetsReceiveState {
// A set of tables for which we already received offsets
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_loans.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ NTabletFlatExecutor::ITransaction* TDataShard::CreateTxInitiateBorrowedPartsRetu
}

void TDataShard::CompletedLoansChanged(const TActorContext &ctx) {
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " CompletedLoansChanged");
Y_ABORT_UNLESS(Executor()->GetStats().CompactedPartLoans);

CheckInitiateBorrowedPartsReturn(ctx);
Expand Down
101 changes: 98 additions & 3 deletions ydb/core/tx/datashard/datashard_ut_build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
CreateShardedTable(server, sender, root, name, opts);
}

Y_UNIT_TEST(TestRunScan) {
Y_UNIT_TEST(RunScan) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
Expand Down Expand Up @@ -130,8 +130,7 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
// Alter table: disable shadow data and change compaction policy
auto policy = NLocalDb::CreateDefaultUserTablePolicy();
policy->KeepEraseMarkers = false;
WaitTxNotification(server,
AsyncAlterAndDisableShadow(server, "/Root", "table-2", policy.Get()));
WaitTxNotification(server, AsyncAlterAndDisableShadow(server, "/Root", "table-2", policy.Get()));

// Shadow data must be visible now
auto data2 = ReadShardedTable(server, "/Root/table-2");
Expand All @@ -140,6 +139,102 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
"value = 300, key = 3\n"
"value = 500, key = 5\n");
}

// Scenario: build index data into a table whose writes are not yet visible
// (shadow data), split that table into two shards, request borrowed compaction
// on each new shard and check that afterwards the only remaining part owner is
// the shard itself. Finally disable shadow data and verify all rows are readable.
Y_UNIT_TEST(ShadowBorrowCompaction) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings
.SetDomainName("Root")
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);

// Allow manipulating shadow data using normal schemeshard operations
runtime.GetAppData().AllowShadowDataInSchemeShardForTests = true;

InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1", 1, false);

// Upsert some initial values
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (2, 200), (3, 300), (4, 400), (5, 500);");

CreateShardedTableForIndex(server, sender, "/Root", "table-2", 1, false);

// Log every TEvCompactBorrowed and reset the ones sent by the schemeshard actor.
// NOTE(review): presumably resetting drops the event, so that only the explicit
// CompactBorrowed call below triggers borrowed compaction -- confirm.
auto observer = runtime.AddObserver<TEvDataShard::TEvCompactBorrowed>([&](TEvDataShard::TEvCompactBorrowed::TPtr& event) {
Cerr << "Captured TEvDataShard::TEvCompactBorrowed from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
if (runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR") {
event.Reset();
}
});

auto snapshot = CreateVolatileSnapshot(server, { "/Root/table-1" });

DoBuildIndex(server, sender, "/Root/table-1", "/Root/table-2", snapshot, NKikimrTxDataShard::TEvBuildIndexProgressResponse::DONE);

// Writes to shadow data should not be visible yet
auto data = ReadShardedTable(server, "/Root/table-2");
UNIT_ASSERT_VALUES_EQUAL(data, "");

// Split index
auto shards1 = GetTableShards(server, sender, "/Root/table-2");
UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);

// Set the split/merge part count limit to -1: the split would fail otherwise
SetSplitMergePartCountLimit(server->GetRuntime(), -1);

auto senderSplit = runtime.AllocateEdgeActor();
ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/table-2", shards1.at(0), 300);
WaitTxNotification(server, senderSplit, txId);

auto shards2 = GetTableShards(server, sender, "/Root/table-2");
UNIT_ASSERT_VALUES_EQUAL(shards2.size(), 2u);

// Check each of the two shards produced by the split
for (auto shardIndex : xrange(2u)) {
auto stats = WaitTableStats(runtime, shards2.at(shardIndex));
// Cerr << "Received shard stats:" << Endl << stats.DebugString() << Endl;

// NOTE(review): 2 rows are expected in the first shard and 3 in the second;
// presumably this follows from the split key 300 -- confirm against the index table schema.
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), shardIndex == 0 ? 2 : 3);

// Before compaction both the pre-split shard and the new shard are reported as part owners
THashSet<ui64> owners(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
// Note: datashard always adds current shard to part owners, even if there are no parts
UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards1.at(0), shards2.at(shardIndex)}));

// Request borrowed compaction and check that the result identifies this shard and table
auto tableId = ResolveTableId(server, sender, "/Root/table-2");
auto result = CompactBorrowed(runtime, shards2.at(shardIndex), tableId);
// Cerr << "Compact result " << result.DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL(result.GetTabletId(), shards2.at(shardIndex));
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetOwnerId(), tableId.PathId.OwnerId);
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetLocalId(), tableId.PathId.LocalPathId);

// Re-read stats (at most 5 times) until a single owner remains and it is not the pre-split shard
for (int i = 0; i < 5 && (owners.size() > 1 || owners.contains(shards1.at(0))); ++i) {
auto stats = WaitTableStats(runtime, shards2.at(shardIndex));
owners = THashSet<ui64>(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
}

// After compaction the shard itself must be the only part owner
UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards2.at(shardIndex)}));
}

// Alter table: disable shadow data and change compaction policy
auto policy = NLocalDb::CreateDefaultUserTablePolicy();
policy->KeepEraseMarkers = false;
WaitTxNotification(server, AsyncAlterAndDisableShadow(server, "/Root", "table-2", policy.Get()));

// Shadow data must be visible now
auto data2 = ReadShardedTable(server, "/Root/table-2");
UNIT_ASSERT_VALUES_EQUAL(data2,
"value = 100, key = 1\n"
"value = 200, key = 2\n"
"value = 300, key = 3\n"
"value = 400, key = 4\n"
"value = 500, key = 5\n");
}
}

} // namespace NKikimr
Loading

0 comments on commit 4d22497

Please sign in to comment.