Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart PDisk actor on PDisk restart #4820

Merged
merged 4 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def add_options(p):
p.add_argument('--enable-pdisk-encryption-keys-changes', action='store_true', help='Enable changes of PDisk encryption keys')
p.add_argument('--enable-kill-tablets', action='store_true', help='Enable tablet killer')
p.add_argument('--enable-kill-blob-depot', action='store_true', help='Enable BlobDepot killer')
p.add_argument('--enable-restart-pdisks', action='store_true', help='Enable PDisk restarter')
p.add_argument('--kill-signal', type=str, default='KILL', help='Kill signal to send to restart node')


Expand Down Expand Up @@ -144,6 +145,19 @@ def do_restart(node_id):
if args.enable_pdisk_encryption_keys_changes:
remove_old_pdisk_keys(pdisk_keys, pdisk_key_versions, node_id)

def do_restart_pdisk(node_id, pdisk_id):
assert can_act_on_vslot(node_id, pdisk_id)
request = common.kikimr_bsconfig.TConfigRequest(IgnoreDegradedGroupsChecks=True)
cmd = request.Command.add().RestartPDisk
cmd.TargetPDiskId.NodeId = node_id
cmd.TargetPDiskId.PDiskId = pdisk_id
try:
response = common.invoke_bsc_request(request)
except Exception as e:
raise Exception('failed to perform restart request: %s' % e)
if not response.Success:
raise Exception('Unexpected error from BSC: %s' % response.ErrorDescription)

def do_evict(vslot_id):
assert can_act_on_vslot(*vslot_id)
try:
Expand Down Expand Up @@ -230,13 +244,16 @@ def do_kill_blob_depot():
wipes = []
readonlies = []
unreadonlies = []
pdisk_restarts = []

for vslot in base_config.VSlot:
if common.is_dynamic_group(vslot.GroupId):
vslot_id = common.get_vslot_id(vslot.VSlotId)
vdisk_id = '[%08x:%d:%d:%d]' % (vslot.GroupId, vslot.FailRealmIdx, vslot.FailDomainIdx, vslot.VDiskIdx)
if vslot_id in vslot_readonly and not args.disable_readonly:
unreadonlies.append(('un-readonly vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_readonly, vslot, False)))
if can_act_on_vslot(*vslot_id[:2]) and args.enable_restart_pdisks:
pdisk_restarts.append(('restart pdisk node_id: %d, pdisk_id: %d' % vslot_id[:2], (do_restart_pdisk, *vslot_id[:2])))
if can_act_on_vslot(*vslot_id) and (recent_restarts or args.disable_restarts):
if not args.disable_evicts:
evicts.append(('evict vslot id: %s, vdisk id: %s' % (vslot_id, vdisk_id), (do_evict, vslot_id)))
Expand All @@ -258,6 +275,8 @@ def pick(v):
possible_actions.append(('readonly', (pick, readonlies)))
if unreadonlies:
possible_actions.append(('un-readonly', (pick, unreadonlies)))
if pdisk_restarts:
possible_actions.append(('restart-pdisk', (pick, pdisk_restarts)))

restarts = []

Expand Down
2 changes: 1 addition & 1 deletion ydb/apps/dstool/lib/dstool_cmd_group_virtual_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def do(args):
names_remaining.remove(group.VirtualGroupInfo.Name)
elif group.VirtualGroupInfo.State == common.EVirtualGroupState.CREATE_FAILED:
names_remaining.remove(group.VirtualGroupInfo.Name)
errors.append(f'{group.VirtualGroupInfo.Name}: {group.ErrorReason}')
errors.append(f'{group.VirtualGroupInfo.Name}: {group.VirtualGroupInfo.ErrorReason}')

if names_remaining:
time.sleep(1)
Expand Down
21 changes: 14 additions & 7 deletions ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,23 +273,30 @@ namespace NKikimr::NStorage {
void TNodeWarden::DoRestartLocalPDisk(const NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk& pdisk) {
ui32 pdiskId = pdisk.GetPDiskID();

const TActorId actorId = MakeBlobStoragePDiskID(LocalNodeId, pdiskId);
STLOG(PRI_NOTICE, BS_NODE, NW75, "DoRestartLocalPDisk", (PDiskId, pdiskId));

const auto [_, inserted] = PDiskRestartInFlight.emplace(pdiskId);

if (!inserted) {
STLOG(PRI_NOTICE, BS_NODE, NW76, "Restart already in progress", (PDiskId, pdiskId));
// Restart is already in progress.
return;
}

auto it = LocalPDisks.find(TPDiskKey(LocalNodeId, pdiskId));
if (it == LocalPDisks.end()) {
PDiskRestartInFlight.erase(pdiskId);

STLOG(PRI_NOTICE, BS_NODE, NW77, "Restart state carried from previous start, just starting", (PDiskId, pdiskId));

// This can happen if warden didn't handle pdisk's restart before node's restart.
// In this case, PDisk has EntityStatus::RESTART instead of EntityStatus::INITIAL.
StartLocalPDisk(pdisk);
SendPDiskReport(pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_RESTARTED);
return;
}

const auto [_, inserted] = PDiskRestartInFlight.emplace(pdiskId);

if (!inserted) {
// Restart is already in progress.
return;
}
const TActorId actorId = MakeBlobStoragePDiskID(LocalNodeId, pdiskId);

TIntrusivePtr<TPDiskConfig> pdiskConfig = CreatePDiskConfig(it->second.Record);

Expand Down
59 changes: 45 additions & 14 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ namespace NPDisk {

LWTRACE_USING(BLOBSTORAGE_PROVIDER);

void CreatePDiskActor(
TGenericExecutorThread& executorThread,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
const TIntrusivePtr<TPDiskConfig> &cfg,
const NPDisk::TMainKey &mainKey,
ui32 pDiskID, ui32 poolId, ui32 nodeId
) {

TActorId actorId = executorThread.RegisterActor(CreatePDisk(cfg, mainKey, counters), TMailboxType::ReadAsFilled, poolId);

TActorId pDiskServiceId = MakeBlobStoragePDiskID(nodeId, pDiskID);

executorThread.ActorSystem->RegisterLocalService(pDiskServiceId, actorId);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// PDisk Actor
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -993,29 +1008,48 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
return;
}

if (PendingRestartResponse) {
PendingRestartResponse(restartAllowed, ev->Get()->Details);
PendingRestartResponse = {};
}

if (restartAllowed) {
MainKey = ev->Get()->MainKey;
NPDisk::TMainKey newMainKey = ev->Get()->MainKey;

SecureWipeBuffer((ui8*)ev->Get()->MainKey.Keys.data(), sizeof(NPDisk::TKey) * ev->Get()->MainKey.Keys.size());

LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PDiskId
<< " Going to restart PDisk since recieved TEvAskWardenRestartPDiskResult");

const TActorIdentity& thisActorId = SelfId();
ui32 nodeId = thisActorId.NodeId();
ui32 poolId = thisActorId.PoolID();
ui32 pdiskId = PDisk->PDiskId;

PDisk->Stop();

TIntrusivePtr<TPDiskConfig> actorCfg = std::move(Cfg);

auto& newCfg = ev->Get()->Config;

if (newCfg) {
Y_VERIFY_S(Cfg->PDiskId == PDisk->PDiskId,
"New config's PDiskId# " << newCfg->PDiskId << " is not equal to real PDiskId# " << PDisk->PDiskId);
Cfg = std::move(newCfg);
Y_VERIFY_S(newCfg->PDiskId == pdiskId,
"New config's PDiskId# " << newCfg->PDiskId << " is not equal to real PDiskId# " << pdiskId);

actorCfg = std::move(newCfg);
}

StartPDiskThread();
const TActorContext& actorCtx = ActorContext();

Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(PDisk->PDiskId));
}
auto& counters = AppData(actorCtx)->Counters;

if (PendingRestartResponse) {
PendingRestartResponse(restartAllowed, ev->Get()->Details);
PendingRestartResponse = {};
TGenericExecutorThread& executorThread = actorCtx.ExecutorThread;

PassAway();

CreatePDiskActor(executorThread, counters, actorCfg, newMainKey, pdiskId, poolId, nodeId);

Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(pdiskId));
}
}

Expand Down Expand Up @@ -1290,10 +1324,7 @@ IActor* CreatePDisk(const TIntrusivePtr<TPDiskConfig> &cfg, const NPDisk::TMainK

void TRealPDiskServiceFactory::Create(const TActorContext &ctx, ui32 pDiskID,
const TIntrusivePtr<TPDiskConfig> &cfg, const NPDisk::TMainKey &mainKey, ui32 poolId, ui32 nodeId) {
TActorId actorId = ctx.ExecutorThread.RegisterActor(
CreatePDisk(cfg, mainKey, AppData(ctx)->Counters), TMailboxType::ReadAsFilled, poolId);
TActorId pDiskServiceId = MakeBlobStoragePDiskID(nodeId, pDiskID);
ctx.ExecutorThread.ActorSystem->RegisterLocalService(pDiskServiceId, actorId);
CreatePDiskActor(ctx.ExecutorThread, AppData(ctx)->Counters, cfg, mainKey, pDiskID, poolId, nodeId);
}

} // NKikimr
1 change: 1 addition & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ void TPDisk::Stop() {
LOG_NOTICE_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId
<< " shutdown owner info# " << StartupOwnerInfo());
}

BlockDevice->Stop();

// BlockDevice is stopped, the data will NOT hit the disk.
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,9 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
testCtx.Send(new TEvBlobStorage::TEvAskWardenRestartPDiskResult(testCtx.GetPDisk()->PDiskId, testCtx.MainKey, true, nullptr));
const auto evInitRes = testCtx.Recv<TEvBlobStorage::TEvNotifyWardenPDiskRestarted>();

testCtx.StartPDiskRestart();

vdisk.InitFull();
vdisk.SendEvLogSync();
}
Expand Down Expand Up @@ -928,8 +929,7 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
while (writeLog() == NKikimrProto::OK) {}
UNIT_ASSERT_VALUES_EQUAL(writeLog(), NKikimrProto::OUT_OF_SPACE);

testCtx.Send(new TEvBlobStorage::TEvAskWardenRestartPDiskResult(testCtx.GetPDisk()->PDiskId, testCtx.MainKey, true, nullptr));
const auto evInitRes = testCtx.Recv<TEvBlobStorage::TEvNotifyWardenPDiskRestarted>();
testCtx.StartPDiskRestart();

vdisk.InitFull();
vdisk.SendEvLogSync();
Expand All @@ -943,8 +943,8 @@ Y_UNIT_TEST_SUITE(PDiskCompatibilityInfo) {
using TCurrent = NKikimrConfig::TCurrentCompatibilityInfo;
THolder<NPDisk::TEvYardInitResult> RestartPDisk(TActorTestContext& testCtx, ui32 pdiskId, TVDiskMock& vdisk, TCurrent* newInfo) {
TCompatibilityInfoTest::Reset(newInfo);
testCtx.Send(new TEvBlobStorage::TEvAskWardenRestartPDiskResult(pdiskId, testCtx.MainKey, true, nullptr));
testCtx.Recv<TEvBlobStorage::TEvNotifyWardenPDiskRestarted>();
Y_UNUSED(pdiskId);
testCtx.StartPDiskRestart();
testCtx.Send(new NPDisk::TEvYardInit(vdisk.OwnerRound.fetch_add(1), vdisk.VDiskID, testCtx.TestCtx.PDiskGuid));
return testCtx.Recv<NPDisk::TEvYardInitResult>();
}
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,25 @@ struct TActorTestContext {
new NPDisk::TEvYardControl(NPDisk::TEvYardControl::GetPDiskPointer, nullptr),
NKikimrProto::OK);
PDisk = reinterpret_cast<NPDisk::TPDisk*>(evControlRes->Cookie);

PDiskActor = PDisk->PDiskActor;
}
return PDisk;
}

void StartPDiskRestart() {
Send(new TEvBlobStorage::TEvAskWardenRestartPDiskResult(GetPDisk()->PDiskId, MainKey, true, nullptr));
const auto evInitRes = Recv<TEvBlobStorage::TEvNotifyWardenPDiskRestarted>();

if (!Settings.UsePDiskMock) {
TActorId wellKnownPDiskActorId = MakeBlobStoragePDiskID(PDiskActor->NodeId(), PDisk->PDiskId);

PDisk = nullptr;

// We will temporarily use well know pdisk actor id, because restarted pdisk actor id is not yet known.
PDiskActor = wellKnownPDiskActorId;
}
}

template<typename T>
auto SafeRunOnPDisk(T&& f) {
Expand Down
Loading