Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

24-1: Cleanup persistent locks on table rename #4979

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "datashard_impl.h"
#include "datashard_txs.h"
#include "datashard_locks_db.h"
#include "probes.h"

#include <ydb/core/base/interconnect_channels.h>
Expand Down Expand Up @@ -1620,7 +1621,9 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
newTableInfo->StatsUpdateInProgress = false;
newTableInfo->StatsNeedUpdate = true;

RemoveUserTable(prevId);
TDataShardLocksDb locksDb(*this, txc);

RemoveUserTable(prevId, &locksDb);
AddUserTable(newId, newTableInfo);

for (auto& [_, record] : ChangesQueue) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1607,10 +1607,10 @@ class TDataShard
return nullptr;
}

void RemoveUserTable(const TPathId& tableId) {
TableInfos.erase(tableId.LocalPathId);
SysLocks.RemoveSchema(tableId);
void RemoveUserTable(const TPathId& tableId, ILocksDb* locksDb) {
SysLocks.RemoveSchema(tableId, locksDb);
Pipeline.GetDepTracker().RemoveSchema(tableId);
TableInfos.erase(tableId.LocalPathId);
}

void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) {
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/tx/datashard/datashard_locks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,23 @@ void TLockLocker::UpdateSchema(const TPathId& tableId, const TUserTable& tableIn
table->UpdateKeyColumnsTypes(tableInfo.KeyColumnTypes);
}

void TLockLocker::RemoveSchema(const TPathId& tableId) {
void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) {
// Make sure all persistent locks are removed from the database
for (auto& pr : Locks) {
if (pr.second->IsPersistent()) {
pr.second->PersistRemoveLock(db);
}
pr.second->OnRemoved();
}

Tables.erase(tableId);
Y_ABORT_UNLESS(Tables.empty());
Locks.clear();
ShardLocks.clear();
ExpireQueue.Clear();
BrokenLocks.Clear();
BrokenPersistentLocks.Clear();
BrokenLocksCount_ = 0;
CleanupPending.clear();
CleanupCandidates.clear();
PendingSubscribeLocks.clear();
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ class TLockLocker {
}

void UpdateSchema(const TPathId& tableId, const TUserTable& tableInfo);
void RemoveSchema(const TPathId& tableId);
void RemoveSchema(const TPathId& tableId, ILocksDb* db);
bool ForceShardLock(const TPathId& tableId) const;
bool ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const;

Expand Down Expand Up @@ -840,8 +840,8 @@ class TSysLocks {
Locker.UpdateSchema(tableId, tableInfo);
}

void RemoveSchema(const TPathId& tableId) {
Locker.RemoveSchema(tableId);
void RemoveSchema(const TPathId& tableId, ILocksDb* db) {
Locker.RemoveSchema(tableId, db);
}

TVector<TLock> ApplyLocks();
Expand Down
77 changes: 77 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4896,6 +4896,83 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { int32_value: 2 } items { int32_value: 20 } }");
}

Y_UNIT_TEST(UncommittedChangesRenameTable) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table1` (key int, value int, PRIMARY KEY (key));
)"),
"SUCCESS");

ExecSQL(server, sender, "UPSERT INTO `/Root/table1` (key, value) VALUES (2, 22);");

TString sessionId = CreateSessionRPC(runtime);
TString txId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, R"(
UPSERT INTO `/Root/table1` (key, value) VALUES (1, 11), (3, 33);
SELECT key, value FROM `/Root/table1` ORDER BY key;
)"),
"{ items { int32_value: 1 } items { int32_value: 11 } }, "
"{ items { int32_value: 2 } items { int32_value: 22 } }, "
"{ items { int32_value: 3 } items { int32_value: 33 } }");

auto shards = GetTableShards(server, sender, "/Root/table1");
auto tableId1 = ResolveTableId(server, sender, "/Root/table1");

// Check shard has open transactions
{
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId1.PathId));
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
UNIT_ASSERT_C(!ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
}

WaitTxNotification(server, sender, AsyncMoveTable(server, "/Root/table1", "/Root/table1moved"));
auto tableId2 = ResolveTableId(server, sender, "/Root/table1moved");

runtime.SimulateSleep(TDuration::Seconds(1));

// Check shard doesn't have open transactions
{
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId));
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
}

RebootTablet(runtime, shards.at(0), sender);

// The original table was removed
// We must not be able to commit the transaction
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1"),
"ERROR: ABORTED");

runtime.SimulateSleep(TDuration::Seconds(1));

// Check shard doesn't have open transactions
{
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId));
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
}
}

}

} // namespace NKikimr
Loading