Skip to content

Commit

Permalink
Restart PDisk actor on PDisk restart (#4820)
Browse files Browse the repository at this point in the history
Fixes #4726
  • Loading branch information
SammyVimes authored May 27, 2024
1 parent 06552cf commit 2f366dc
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 28 deletions.
19 changes: 19 additions & 0 deletions ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def add_options(p):
p.add_argument('--enable-pdisk-encryption-keys-changes', action='store_true', help='Enable changes of PDisk encryption keys')
p.add_argument('--enable-kill-tablets', action='store_true', help='Enable tablet killer')
p.add_argument('--enable-kill-blob-depot', action='store_true', help='Enable BlobDepot killer')
p.add_argument('--enable-restart-pdisks', action='store_true', help='Enable PDisk restarter')
p.add_argument('--kill-signal', type=str, default='KILL', help='Kill signal to send to restart node')


Expand Down Expand Up @@ -144,6 +145,19 @@ def do_restart(node_id):
if args.enable_pdisk_encryption_keys_changes:
remove_old_pdisk_keys(pdisk_keys, pdisk_key_versions, node_id)

def do_restart_pdisk(node_id, pdisk_id):
assert can_act_on_vslot(node_id, pdisk_id)
request = common.kikimr_bsconfig.TConfigRequest(IgnoreDegradedGroupsChecks=True)
cmd = request.Command.add().RestartPDisk
cmd.TargetPDiskId.NodeId = node_id
cmd.TargetPDiskId.PDiskId = pdisk_id
try:
response = common.invoke_bsc_request(request)
except Exception as e:
raise Exception('failed to perform restart request: %s' % e)
if not response.Success:
raise Exception('Unexpected error from BSC: %s' % response.ErrorDescription)

def do_evict(vslot_id):
assert can_act_on_vslot(*vslot_id)
try:
Expand Down Expand Up @@ -230,13 +244,16 @@ def do_kill_blob_depot():
wipes = []
readonlies = []
unreadonlies = []
pdisk_restarts = []

for vslot in base_config.VSlot:
if common.is_dynamic_group(vslot.GroupId):
vslot_id = common.get_vslot_id(vslot.VSlotId)
vdisk_id = '[%08x:%d:%d:%d]' % (vslot.GroupId, vslot.FailRealmIdx, vslot.FailDomainIdx, vslot.VDiskIdx)
if vslot_id in vslot_readonly and not args.disable_readonly:
unreadonlies.append(('un-readonly vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_readonly, vslot, False)))
if can_act_on_vslot(*vslot_id[:2]) and args.enable_restart_pdisks:
pdisk_restarts.append(('restart pdisk node_id: %d, pdisk_id: %d' % vslot_id[:2], (do_restart_pdisk, *vslot_id[:2])))
if can_act_on_vslot(*vslot_id) and (recent_restarts or args.disable_restarts):
if not args.disable_evicts:
evicts.append(('evict vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_evict, vslot_id)))
Expand All @@ -258,6 +275,8 @@ def pick(v):
possible_actions.append(('readonly', (pick, readonlies)))
if unreadonlies:
possible_actions.append(('un-readonly', (pick, unreadonlies)))
if pdisk_restarts:
possible_actions.append(('restart-pdisk', (pick, pdisk_restarts)))

restarts = []

Expand Down
2 changes: 1 addition & 1 deletion ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def do(args):
names_remaining.remove(group.VirtualGroupInfo.Name)
elif group.VirtualGroupInfo.State == common.EVirtualGroupState.CREATE_FAILED:
names_remaining.remove(group.VirtualGroupInfo.Name)
errors.append(f'{group.VirtualGroupInfo.Name}: {group.ErrorReason}')
errors.append(f'{group.VirtualGroupInfo.Name}: {group.VirtualGroupInfo.ErrorReason}')

if names_remaining:
time.sleep(1)
Expand Down
21 changes: 14 additions & 7 deletions ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,23 +273,30 @@ namespace NKikimr::NStorage {
void TNodeWarden::DoRestartLocalPDisk(const NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk& pdisk) {
ui32 pdiskId = pdisk.GetPDiskID();

const TActorId actorId = MakeBlobStoragePDiskID(LocalNodeId, pdiskId);
STLOG(PRI_NOTICE, BS_NODE, NW75, "DoRestartLocalPDisk", (PDiskId, pdiskId));

const auto [_, inserted] = PDiskRestartInFlight.emplace(pdiskId);

if (!inserted) {
STLOG(PRI_NOTICE, BS_NODE, NW76, "Restart already in progress", (PDiskId, pdiskId));
// Restart is already in progress.
return;
}

auto it = LocalPDisks.find(TPDiskKey(LocalNodeId, pdiskId));
if (it == LocalPDisks.end()) {
PDiskRestartInFlight.erase(pdiskId);

STLOG(PRI_NOTICE, BS_NODE, NW77, "Restart state carried from previous start, just starting", (PDiskId, pdiskId));

// This can happen if warden didn't handle pdisk's restart before node's restart.
// In this case, PDisk has EntityStatus::RESTART instead of EntityStatus::INITIAL.
StartLocalPDisk(pdisk);
SendPDiskReport(pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_RESTARTED);
return;
}

const auto [_, inserted] = PDiskRestartInFlight.emplace(pdiskId);

if (!inserted) {
// Restart is already in progress.
return;
}
const TActorId actorId = MakeBlobStoragePDiskID(LocalNodeId, pdiskId);

TIntrusivePtr<TPDiskConfig> pdiskConfig = CreatePDiskConfig(it->second.Record);

Expand Down
59 changes: 45 additions & 14 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ namespace NPDisk {

LWTRACE_USING(BLOBSTORAGE_PROVIDER);

void CreatePDiskActor(
TGenericExecutorThread& executorThread,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
const TIntrusivePtr<TPDiskConfig> &cfg,
const NPDisk::TMainKey &mainKey,
ui32 pDiskID, ui32 poolId, ui32 nodeId
) {

TActorId actorId = executorThread.RegisterActor(CreatePDisk(cfg, mainKey, counters), TMailboxType::ReadAsFilled, poolId);

TActorId pDiskServiceId = MakeBlobStoragePDiskID(nodeId, pDiskID);

executorThread.ActorSystem->RegisterLocalService(pDiskServiceId, actorId);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// PDisk Actor
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -993,29 +1008,48 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
return;
}

if (PendingRestartResponse) {
PendingRestartResponse(restartAllowed, ev->Get()->Details);
PendingRestartResponse = {};
}

if (restartAllowed) {
MainKey = ev->Get()->MainKey;
NPDisk::TMainKey newMainKey = ev->Get()->MainKey;

SecureWipeBuffer((ui8*)ev->Get()->MainKey.Keys.data(), sizeof(NPDisk::TKey) * ev->Get()->MainKey.Keys.size());

LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PDiskId
<< " Going to restart PDisk since recieved TEvAskWardenRestartPDiskResult");

const TActorIdentity& thisActorId = SelfId();
ui32 nodeId = thisActorId.NodeId();
ui32 poolId = thisActorId.PoolID();
ui32 pdiskId = PDisk->PDiskId;

PDisk->Stop();

TIntrusivePtr<TPDiskConfig> actorCfg = std::move(Cfg);

auto& newCfg = ev->Get()->Config;

if (newCfg) {
Y_VERIFY_S(Cfg->PDiskId == PDisk->PDiskId,
"New config's PDiskId# " << newCfg->PDiskId << " is not equal to real PDiskId# " << PDisk->PDiskId);
Cfg = std::move(newCfg);
Y_VERIFY_S(newCfg->PDiskId == pdiskId,
"New config's PDiskId# " << newCfg->PDiskId << " is not equal to real PDiskId# " << pdiskId);

actorCfg = std::move(newCfg);
}

StartPDiskThread();
const TActorContext& actorCtx = ActorContext();

Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(PDisk->PDiskId));
}
auto& counters = AppData(actorCtx)->Counters;

if (PendingRestartResponse) {
PendingRestartResponse(restartAllowed, ev->Get()->Details);
PendingRestartResponse = {};
TGenericExecutorThread& executorThread = actorCtx.ExecutorThread;

PassAway();

CreatePDiskActor(executorThread, counters, actorCfg, newMainKey, pdiskId, poolId, nodeId);

Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(pdiskId));
}
}

Expand Down Expand Up @@ -1290,10 +1324,7 @@ IActor* CreatePDisk(const TIntrusivePtr<TPDiskConfig> &cfg, const NPDisk::TMainK

void TRealPDiskServiceFactory::Create(const TActorContext &ctx, ui32 pDiskID,
const TIntrusivePtr<TPDiskConfig> &cfg, const NPDisk::TMainKey &mainKey, ui32 poolId, ui32 nodeId) {
TActorId actorId = ctx.ExecutorThread.RegisterActor(
CreatePDisk(cfg, mainKey, AppData(ctx)->Counters), TMailboxType::ReadAsFilled, poolId);
TActorId pDiskServiceId = MakeBlobStoragePDiskID(nodeId, pDiskID);
ctx.ExecutorThread.ActorSystem->RegisterLocalService(pDiskServiceId, actorId);
CreatePDiskActor(ctx.ExecutorThread, AppData(ctx)->Counters, cfg, mainKey, pDiskID, poolId, nodeId);
}

} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ void TPDisk::Stop() {
LOG_NOTICE_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId
<< " shutdown owner info# " << StartupOwnerInfo());
}

BlockDevice->Stop();

// BlockDevice is stopped, the data will NOT hit the disk.
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,9 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
testCtx.Send(new TEvBlobStorage::TEvAskWardenRestartPDiskResult(testCtx.GetPDisk()->PDiskId, testCtx.MainKey, true, nullptr));
const auto evInitRes = testCtx.Recv<TEvBlobStorage::TEvNotifyWardenPDiskRestarted>();

testCtx.StartPDiskRestart();

vdisk.InitFull();
vdisk.SendEvLogSync();
}
Expand Down Expand Up @@ -928,8 +929,7 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
while (writeLog() == NKikimrProto::OK) {}
UNIT_ASSERT_VALUES_EQUAL(writeLog(), NKikimrProto::OUT_OF_SPACE);

testCtx.Send(new TEvBlobStorage::TEvAskWardenRestartPDiskResult(testCtx.GetPDisk()->PDiskId, testCtx.MainKey, true, nullptr));
const auto evInitRes = testCtx.Recv<TEvBlobStorage::TEvNotifyWardenPDiskRestarted>();
testCtx.StartPDiskRestart();

vdisk.InitFull();
vdisk.SendEvLogSync();
Expand All @@ -943,8 +943,8 @@ Y_UNIT_TEST_SUITE(PDiskCompatibilityInfo) {
using TCurrent = NKikimrConfig::TCurrentCompatibilityInfo;
THolder<NPDisk::TEvYardInitResult> RestartPDisk(TActorTestContext& testCtx, ui32 pdiskId, TVDiskMock& vdisk, TCurrent* newInfo) {
TCompatibilityInfoTest::Reset(newInfo);
testCtx.Send(new TEvBlobStorage::TEvAskWardenRestartPDiskResult(pdiskId, testCtx.MainKey, true, nullptr));
testCtx.Recv<TEvBlobStorage::TEvNotifyWardenPDiskRestarted>();
Y_UNUSED(pdiskId);
testCtx.StartPDiskRestart();
testCtx.Send(new NPDisk::TEvYardInit(vdisk.OwnerRound.fetch_add(1), vdisk.VDiskID, testCtx.TestCtx.PDiskGuid));
return testCtx.Recv<NPDisk::TEvYardInitResult>();
}
Expand Down
18 changes: 18 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,27 @@ struct TActorTestContext {
new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr),
NKikimrProto::OK);
PDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie);

PDiskActor = PDisk->PDiskActor;
}
return PDisk;
}

void StartPDiskRestart() {
ui32 pdiskId = GetPDisk()->PDiskId;

Send(new TEvBlobStorage::TEvAskWardenRestartPDiskResult(pdiskId, MainKey, true, nullptr));
const auto evInitRes = Recv<TEvBlobStorage::TEvNotifyWardenPDiskRestarted>();

if (!Settings.UsePDiskMock) {
TActorId wellKnownPDiskActorId = MakeBlobStoragePDiskID(PDiskActor->NodeId(), pdiskId);

PDisk = nullptr;

// We will temporarily use well know pdisk actor id, because restarted pdisk actor id is not yet known.
PDiskActor = wellKnownPDiskActorId;
}
}

template<typename T>
auto SafeRunOnPDisk(T&& f) {
Expand Down

0 comments on commit 2f366dc

Please sign in to comment.