Skip to content

Commit

Permalink
Remove change senders upon DROP TABLE KIKIMR-20678 (#901)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jan 11, 2024
1 parent d95e3ab commit 7173846
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 5 deletions.
28 changes: 25 additions & 3 deletions ydb/core/tx/datashard/change_sender_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,14 @@ class TAsyncIndexChangeSenderMain
return;
}

if (entry.Self && entry.Self->Info.GetPathState() == NKikimrSchemeOp::EPathStateDrop) {
LOG_D("Index is planned to drop, waiting for the EvRemoveSender command");

RemoveRecords();
KillSenders();
return Become(&TThis::StatePendingRemove);
}

Y_ABORT_UNLESS(entry.ListNodeEntry->Children.size() == 1);
const auto& indexTable = entry.ListNodeEntry->Children.at(0);

Expand All @@ -559,7 +567,7 @@ class TAsyncIndexChangeSenderMain
STATEFN(StateResolveIndexTable) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleIndexTable);
sFunc(TEvents::TEvWakeup, ResolveIndexTable);
sFunc(TEvents::TEvWakeup, ResolveIndex);
default:
return StateBase(ev);
}
Expand Down Expand Up @@ -638,7 +646,7 @@ class TAsyncIndexChangeSenderMain
STATEFN(StateResolveKeys) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleKeys);
sFunc(TEvents::TEvWakeup, ResolveIndexTable);
sFunc(TEvents::TEvWakeup, ResolveIndex);
default:
return StateBase(ev);
}
Expand Down Expand Up @@ -690,7 +698,7 @@ class TAsyncIndexChangeSenderMain
}

void Resolve() override {
ResolveIndexTable();
ResolveIndex();
}

bool IsResolved() const override {
Expand Down Expand Up @@ -758,6 +766,11 @@ class TAsyncIndexChangeSenderMain
PassAway();
}

void AutoRemove(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
RemoveRecords(std::move(ev->Get()->Records));
}

void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
RenderHtmlPage(ESenderType::AsyncIndex, ev, ctx);
}
Expand Down Expand Up @@ -797,6 +810,15 @@ class TAsyncIndexChangeSenderMain
}
}

STFUNC(StatePendingRemove) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvChangeExchange::TEvEnqueueRecords, AutoRemove);
hFunc(TEvChangeExchange::TEvRemoveSender, Handle);
HFunc(NMon::TEvRemoteHttpInfo, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
const TTableId UserTableId;
mutable TMaybe<TString> LogPrefix;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/datashard_change_sending.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
}
}

Self->CheckStateChange(ctx);
}

private:
Expand Down
22 changes: 20 additions & 2 deletions ydb/core/tx/datashard/drop_table_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class TDropTableUnit : public TExecutionUnit {
const TActorContext &ctx) override;

private:
TVector<THolder<TEvChangeExchange::TEvRemoveSender>> RemoveSenders;
};

TDropTableUnit::TDropTableUnit(TDataShard &dataShard,
Expand Down Expand Up @@ -75,6 +76,20 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op,
Y_ABORT_UNLESS(DataShard.GetPathOwnerId() == schemeTx.GetDropTable().GetPathId().GetOwnerId());
tableId = schemeTx.GetDropTable().GetPathId().GetLocalId();
}

auto it = DataShard.GetUserTables().find(tableId);
Y_ABORT_UNLESS(it != DataShard.GetUserTables().end());
{
for (const auto& [indexPathId, indexInfo] : it->second->Indexes) {
if (indexInfo.Type == TUserTable::TTableIndex::EIndexType::EIndexTypeGlobalAsync) {
RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(indexPathId));
}
}
for (const auto& [streamPathId, _] : it->second->CdcStreams) {
RemoveSenders.emplace_back(new TEvChangeExchange::TEvRemoveSender(streamPathId));
}
}

DataShard.DropUserTable(txc, tableId);

// FIXME: transactions need to specify ownerId
Expand All @@ -96,12 +111,15 @@ EExecutionStatus TDropTableUnit::Execute(TOperation::TPtr op,
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());

return EExecutionStatus::ExecutedNoMoreRestarts;
return EExecutionStatus::DelayCompleteNoMoreRestarts;
}

void TDropTableUnit::Complete(TOperation::TPtr,
const TActorContext &)
const TActorContext &ctx)
{
for (auto& ev : RemoveSenders) {
ctx.Send(DataShard.GetChangeSender(), ev.Release());
}
}

THolder<TExecutionUnit> CreateDropTableUnit(TDataShard &dataShard,
Expand Down
52 changes: 52 additions & 0 deletions ydb/core/tx/schemeshard/ut_index/ut_async_index.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <ydb/core/base/path.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/tx/datashard/change_exchange.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/ut_helpers/test_with_reboots.h>
#include <ydb/core/testlib/tablet_helpers.h>
Expand Down Expand Up @@ -360,4 +361,55 @@ Y_UNIT_TEST_SUITE(TAsyncIndexTests) {
t.TestEnv->TestWaitNotification(runtime, t.TxId);
});
}

Y_UNIT_TEST_WITH_REBOOTS(DropTableWithInflightChanges) {
T t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
auto origObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
return TTestActorRuntime::DefaultObserverFunc(ev);
});

TVector<THolder<IEventHandle>> enqueued;
runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) {
enqueued.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
return origObserver(ev);
});

{
TInactiveZone inactive(activeZone);
TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", R"(
TableDescription {
Name: "Table"
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "indexed" Type: "Uint32" }
KeyColumnNames: ["key"]
}
IndexDescription {
Name: "UserDefinedIndex"
KeyColumnNames: ["indexed"]
Type: EIndexTypeGlobalAsync
}
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);

Prepare(runtime, "/MyRoot/Table", {1, 10, 100}, true);
}

TestDropTable(runtime, ++t.TxId, "/MyRoot", "Table");

runtime.SetObserverFunc(origObserver);
for (auto& ev : std::exchange(enqueued, {})) {
runtime.Send(ev.Release(), 0, true);
}

t.TestEnv->TestWaitNotification(runtime, t.TxId);
t.TestEnv->TestWaitTabletDeletion(runtime, {
TTestTxConfig::FakeHiveTablets,
TTestTxConfig::FakeHiveTablets + 1,
});
});
}
}

0 comments on commit 7173846

Please sign in to comment.