diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp index 3103fdc3763a..5b08cef7a37b 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp @@ -286,6 +286,8 @@ namespace NKikimr::NStorage { } CurrentProposedStorageConfig.reset(); } else { + STLOG(PRI_DEBUG, BS_NODE, NWDC47, "no quorum for ProposedStorageConfig", (Record, *res), + (CurrentProposedStorageConfig, *CurrentProposedStorageConfig)); CurrentProposedStorageConfig.reset(); return "no quorum for ProposedStorageConfig"; } diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp index e332dbb99ecb..b275e864ef6a 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_invoke.cpp @@ -141,6 +141,9 @@ namespace NKikimr::NStorage { case TQuery::kDropDonor: return DropDonor(record.GetDropDonor()); + case TQuery::kReassignStateStorageNode: + return ReassignStateStorageNode(record.GetReassignStateStorageNode()); + case TQuery::REQUEST_NOT_SET: return FinishWithError(TResult::ERROR, "Request field not set"); } @@ -509,6 +512,87 @@ namespace NKikimr::NStorage { StartProposition(&config); } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // State Storage operation + + void ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd) { + if (!RunCommonChecks()) { + return; + } + + NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; + + auto process = [&](const char *name, auto hasFunc, auto mutableFunc) { + if (!(config.*hasFunc)()) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " configuration is not filled in"); + return false; + } + + auto *m = (config.*mutableFunc)(); + auto *ring = m->MutableRing(); + if (ring->RingSize() && ring->NodeSize()) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:" + " both Ring and Node fields are set"); + return false; + } + + const size_t numItems = Max(ring->RingSize(), ring->NodeSize()); + bool found = false; + + auto replace = [&](auto *ring, size_t i) { + if (ring->GetNode(i) == cmd.GetFrom()) { + if (found) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " ambiguous From node"); + return false; + } else { + found = true; + ring->MutableNode()->Set(i, cmd.GetTo()); + } + } + return true; + }; + + for (size_t i = 0; i < numItems; ++i) { + if (ring->RingSize()) { + const auto& r = ring->GetRing(i); + if (r.RingSize()) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:" + " Ring is way too nested"); + return false; + } + const size_t numNodes = r.NodeSize(); + for (size_t k = 0; k < numNodes; ++k) { + if (r.GetNode(k) == cmd.GetFrom() && !replace(ring->MutableRing(i), k)) { + return false; + } + } + } else { + if (ring->GetNode(i) == cmd.GetFrom() && !replace(ring, i)) { + return false; + } + } + } + if (!found) { + FinishWithError(TResult::ERROR, TStringBuilder() << name << " From node not found"); + return false; + } + + return true; + }; + +#define F(NAME) \ + if (cmd.Get##NAME() && !process(#NAME, &NKikimrBlobStorage::TStorageConfig::Has##NAME##Config, \ + &NKikimrBlobStorage::TStorageConfig::Mutable##NAME##Config)) { \ + return; \ + } + F(StateStorage) + F(StateStorageBoard) + F(SchemeBoard) + + config.SetGeneration(config.GetGeneration() + 1); + StartProposition(&config); + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Configuration proposition diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 2aebad96998c..985680808f02 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -516,15 +516,12 @@ namespace NKikimr::NStorage { TIntrusivePtr StateStorageInfo; TIntrusivePtr BoardInfo; TIntrusivePtr SchemeBoardInfo; - THashSet ReplicaStartPending; void StartDistributedConfigKeeper(); void ForwardToDistributedConfigKeeper(STATEFN_SIG); NKikimrBlobStorage::TStorageConfig StorageConfig; THashSet StorageConfigSubscribers; - ui64 NextGoneCookie = 1; - std::unordered_map> GoneCallbacks; void Handle(TEvNodeWardenQueryStorageConfig::TPtr ev); void Handle(TEvNodeWardenStorageConfig::TPtr ev); @@ -532,7 +529,6 @@ namespace NKikimr::NStorage { void ApplyStorageConfig(const NKikimrBlobStorage::TNodeWardenServiceSet& current, const NKikimrBlobStorage::TNodeWardenServiceSet *proposed); void ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConfig *proposed); - void HandleGone(STATEFN_SIG); void ApplyStaticServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet& ss); void Handle(TEvNodeWardenQueryBaseConfig::TPtr ev); @@ -630,8 +626,6 @@ namespace NKikimr::NStorage { hFunc(TEvNodeWardenQueryBaseConfig, Handle); hFunc(TEvNodeConfigInvokeOnRootResult, Handle); - fFunc(TEvents::TSystem::Gone, HandleGone); - default: EnqueuePendingMessage(ev); break; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp index dcbd6d50c28d..efb99dbdd236 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp @@ -163,21 +163,24 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf }; TActorSystem *as = TActivationContext::ActorSystem(); - if (!StateStorageProxyConfigured || changed(*StateStorageInfo, *stateStorageInfo) || changed(*BoardInfo, *boardInfo) || - changed(*SchemeBoardInfo, *schemeBoardInfo)) { // reconfigure proxy + const bool changedStateStorage = !StateStorageProxyConfigured || changed(*StateStorageInfo, *stateStorageInfo); + const bool changedBoard = !StateStorageProxyConfigured || changed(*BoardInfo, *boardInfo); + const bool changedSchemeBoard = !StateStorageProxyConfigured || changed(*SchemeBoardInfo, *schemeBoardInfo); + if (changedStateStorage || changedBoard || changedSchemeBoard) { // reconfigure proxy STLOG(PRI_INFO, BS_NODE, NW50, "updating state storage proxy configuration"); - Send(MakeStateStorageProxyID(), new TEvStateStorage::TEvUpdateGroupConfig(stateStorageInfo, boardInfo, + Send(MakeStateStorageProxyID(), new TEvStateStorage::TEvUpdateGroupConfig(stateStorageInfo, boardInfo, schemeBoardInfo)); StateStorageProxyConfigured = true; } else { // no changes return; } - // generate actor ids of local replicas + // start new replicas if needed THashSet localActorIds; - auto scanLocalActorIds = [&](const TIntrusivePtr& info) { - if (info) { - for (const auto& ring : info->Rings) { + auto startReplicas = [&](TIntrusivePtr&& info, auto&& factory, const char *comp, auto *which) { + // collect currently running local replicas + if (const auto& current = *which) { + for (const auto& ring : current->Rings) { for (const auto& replicaId : ring.Replicas) { if (replicaId.NodeId() == LocalNodeId) { const auto [it, inserted] = localActorIds.insert(replicaId); @@ -186,67 +189,17 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf } } } - }; - scanLocalActorIds(StateStorageInfo); - scanLocalActorIds(BoardInfo); - scanLocalActorIds(SchemeBoardInfo); - // start new replicas if needed - auto startReplicas = [&](TIntrusivePtr&& info, auto&& factory, const char *comp, auto *which) { for (const auto& ring : info->Rings) { for (ui32 index = 0; index < ring.Replicas.size(); ++index) { if (const TActorId& replicaId = ring.Replicas[index]; replicaId.NodeId() == LocalNodeId) { - if (ReplicaStartPending.contains(replicaId)) { - // this operation is already pending, we just have to wait - } else if (localActorIds.erase(replicaId)) { - if (const TActorId actorId = as->RegisterLocalService(replicaId, TActorId())) { - STLOG(PRI_INFO, BS_NODE, NW05, "terminating existing state storage replica", - (Component, comp), (ReplicaId, replicaId), (ActorId, actorId)); - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, SelfId(), - nullptr, 0)); // expect to terminate immediately upon reception of this message - - auto startReplica = [this, factory, which, comp, expectedReplicaId = replicaId] { - const auto& info = *which; - ReplicaStartPending.erase(expectedReplicaId); - bool started = false; - for (const auto& ring : info->Rings) { - for (ui32 index = 0; index < ring.Replicas.size(); ++index) { - const auto& replicaId = ring.Replicas[index]; - if (replicaId == expectedReplicaId) { - STLOG(PRI_INFO, BS_NODE, NW44, "delayed starting new state storage replica", - (Component, comp), (ReplicaId, replicaId), (Index, index), (Config, *info)); - Y_ABORT_UNLESS(!started); - started = true; - TActorSystem *as = TActivationContext::ActorSystem(); - const TActorId prevActorId = as->RegisterLocalService(replicaId, - as->Register(factory(info, index), TMailboxType::ReadAsFilled, - AppData()->SystemPoolId)); - Y_VERIFY_S(!prevActorId, "unacceptable race in StateStorage replica registration" - " Component# " << comp - << " Index# " << index - << " Config# " << info->ToString() - << " ReplicaId# " << replicaId); - } - } - } - if (!started) { - STLOG(PRI_INFO, BS_NODE, NW48, "did not start new state storage replica", - (Component, comp), (ReplicaId, expectedReplicaId), (Config, *info)); - } - }; - - const TActorId forwardOnNondelivery = SelfId(); - const ui64 cookie = NextGoneCookie++; - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, actorId, SelfId(), - nullptr, cookie, &forwardOnNondelivery)); // this message is expected to be forwarded - GoneCallbacks.emplace(cookie, startReplica); - ReplicaStartPending.emplace(replicaId); - } - } else { + if (const auto [it, inserted] = localActorIds.insert(replicaId); inserted) { STLOG(PRI_INFO, BS_NODE, NW08, "starting new state storage replica", (Component, comp), (ReplicaId, replicaId), (Index, index), (Config, *info)); as->RegisterLocalService(replicaId, as->Register(factory(info, index), TMailboxType::ReadAsFilled, AppData()->SystemPoolId)); + } else { + // TODO(alexvru): update replica configuration somehow } } } @@ -254,9 +207,15 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf *which = std::move(info); }; - startReplicas(std::move(stateStorageInfo), CreateStateStorageReplica, "StateStorage", &StateStorageInfo); - startReplicas(std::move(boardInfo), CreateStateStorageBoardReplica, "StateStorageBoard", &BoardInfo); - startReplicas(std::move(schemeBoardInfo), CreateSchemeBoardReplica, "SchemeBoard", &SchemeBoardInfo); + if (changedStateStorage) { + startReplicas(std::move(stateStorageInfo), CreateStateStorageReplica, "StateStorage", &StateStorageInfo); + } + if (changedBoard) { + startReplicas(std::move(boardInfo), CreateStateStorageBoardReplica, "StateStorageBoard", &BoardInfo); + } + if (changedSchemeBoard) { + startReplicas(std::move(schemeBoardInfo), CreateSchemeBoardReplica, "SchemeBoard", &SchemeBoardInfo); + } // terminate unused replicas for (const auto& replicaId : localActorIds) { @@ -266,12 +225,6 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf } } -void TNodeWarden::HandleGone(STATEFN_SIG) { - auto nh = GoneCallbacks.extract(ev->Cookie); - Y_ABORT_UNLESS(nh); - nh.mapped()(); -} - void TNodeWarden::ApplyStaticServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet& ss) { ApplyServiceSet(ss, true /*isStatic*/, true /*comprehensive*/, false /*updateCache*/, "distconf"); } diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto index ca4e1ba69c53..161584647616 100644 --- a/ydb/core/protos/blobstorage_distributed_config.proto +++ b/ydb/core/protos/blobstorage_distributed_config.proto @@ -171,12 +171,21 @@ message TEvNodeConfigInvokeOnRoot { NKikimrBlobStorage.TVSlotId VSlotId = 2; } + message TReassignStateStorageNode { + uint32 From = 1; + uint32 To = 2; // or zero to pick up automatically + bool StateStorage = 3; + bool StateStorageBoard = 4; + bool SchemeBoard = 5; + } + oneof Request { TUpdateConfig UpdateConfig = 1; TQueryConfig QueryConfig = 2; TReassignGroupDisk ReassignGroupDisk = 3; TStaticVDiskSlain StaticVDiskSlain = 4; TDropDonor DropDonor = 5; + TReassignStateStorageNode ReassignStateStorageNode = 6; } } @@ -211,6 +220,9 @@ message TEvNodeConfigInvokeOnRootResult { message TDropDonor { } + message TReassignStateStorageNode { + } + EStatus Status = 1; optional string ErrorReason = 2; TScepter Scepter = 3; @@ -221,5 +233,6 @@ message TEvNodeConfigInvokeOnRootResult { TReassignGroupDisk ReassignGroupDisk = 6; TStaticVDiskSlain StaticVDiskSlain = 7; TDropDonor DropDonor = 8; + TReassignStateStorageNode ReassignStateStorageNode = 9; } }