Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug-358][api] Delete Cluster #369

Merged
merged 9 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.datasophon.api;

import cn.hutool.extra.spring.EnableSpringUtil;
import com.datasophon.api.master.ActorUtils;
import com.datasophon.common.Constants;
import com.datasophon.common.cache.CacheUtils;
Expand All @@ -36,6 +37,7 @@
@ServletComponentScan
@ComponentScan("com.datasophon")
@MapperScan("com.datasophon.dao")
@EnableSpringUtil
public class DataSophonApplicationServer extends SpringBootServletInitializer {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public final class Constants {
public static final String MULTIPLE = "multiple";
public static final String CLUSTER_STATE = "cluster_state";
public static final String PATH = "path";

public static final String MV_PATH = "mv_path";
public static final String SERVICE_INSTANCE_ID = "service_instance_id";
public static final String IS_ENABLED = "is_enabled";
public static final String SORT_NUM = "sort_num";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datasophon.common.command;

import com.datasophon.common.enums.ClusterCommandType;

import java.io.Serializable;

import lombok.Data;

@Data
public class ClusterCommand implements Serializable {

private final ClusterCommandType commandType;

private Integer clusterId;

public ClusterCommand(ClusterCommandType commandType) {
this.commandType = commandType;
}

public ClusterCommand(ClusterCommandType commandType, Integer clusterId) {
this.commandType = commandType;
this.clusterId = clusterId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.datasophon.common.enums;

public enum ClusterCommandType {

CHECK,

UNINSTALL
88fantasy marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class ClusterInfoEntity implements Serializable {
*/
private String frameVersion;
/**
* 集群状态 1:待配置2:正在运行
* 集群状态 1:待配置 2:正在运行 3: 停止 4: 删除中 5: 已删除
*/
private ClusterState clusterState;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ClusterServiceRoleInstanceEntity implements Serializable {
*/
private String hostname;
/**
* 服务角色状态 1:正在运行2:存在告警3:存在异常4:需要重启
* 服务角色状态 1:正在运行 2:停止 3:存在告警 4:退役中 5:已退役
*/
private ServiceRoleState serviceRoleState;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@
import com.baomidou.mybatisplus.annotation.EnumValue;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Arrays;

public enum ClusterState {

UNINSTALLED(5, "已删除"),

88fantasy marked this conversation as resolved.
Show resolved Hide resolved
UNINSTALLING(4, "删除中"),

STOP(3, "停止"),

RUNNING(2, "正在运行"),
NEED_CONFIG(1, "待配置");

Expand Down Expand Up @@ -52,6 +60,10 @@ public void setDesc(String desc) {
this.desc = desc;
}

public static ClusterState of(int value) {
return Arrays.stream(values()).filter(state -> state.getValue() == value).findAny().orElse(null);
}

@Override
public String toString() {
return this.desc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import akka.actor.Props;
import akka.util.Timeout;
import com.datasophon.api.master.alert.ServiceRoleCheckActor;
import com.datasophon.common.command.ClusterCommand;
import com.datasophon.common.command.HostCheckCommand;
import com.datasophon.common.command.ServiceRoleCheckCommand;
import com.datasophon.common.enums.ClusterCommandType;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -70,6 +72,9 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException
actorSystem.actorOf(Props.create(MasterNodeProcessingActor.class),
getActorRefName(MasterNodeProcessingActor.class));

ActorRef clusterCheckActor =
actorSystem.actorOf(Props.create(ClusterActor.class), getActorRefName(ClusterActor.class));

// 节点检测 5m 检测一次
actorSystem.scheduler().schedule(
FiniteDuration.apply(30L, TimeUnit.SECONDS),
Expand All @@ -86,6 +91,17 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException
new ServiceRoleCheckCommand(),
actorSystem.dispatcher(),
ActorRef.noSender());

// 集群检测 1m 检测一次
actorSystem.scheduler().schedule(
FiniteDuration.apply(30L, TimeUnit.SECONDS),
FiniteDuration.apply(60L, TimeUnit.SECONDS),
clusterCheckActor,
new ClusterCommand(ClusterCommandType.CHECK),
actorSystem.dispatcher(),
ActorRef.noSender());


rand = SecureRandom.getInstanceStrong();
}

Expand Down Expand Up @@ -131,7 +147,7 @@ public static ActorRef getRemoteActor(String hostname, String actorName) {
try {
actorRef = Await.result(future, Duration.create(30, TimeUnit.SECONDS));
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage(), e);
}

return actorRef;
Expand All @@ -141,10 +157,11 @@ public static ActorRef getRemoteActor(String hostname, String actorName) {
* shutdown
*/
public static void shutdown() {
if(actorSystem != null) {
if (actorSystem != null) {
try {
actorSystem.shutdown();
} catch (Exception ignore){}
} catch (Exception ignore) {
}
actorSystem = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datasophon.api.master;

import akka.actor.UntypedActor;
import cn.hutool.core.date.DateUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.datasophon.api.service.ClusterHostService;
import com.datasophon.api.service.ClusterInfoService;
import com.datasophon.api.service.ClusterServiceRoleGroupConfigService;
import com.datasophon.api.service.ClusterServiceRoleInstanceService;
import com.datasophon.api.service.ServiceInstallService;
import com.datasophon.api.utils.ProcessUtils;
import com.datasophon.common.Constants;
import com.datasophon.common.command.ClusterCommand;
import com.datasophon.common.enums.ClusterCommandType;
import com.datasophon.common.model.Generators;
import com.datasophon.common.model.ServiceConfig;
import com.datasophon.common.utils.ExecResult;
import com.datasophon.common.utils.Result;
import com.datasophon.dao.entity.ClusterHostEntity;
import com.datasophon.dao.entity.ClusterInfoEntity;
import com.datasophon.dao.entity.ClusterServiceRoleGroupConfig;
import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity;
import com.datasophon.dao.enums.ClusterState;
import com.datasophon.dao.enums.ServiceRoleState;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* 节点状态监测
*/
public class ClusterActor extends UntypedActor {

private static final Logger logger = LoggerFactory.getLogger(ClusterActor.class);

private static final String DEPRECATED = "Deprecated";

@Override
public void onReceive(Object msg) throws Throwable {
if (msg instanceof ClusterCommand) {
logger.info("start to check cluster info");
ClusterServiceRoleInstanceService roleInstanceService =
SpringUtil.getBean(ClusterServiceRoleInstanceService.class);
ClusterInfoService clusterInfoService =
SpringUtil.getBean(ClusterInfoService.class);

// Host or cluster
final ClusterCommand clusterCommand = (ClusterCommand) msg;

if (ClusterCommandType.CHECK.equals(clusterCommand.getCommandType())) {
// 获取所有集群
Result result = clusterInfoService.getClusterList();
List<ClusterInfoEntity> clusterList = (List<ClusterInfoEntity>) result.getData();

for (ClusterInfoEntity clusterInfoEntity : clusterList) {
// 获取集群上正在运行的服务
int clusterId = clusterInfoEntity.getId();
List<ClusterServiceRoleInstanceEntity> roleInstanceList = roleInstanceService.getServiceRoleInstanceListByClusterId(clusterId);
if (!ClusterState.NEED_CONFIG.equals(clusterInfoEntity.getClusterState())) {
if (!roleInstanceList.isEmpty()) {
if (roleInstanceList.stream().allMatch(roleInstance -> ServiceRoleState.STOP.equals(roleInstance.getServiceRoleState()))) {
clusterInfoService.updateClusterState(clusterId, ClusterState.STOP.getValue());
} else {
clusterInfoService.updateClusterState(clusterId, ClusterState.RUNNING.getValue());
}
}

}
}

} else if (ClusterCommandType.UNINSTALL.equals(clusterCommand.getCommandType())) {
Integer clusterId = clusterCommand.getClusterId();
if (Objects.nonNull(clusterId)) {
ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId);
if (Objects.nonNull(clusterInfo)) {
ClusterHostService clusterHostService = SpringUtil.getBean(ClusterHostService.class);
ClusterServiceRoleInstanceService clusterServiceRoleInstanceService = SpringUtil.getBean(ClusterServiceRoleInstanceService.class);
ClusterServiceRoleGroupConfigService clusterServiceRoleGroupConfigService = SpringUtil.getBean(ClusterServiceRoleGroupConfigService.class);
ServiceInstallService serviceInstallService = SpringUtil.getBean(ServiceInstallService.class);

// 检查服务实例配置与目录
List<ClusterServiceRoleInstanceEntity> roleInstanceList = clusterServiceRoleInstanceService.getServiceRoleInstanceListByClusterId(clusterId);
for (ClusterServiceRoleInstanceEntity roleInstance : roleInstanceList) {
String roleName = roleInstance.getServiceRoleName();
String hostname = roleInstance.getHostname();
ClusterServiceRoleGroupConfig config = clusterServiceRoleGroupConfigService.getConfigByRoleGroupId(roleInstance.getRoleGroupId());
List<ServiceConfig> oldConfigs = ProcessUtils.getServiceConfig(config);
Map<Generators, List<ServiceConfig>> configFileMap = new ConcurrentHashMap<>();
ProcessUtils.generateConfigFileMap(configFileMap, config);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

生成配置的目的是什么?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为了留底,

因为删除集群只是将状态变更为[已删除],并不会物理删除记录, 因此删除后依然可以在服务配置里查看到备份后的目录

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

删除集群会删除所有服务以及服务的配置,更改配置是否有必要?

Predicate<ServiceConfig> filter = c -> Constants.PATH.equals(c.getConfigType()) && !((String) c.getValue()).contains(DEPRECATED);
Consumer<ServiceConfig> peeker = c -> {
String oldPath = (String) c.getValue();
String newPath = String.format("%s_%s_%s_%s", oldPath, DEPRECATED, clusterId, DateUtil.today());
c.setDefaultValue(oldPath);
c.setValue(newPath);
};

for (Map.Entry<Generators, List<ServiceConfig>> configFile : configFileMap.entrySet()) {
List<ServiceConfig> serviceConfigs = configFile.getValue().stream()
.filter(filter)
.peek(c -> {
peeker.accept(c);
c.setConfigType(Constants.MV_PATH);
})
.collect(Collectors.toList());
if (!serviceConfigs.isEmpty()) {
configFileMap.replace(configFile.getKey(), serviceConfigs);
} else {
configFileMap.remove(configFile.getKey());
}
}

if (!configFileMap.isEmpty()) {
// 分发重命名命令
ExecResult execResult = new ExecResult();
try {
logger.info(
"start to uninstall {} in host {}",
roleName,
hostname);
execResult = ProcessUtils.configServiceRoleInstance(clusterInfo, configFileMap, roleInstance);
if (Objects.nonNull(execResult) && execResult.getExecResult()) {
logger.info(
"{} uninstall success in {}",
roleName,
hostname);
//保存配置
serviceInstallService.saveServiceConfig(clusterId,
roleInstance.getServiceName(),
oldConfigs.stream().peek(c -> {
if (filter.test(c)) {
peeker.accept(c);
}
}).collect(Collectors.toList()),
roleInstance.getRoleGroupId());

} else {
logger.info(
"{} uninstall failed in {}",
roleName,
hostname);
return;
}

} catch (Exception e) {
logger.info(
"{} uninstall failed in {}",
roleName,
hostname);
logger.error(ProcessUtils.getExceptionMessage(e));
return;
}
}
}
List<ClusterHostEntity> hostList = clusterHostService.getHostListByClusterId(clusterId);
clusterHostService.deleteHosts(hostList.stream().map(h -> String.valueOf(h.getId())).collect(Collectors.joining(Constants.COMMA)));
clusterInfoService.updateClusterState(clusterId, ClusterState.UNINSTALLED.getValue());
}
}
}
} else {
unhandled(msg);
}
}
}
Loading
Loading