Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#218][feat] Auto-start services when the worker restarts. #231

Merged
merged 1 commit into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@
import com.datasophon.common.enums.InstallState;
import com.datasophon.common.model.HostInfo;
import com.datasophon.common.model.StartWorkerMessage;
import com.datasophon.common.utils.CollectionUtils;
import com.datasophon.common.utils.Result;
import com.datasophon.dao.entity.ClusterHostEntity;
import com.datasophon.dao.entity.ClusterInfoEntity;
import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity;
import com.datasophon.dao.enums.MANAGED;
import com.datasophon.dao.enums.ServiceRoleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import java.util.*;

import static java.util.stream.Collectors.*;

public class WorkerStartActor extends UntypedActor {

Expand Down Expand Up @@ -81,14 +82,46 @@ public void onReceive(Object message) throws Throwable {
hostEntity.setManaged(MANAGED.YES);
clusterHostService.updateById(hostEntity);
}
//tell to worker what need to start


//add to prometheus
ActorRef prometheusActor = ActorUtils.getLocalActor(PrometheusActor.class,ActorUtils.getActorRefName(PrometheusActor.class));
ActorRef prometheusActor = ActorUtils.getLocalActor(PrometheusActor.class, ActorUtils.getActorRefName(PrometheusActor.class));
GenerateHostPrometheusConfig prometheusConfigCommand = new GenerateHostPrometheusConfig();
prometheusConfigCommand.setClusterId(cluster.getId());
prometheusActor.tell(prometheusConfigCommand, getSelf());

//tell to worker what need to start
autoStartServiceNeeded(msg.getHostname(), cluster.getId());
}
}

/**
 * Automatically starts the service roles of a worker host that are currently stopped.
 *
 * <p>Looks up the stopped role instances of {@code hostname} in the given cluster, groups
 * their ids by service id and asks the command service to generate {@code START_SERVICE}
 * commands for them. Nothing is requested when the host has no stopped role instances.
 *
 * @param hostname  name of the host whose stopped service roles should be started
 * @param clusterId id of the cluster the host belongs to
 */
private void autoStartServiceNeeded(String hostname, Integer clusterId) {
    ClusterServiceRoleInstanceService roleInstanceService = SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceService.class);
    ClusterServiceCommandService serviceCommandService = SpringTool.getApplicationContext().getBean(ClusterServiceCommandService.class);

    List<ClusterServiceRoleInstanceEntity> serviceRoleList = roleInstanceService.getStoppedServiceRoleListByHostnameAndClusterId(hostname, clusterId);
    if (CollectionUtils.isEmpty(serviceRoleList)) {
        logger.info("No services need to start at host {}.", hostname);
        return;
    }

    // Key: service id; value: ids of that service's stopped role instances,
    // converted to strings because the command API takes string ids.
    Map<Integer, List<String>> serviceRoleMap = serviceRoleList.stream()
            .collect(
                    groupingBy(
                            ClusterServiceRoleInstanceEntity::getServiceId,
                            mapping(i -> String.valueOf(i.getId()), toList())
                    )
            );

    Result result = serviceCommandService.generateServiceRoleCommands(clusterId, CommandType.START_SERVICE, serviceRoleMap);
    if (result != null && result.getCode() == 200) {
        logger.info("Auto-start services successful");
    } else {
        // A failed auto-start needs operator attention, so it is reported at WARN rather than INFO.
        logger.warn("Some service auto-start failed, please check logs of the services that failed to start.");
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@


import java.util.List;
import java.util.Map;

/**
* 集群服务操作指令表
Expand All @@ -41,6 +42,8 @@ public interface ClusterServiceCommandService extends IService<ClusterServiceCom

Result generateServiceCommand(Integer clusterId, CommandType command, List<String> ids);

Result generateServiceRoleCommands(Integer clusterId, CommandType commandType, Map<Integer, List<String>> instanceIdMap);

Result generateServiceRoleCommand(Integer clusterId, CommandType command, Integer serviceIntanceId, List<String> ids);

void startExecuteCommand(Integer clusterId, String commandType, String commandIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
public interface ClusterServiceRoleInstanceService extends IService<ClusterServiceRoleInstanceEntity> {

List<ClusterServiceRoleInstanceEntity> getStoppedServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId);

List<ClusterServiceRoleInstanceEntity> getServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId);

List<ClusterServiceRoleInstanceEntity> getServiceRoleInstanceListByServiceIdAndRoleState(Integer id, ServiceRoleState stop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,21 @@ public class ClusterServiceCommandServiceImpl extends ServiceImpl<ClusterService

@Override
@Transactional
public Result generateCommand(Integer clusterId, CommandType commandType,List<String> serviceNames) {
public Result generateCommand(Integer clusterId, CommandType commandType, List<String> serviceNames) {
ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId);

List<ClusterServiceCommandEntity> list = new ArrayList<>();
List<ClusterServiceCommandHostEntity> commandHostList = new ArrayList<>();
List<ClusterServiceCommandHostCommandEntity> hostCommandList = new ArrayList<>();
List<String> commandIds = new ArrayList<String>();

Map<String, List<String>> serviceRoleHostMap = (Map<String, List<String>>) CacheUtils.get(clusterInfo.getClusterCode() +Constants.UNDERLINE+ Constants.SERVICE_ROLE_HOST_MAPPING);
Map<String, List<String>> serviceRoleHostMap = (Map<String, List<String>>) CacheUtils.get(clusterInfo.getClusterCode() + Constants.UNDERLINE + Constants.SERVICE_ROLE_HOST_MAPPING);

for (String serviceName : serviceNames) {
for (String serviceName : serviceNames) {
//1、生成操作指令
ClusterServiceInstanceEntity serviceInstance = serviceInstanceService.getServiceInstanceByClusterIdAndServiceName(clusterId, serviceName);

ClusterServiceCommandEntity commandEntity = ProcessUtils.generateCommandEntity(clusterId,commandType,serviceName);
ClusterServiceCommandEntity commandEntity = ProcessUtils.generateCommandEntity(clusterId, commandType, serviceName);
commandEntity.setServiceInstanceId(serviceInstance.getId());
list.add(commandEntity);
String commandId = commandEntity.getCommandId();
Expand All @@ -113,22 +113,22 @@ public Result generateCommand(Integer clusterId, CommandType commandType,List<St
List<FrameServiceRoleEntity> serviceRoleList = (List<FrameServiceRoleEntity>) result.getData();
HashMap<String, ClusterServiceCommandHostEntity> map = new HashMap<>();
for (FrameServiceRoleEntity serviceRole : serviceRoleList) {
if(Objects.nonNull(serviceRoleHostMap) && serviceRoleHostMap.containsKey(serviceRole.getServiceRoleName())){
if (Objects.nonNull(serviceRoleHostMap) && serviceRoleHostMap.containsKey(serviceRole.getServiceRoleName())) {
List<String> hosts = serviceRoleHostMap.get(serviceRole.getServiceRoleName());
for (String hostname : hosts) {
if(alreadyExistsServiceRole(serviceRole.getServiceRoleName(),hostname,clusterId)){
if (alreadyExistsServiceRole(serviceRole.getServiceRoleName(), hostname, clusterId)) {
continue;
}else{
} else {
ClusterServiceCommandHostEntity commandHost;
if(map.containsKey(hostname)){
if (map.containsKey(hostname)) {
commandHost = map.get(hostname);
}else{
} else {
commandHost = ProcessUtils.generateCommandHostEntity(commandId, hostname);
commandHostList.add(commandHost);
map.put(hostname,commandHost);
map.put(hostname, commandHost);
}
//4、生成主机操作指令
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, serviceRole.getServiceRoleName(),serviceRole.getServiceRoleType(), commandHost);
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, serviceRole.getServiceRoleName(), serviceRole.getServiceRoleType(), commandHost);
hostCommandList.add(hostCommand);
}
}
Expand All @@ -138,12 +138,12 @@ public Result generateCommand(Integer clusterId, CommandType commandType,List<St
commandService.saveBatch(list);
commandHostService.saveBatch(commandHostList);
hostCommandService.saveBatch(hostCommandList);
return Result.success(String.join(",",commandIds));
return Result.success(String.join(",", commandIds));
}

private boolean alreadyExistsServiceRole(String serviceRoleName, String hostname, Integer clusterId) {
ClusterServiceRoleInstanceEntity serviceRole = roleInstanceService.getOneServiceRole(serviceRoleName, hostname, clusterId);
if(Objects.nonNull(serviceRole) ){
if (Objects.nonNull(serviceRole)) {
return true;
}
return false;
Expand All @@ -153,15 +153,13 @@ private boolean alreadyExistsServiceRole(String serviceRoleName, String hostname
@Override
public Result getServiceCommandlist(Integer clusterId, Integer page, Integer pageSize) {
Integer offset = (page - 1) * pageSize;
List<ClusterServiceCommandEntity> list = this.list(new QueryWrapper<ClusterServiceCommandEntity>()
.orderByDesc(Constants.CREATE_TIME)
.last("limit " + offset + "," + pageSize));
List<ClusterServiceCommandEntity> list = this.list(new QueryWrapper<ClusterServiceCommandEntity>().orderByDesc(Constants.CREATE_TIME).last("limit " + offset + "," + pageSize));
Integer total = this.count();
for (ClusterServiceCommandEntity commandEntity : list) {
commandEntity.setCommandStateCode(commandEntity.getCommandState().getValue());
Date createTime = commandEntity.getCreateTime();
Date endTime = commandEntity.getEndTime();
if(Objects.isNull(endTime)){
if (Objects.isNull(endTime)) {
endTime = new Date();
}
long between = DateUtil.between(createTime, endTime, DateUnit.MS);
Expand All @@ -175,6 +173,7 @@ public Result getServiceCommandlist(Integer clusterId, Integer page, Integer pag
* 1、生成指令
* 2、生成主机指令
* 3、生产主机上操作指令
*
* @param clusterId
* @param commandType
* @param serviceInstanceIds
Expand All @@ -190,7 +189,7 @@ public Result generateServiceCommand(Integer clusterId, CommandType commandType,
int id = Integer.parseInt(serviceInstanceId);
//查询服务对应的服务角色实例
List<ClusterServiceRoleInstanceEntity> roleInstanceList = roleInstanceService.getServiceRoleInstanceListByServiceId(id);
if(Objects.isNull(roleInstanceList) || roleInstanceList.size() == 0){
if (Objects.isNull(roleInstanceList) || roleInstanceList.size() == 0) {
continue;
}
ClusterServiceInstanceEntity serviceInstance = serviceInstanceService.getById(id);
Expand All @@ -203,31 +202,40 @@ public Result generateServiceCommand(Integer clusterId, CommandType commandType,
HashMap<String, ClusterServiceCommandHostEntity> map = new HashMap<>();
for (ClusterServiceRoleInstanceEntity roleInstance : roleInstanceList) {
ClusterServiceCommandHostEntity commandHost;
if(map.containsKey(roleInstance.getHostname())){
if (map.containsKey(roleInstance.getHostname())) {
commandHost = map.get(roleInstance.getHostname());
}else {
} else {
commandHost = ProcessUtils.generateCommandHostEntity(commandId, roleInstance.getHostname());
commandHostList.add(commandHost);
}
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(),roleInstance.getRoleType(), commandHost);
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(), roleInstance.getRoleType(), commandHost);
hostCommandList.add(hostCommand);
map.put(roleInstance.getHostname(),commandHost);
map.put(roleInstance.getHostname(), commandHost);
}
}
if(list.size() > 0){
if (list.size() > 0) {
commandService.saveBatch(list);
commandHostService.saveBatch(commandHostList);
hostCommandService.saveBatch(hostCommandList);

//通知commandActor执行命令
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class,ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(commandIds,clusterId, commandType),ActorRef.noSender());
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class, ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(commandIds, clusterId, commandType), ActorRef.noSender());
}
return Result.success(String.join(",",commandIds));
return Result.success(String.join(",", commandIds));
}

@Override
public Result generateServiceRoleCommand(Integer clusterId, CommandType commandType, Integer serviceInstanceId,List<String> serviceRoleInstanceIds) {
public Result generateServiceRoleCommands(Integer clusterId, CommandType commandType, Map<Integer, List<String>> instanceIdMap) {
Result result = null;
for (Map.Entry<Integer, List<String>> entry : instanceIdMap.entrySet()) {
result = generateServiceRoleCommand(clusterId, commandType, entry.getKey(), entry.getValue());
}
return result;
}

@Override
public Result generateServiceRoleCommand(Integer clusterId, CommandType commandType, Integer serviceInstanceId, List<String> serviceRoleInstanceIds) {
List<ClusterServiceCommandEntity> list = new ArrayList<>();
List<ClusterServiceCommandHostEntity> commandHostList = new ArrayList<>();
List<ClusterServiceCommandHostCommandEntity> hostCommandList = new ArrayList<>();
Expand All @@ -246,33 +254,33 @@ public Result generateServiceRoleCommand(Integer clusterId, CommandType commandT
ClusterServiceRoleInstanceEntity roleInstance = roleInstanceService.getById(id);

ClusterServiceCommandHostEntity commandHost;
if(map.containsKey(roleInstance.getHostname())){
if (map.containsKey(roleInstance.getHostname())) {
commandHost = map.get(roleInstance.getHostname());
}else {
} else {
commandHost = ProcessUtils.generateCommandHostEntity(commandId, roleInstance.getHostname());
commandHostList.add(commandHost);
}
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(),roleInstance.getRoleType(), commandHost);
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(), roleInstance.getRoleType(), commandHost);
hostCommandList.add(hostCommand);
map.put(roleInstance.getHostname(),commandHost);
map.put(roleInstance.getHostname(), commandHost);
}
commandService.saveBatch(list);
commandHostService.saveBatch(commandHostList);
hostCommandService.saveBatch(hostCommandList);

//通知commandActor执行命令
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class,ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(commandIds,clusterId, commandType),ActorRef.noSender());
return Result.success(String.join(",",commandIds));
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class, ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(commandIds, clusterId, commandType), ActorRef.noSender());
return Result.success(String.join(",", commandIds));
}

/**
 * Asks the DAG build actor to start executing commands identified by their ids.
 *
 * @param clusterId   id of the cluster, forwarded in the message sent to the actor
 * @param commandType text converted to a {@code CommandType} via {@code EnumUtil.fromString}
 * @param commandIds  comma-separated command ids; split on "," before being sent
 */
@Override
public void startExecuteCommand(Integer clusterId, String commandType, String commandIds) {
List<String> list = Arrays.asList(commandIds.split(","));
CommandType command = EnumUtil.fromString(CommandType.class, commandType);
//Notify commandActor to execute the command
// NOTE(review): the two statement pairs below differ only in whitespace; they look like the
// pre- and post-change versions of the same lines from the diff view — confirm and keep one pair.
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class,ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(list,clusterId, command),ActorRef.noSender());
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class, ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(list, clusterId, command), ActorRef.noSender());
}

@Override
Expand All @@ -283,11 +291,6 @@ public void cancelCommand(String commandId) {

/**
 * Returns the most recently created {@code RESTART_SERVICE} or {@code INSTALL_SERVICE}
 * command of one service instance.
 *
 * @param serviceInstanceId id of the service instance whose commands are searched
 * @return the newest matching command, as returned by {@code getOne}
 */
@Override
public ClusterServiceCommandEntity getLastRestartCommand(Integer serviceInstanceId) {
    // Match both command types with one IN condition so the service-instance filter applies
    // to each of them. Chaining eq(...).eq(...).or().eq(...) renders "a AND b OR c", which
    // lets INSTALL_SERVICE commands of any other service instance match as well.
    QueryWrapper<ClusterServiceCommandEntity> query = new QueryWrapper<ClusterServiceCommandEntity>()
            .eq(Constants.SERVICE_INSTANCE_ID, serviceInstanceId)
            .in(Constants.COMMAND_TYPE, CommandType.RESTART_SERVICE.getValue(), CommandType.INSTALL_SERVICE.getValue())
            .orderByDesc(Constants.CREATE_TIME)
            .last("limit 1");
    return this.getOne(query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ public class ClusterServiceRoleInstanceServiceImpl extends ServiceImpl<ClusterSe
@Autowired
private ClusterAlertHistoryService alertHistoryService;

/**
 * Lists the service role instances of a host whose role state is {@code ServiceRoleState.STOP}.
 *
 * @param hostname  host name the role instances must match
 * @param clusterId cluster id the role instances must match
 * @return the role instances matching all three conditions
 */
@Override
public List<ClusterServiceRoleInstanceEntity> getStoppedServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId) {
// Filter on cluster, host and STOP state, then fetch the matching rows.
return this.lambdaQuery()
.eq(ClusterServiceRoleInstanceEntity::getClusterId, clusterId)
.eq(ClusterServiceRoleInstanceEntity::getHostname, hostname)
.eq(ClusterServiceRoleInstanceEntity::getServiceRoleState, ServiceRoleState.STOP)
.list();
}

@Override
public List<ClusterServiceRoleInstanceEntity> getServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId) {
return this.list(new QueryWrapper<ClusterServiceRoleInstanceEntity>()
Expand Down