Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support StateStorage reconfiguration through distconf #4465

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ namespace NKikimr::NStorage {
}
CurrentProposedStorageConfig.reset();
} else {
STLOG(PRI_DEBUG, BS_NODE, NWDC47, "no quorum for ProposedStorageConfig", (Record, *res),
(CurrentProposedStorageConfig, *CurrentProposedStorageConfig));
CurrentProposedStorageConfig.reset();
return "no quorum for ProposedStorageConfig";
}
Expand Down
84 changes: 84 additions & 0 deletions ydb/core/blobstorage/nodewarden/distconf_invoke.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ namespace NKikimr::NStorage {
case TQuery::kDropDonor:
return DropDonor(record.GetDropDonor());

case TQuery::kReassignStateStorageNode:
return ReassignStateStorageNode(record.GetReassignStateStorageNode());

case TQuery::REQUEST_NOT_SET:
return FinishWithError(TResult::ERROR, "Request field not set");
}
Expand Down Expand Up @@ -509,6 +512,87 @@ namespace NKikimr::NStorage {
StartProposition(&config);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// State Storage operation

// Handles the ReassignStateStorageNode query: in a local copy of Self->StorageConfig, replaces node
// cmd.GetFrom() with cmd.GetTo() in the ring configuration of every component selected by the command
// flags (StateStorage, StateStorageBoard, SchemeBoard), then increments the config generation and passes
// the patched copy to StartProposition.
//
// Any failure is reported through FinishWithError and aborts the request before the proposition is
// started; since only the local copy is edited, Self->StorageConfig is not modified by this function.
//
// NOTE(review): the proto comment on TReassignStateStorageNode.To says zero means "pick up automatically",
// but cmd.GetTo() is written into the ring verbatim here -- confirm whether To == 0 has to be resolved or
// rejected before proposing.
// NOTE(review): when none of the three component flags is set, the configuration is still proposed with
// only its generation incremented -- confirm this is intended.
void ReassignStateStorageNode(const TQuery::TReassignStateStorageNode& cmd) {
    if (!RunCommonChecks()) {
        return;
    }

    NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;

    // Patches the ring configuration of a single component. Returns false after reporting the error
    // through FinishWithError when the component cannot be processed.
    auto process = [&](const char *name, auto hasFunc, auto mutableFunc) {
        if (!(config.*hasFunc)()) {
            FinishWithError(TResult::ERROR, TStringBuilder() << name << " configuration is not filled in");
            return false;
        }

        auto *ring = (config.*mutableFunc)()->MutableRing();
        const size_t numRings = ring->RingSize();
        if (numRings && ring->NodeSize()) {
            FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:"
                " both Ring and Node fields are set");
            return false;
        }

        bool found = false;

        // Replaces node #i of 'target' when it equals From. Exactly one match is allowed per component,
        // so a second match is reported as an error.
        auto replace = [&](auto *target, size_t i) {
            if (target->GetNode(i) != cmd.GetFrom()) {
                return true;
            }
            if (found) {
                FinishWithError(TResult::ERROR, TStringBuilder() << name << " ambiguous From node");
                return false;
            }
            found = true;
            target->MutableNode()->Set(i, cmd.GetTo());
            return true;
        };

        if (numRings) {
            // nodes are grouped into nested rings; only a single level of nesting is accepted
            for (size_t i = 0; i < numRings; ++i) {
                auto *nested = ring->MutableRing(i);
                if (nested->RingSize()) {
                    FinishWithError(TResult::ERROR, TStringBuilder() << name << " incorrect configuration:"
                        " Ring is way too nested");
                    return false;
                }
                const size_t numNodes = nested->NodeSize();
                for (size_t k = 0; k < numNodes; ++k) {
                    if (!replace(nested, k)) {
                        return false;
                    }
                }
            }
        } else {
            // flat configuration: nodes are listed directly in the top-level ring
            const size_t numNodes = ring->NodeSize();
            for (size_t i = 0; i < numNodes; ++i) {
                if (!replace(ring, i)) {
                    return false;
                }
            }
        }

        if (!found) {
            FinishWithError(TResult::ERROR, TStringBuilder() << name << " From node not found");
            return false;
        }

        return true;
    };

    // Run 'process' for every component requested in the command; stop at the first failure.
#define F(NAME) \
    if (cmd.Get##NAME() && !process(#NAME, &NKikimrBlobStorage::TStorageConfig::Has##NAME##Config, \
            &NKikimrBlobStorage::TStorageConfig::Mutable##NAME##Config)) { \
        return; \
    }
    F(StateStorage)
    F(StateStorageBoard)
    F(SchemeBoard)
#undef F // keep the single-letter helper macro from leaking into the rest of the translation unit

    config.SetGeneration(config.GetGeneration() + 1);
    StartProposition(&config);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Configuration proposition

Expand Down
6 changes: 0 additions & 6 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,23 +516,19 @@ namespace NKikimr::NStorage {
TIntrusivePtr<TStateStorageInfo> StateStorageInfo;
TIntrusivePtr<TStateStorageInfo> BoardInfo;
TIntrusivePtr<TStateStorageInfo> SchemeBoardInfo;
THashSet<TActorId> ReplicaStartPending;

void StartDistributedConfigKeeper();
void ForwardToDistributedConfigKeeper(STATEFN_SIG);

NKikimrBlobStorage::TStorageConfig StorageConfig;
THashSet<TActorId> StorageConfigSubscribers;
ui64 NextGoneCookie = 1;
std::unordered_map<ui64, std::function<void()>> GoneCallbacks;

void Handle(TEvNodeWardenQueryStorageConfig::TPtr ev);
void Handle(TEvNodeWardenStorageConfig::TPtr ev);
void HandleUnsubscribe(STATEFN_SIG);
void ApplyStorageConfig(const NKikimrBlobStorage::TNodeWardenServiceSet& current,
const NKikimrBlobStorage::TNodeWardenServiceSet *proposed);
void ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConfig *proposed);
void HandleGone(STATEFN_SIG);
void ApplyStaticServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet& ss);

void Handle(TEvNodeWardenQueryBaseConfig::TPtr ev);
Expand Down Expand Up @@ -630,8 +626,6 @@ namespace NKikimr::NStorage {
hFunc(TEvNodeWardenQueryBaseConfig, Handle);
hFunc(TEvNodeConfigInvokeOnRootResult, Handle);

fFunc(TEvents::TSystem::Gone, HandleGone);

default:
EnqueuePendingMessage(ev);
break;
Expand Down
91 changes: 22 additions & 69 deletions ydb/core/blobstorage/nodewarden/node_warden_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,24 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf
};

TActorSystem *as = TActivationContext::ActorSystem();
if (!StateStorageProxyConfigured || changed(*StateStorageInfo, *stateStorageInfo) || changed(*BoardInfo, *boardInfo) ||
changed(*SchemeBoardInfo, *schemeBoardInfo)) { // reconfigure proxy
const bool changedStateStorage = !StateStorageProxyConfigured || changed(*StateStorageInfo, *stateStorageInfo);
const bool changedBoard = !StateStorageProxyConfigured || changed(*BoardInfo, *boardInfo);
const bool changedSchemeBoard = !StateStorageProxyConfigured || changed(*SchemeBoardInfo, *schemeBoardInfo);
if (changedStateStorage || changedBoard || changedSchemeBoard) { // reconfigure proxy
STLOG(PRI_INFO, BS_NODE, NW50, "updating state storage proxy configuration");
Send(MakeStateStorageProxyID(), new TEvStateStorage::TEvUpdateGroupConfig(stateStorageInfo, boardInfo,
Send(MakeStateStorageProxyID(), new TEvStateStorage::TEvUpdateGroupConfig(stateStorageInfo, boardInfo,
schemeBoardInfo));
StateStorageProxyConfigured = true;
} else { // no changes
return;
}

// generate actor ids of local replicas
// start new replicas if needed
THashSet<TActorId> localActorIds;
auto scanLocalActorIds = [&](const TIntrusivePtr<TStateStorageInfo>& info) {
if (info) {
for (const auto& ring : info->Rings) {
auto startReplicas = [&](TIntrusivePtr<TStateStorageInfo>&& info, auto&& factory, const char *comp, auto *which) {
// collect currently running local replicas
if (const auto& current = *which) {
for (const auto& ring : current->Rings) {
for (const auto& replicaId : ring.Replicas) {
if (replicaId.NodeId() == LocalNodeId) {
const auto [it, inserted] = localActorIds.insert(replicaId);
Expand All @@ -186,77 +189,33 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf
}
}
}
};
scanLocalActorIds(StateStorageInfo);
scanLocalActorIds(BoardInfo);
scanLocalActorIds(SchemeBoardInfo);

// start new replicas if needed
auto startReplicas = [&](TIntrusivePtr<TStateStorageInfo>&& info, auto&& factory, const char *comp, auto *which) {
for (const auto& ring : info->Rings) {
for (ui32 index = 0; index < ring.Replicas.size(); ++index) {
if (const TActorId& replicaId = ring.Replicas[index]; replicaId.NodeId() == LocalNodeId) {
if (ReplicaStartPending.contains(replicaId)) {
// this operation is already pending, we just have to wait
} else if (localActorIds.erase(replicaId)) {
if (const TActorId actorId = as->RegisterLocalService(replicaId, TActorId())) {
STLOG(PRI_INFO, BS_NODE, NW05, "terminating existing state storage replica",
(Component, comp), (ReplicaId, replicaId), (ActorId, actorId));
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, SelfId(),
nullptr, 0)); // expect to terminate immediately upon reception of this message

auto startReplica = [this, factory, which, comp, expectedReplicaId = replicaId] {
const auto& info = *which;
ReplicaStartPending.erase(expectedReplicaId);
bool started = false;
for (const auto& ring : info->Rings) {
for (ui32 index = 0; index < ring.Replicas.size(); ++index) {
const auto& replicaId = ring.Replicas[index];
if (replicaId == expectedReplicaId) {
STLOG(PRI_INFO, BS_NODE, NW44, "delayed starting new state storage replica",
(Component, comp), (ReplicaId, replicaId), (Index, index), (Config, *info));
Y_ABORT_UNLESS(!started);
started = true;
TActorSystem *as = TActivationContext::ActorSystem();
const TActorId prevActorId = as->RegisterLocalService(replicaId,
as->Register(factory(info, index), TMailboxType::ReadAsFilled,
AppData()->SystemPoolId));
Y_VERIFY_S(!prevActorId, "unacceptable race in StateStorage replica registration"
" Component# " << comp
<< " Index# " << index
<< " Config# " << info->ToString()
<< " ReplicaId# " << replicaId);
}
}
}
if (!started) {
STLOG(PRI_INFO, BS_NODE, NW48, "did not start new state storage replica",
(Component, comp), (ReplicaId, expectedReplicaId), (Config, *info));
}
};

const TActorId forwardOnNondelivery = SelfId();
const ui64 cookie = NextGoneCookie++;
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, actorId, SelfId(),
nullptr, cookie, &forwardOnNondelivery)); // this message is expected to be forwarded
GoneCallbacks.emplace(cookie, startReplica);
ReplicaStartPending.emplace(replicaId);
}
} else {
if (const auto [it, inserted] = localActorIds.insert(replicaId); inserted) {
STLOG(PRI_INFO, BS_NODE, NW08, "starting new state storage replica",
(Component, comp), (ReplicaId, replicaId), (Index, index), (Config, *info));
as->RegisterLocalService(replicaId, as->Register(factory(info, index), TMailboxType::ReadAsFilled,
AppData()->SystemPoolId));
} else {
// TODO(alexvru): update replica configuration somehow
}
}
}
}

*which = std::move(info);
};
startReplicas(std::move(stateStorageInfo), CreateStateStorageReplica, "StateStorage", &StateStorageInfo);
startReplicas(std::move(boardInfo), CreateStateStorageBoardReplica, "StateStorageBoard", &BoardInfo);
startReplicas(std::move(schemeBoardInfo), CreateSchemeBoardReplica, "SchemeBoard", &SchemeBoardInfo);
if (changedStateStorage) {
startReplicas(std::move(stateStorageInfo), CreateStateStorageReplica, "StateStorage", &StateStorageInfo);
}
if (changedBoard) {
startReplicas(std::move(boardInfo), CreateStateStorageBoardReplica, "StateStorageBoard", &BoardInfo);
}
if (changedSchemeBoard) {
startReplicas(std::move(schemeBoardInfo), CreateSchemeBoardReplica, "SchemeBoard", &SchemeBoardInfo);
}

// terminate unused replicas
for (const auto& replicaId : localActorIds) {
Expand All @@ -266,12 +225,6 @@ void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConf
}
}

// Handler for the Gone event: finds the callback stored in GoneCallbacks under the event's cookie,
// detaches it from the map and invokes it. Aborts when no callback is registered for that cookie.
void TNodeWarden::HandleGone(STATEFN_SIG) {
    const auto it = GoneCallbacks.find(ev->Cookie);
    Y_ABORT_UNLESS(it != GoneCallbacks.end());
    // the entry is removed from the map before the callback runs
    auto entry = GoneCallbacks.extract(it);
    entry.mapped()();
}

// Forwards the given service set to ApplyServiceSet with isStatic and comprehensive enabled,
// updateCache disabled, and "distconf" as the last argument.
void TNodeWarden::ApplyStaticServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet& ss) {
    constexpr bool isStatic = true;
    constexpr bool comprehensive = true;
    constexpr bool updateCache = false;
    ApplyServiceSet(ss, isStatic, comprehensive, updateCache, "distconf");
}
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/protos/blobstorage_distributed_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,21 @@ message TEvNodeConfigInvokeOnRoot {
NKikimrBlobStorage.TVSlotId VSlotId = 2;
}

// Request to replace one node with another in the ring configuration of the selected components.
message TReassignStateStorageNode {
// node id to look for in the ring configuration
uint32 From = 1;
uint32 To = 2; // or zero to pick up automatically
// component selectors: the replacement is applied to each component whose flag is set
bool StateStorage = 3;
bool StateStorageBoard = 4;
bool SchemeBoard = 5;
}

oneof Request {
TUpdateConfig UpdateConfig = 1;
TQueryConfig QueryConfig = 2;
TReassignGroupDisk ReassignGroupDisk = 3;
TStaticVDiskSlain StaticVDiskSlain = 4;
TDropDonor DropDonor = 5;
TReassignStateStorageNode ReassignStateStorageNode = 6;
}
}

Expand Down Expand Up @@ -211,6 +220,9 @@ message TEvNodeConfigInvokeOnRootResult {
message TDropDonor {
}

// Result payload for the ReassignStateStorageNode request; it currently carries no fields.
message TReassignStateStorageNode {
}

EStatus Status = 1;
optional string ErrorReason = 2;
TScepter Scepter = 3;
Expand All @@ -221,5 +233,6 @@ message TEvNodeConfigInvokeOnRootResult {
TReassignGroupDisk ReassignGroupDisk = 6;
TStaticVDiskSlain StaticVDiskSlain = 7;
TDropDonor DropDonor = 8;
TReassignStateStorageNode ReassignStateStorageNode = 9;
}
}
Loading