From be100866297e9bfa672e3af109893e91e70ca680 Mon Sep 17 00:00:00 2001 From: ruanwe <88fantasy@gmail.com> Date: Mon, 7 Aug 2023 10:31:35 +0800 Subject: [PATCH 1/7] [fix][api] delete cluster MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增集群状态[停止],当服务全停止时集群会改为停止状态, 此时可以删除集群 --- .../api/DataSophonApplicationServer.java | 2 + .../com/datasophon/api/master/ActorUtils.java | 15 ++ .../api/master/ClusterCheckActor.java | 77 ++++++ .../datasophon/api/master/HostCheckActor.java | 241 +++++++++--------- .../api/master/ServiceCommandActor.java | 22 +- .../ClusterServiceRoleInstanceService.java | 4 + .../service/impl/ClusterInfoServiceImpl.java | 19 +- .../ClusterServiceInstanceServiceImpl.java | 19 +- ...ClusterServiceRoleInstanceServiceImpl.java | 16 +- .../common/command/ClusterCheckCommand.java | 28 ++ .../dao/entity/ClusterInfoEntity.java | 2 +- .../ClusterServiceRoleInstanceEntity.java | 2 +- .../datasophon/dao/enums/ClusterState.java | 8 + datasophon-ui/src/pages/colonyManage/list.vue | 4 +- 14 files changed, 299 insertions(+), 160 deletions(-) create mode 100644 datasophon-api/src/main/java/com/datasophon/api/master/ClusterCheckActor.java create mode 100644 datasophon-common/src/main/java/com/datasophon/common/command/ClusterCheckCommand.java diff --git a/datasophon-api/src/main/java/com/datasophon/api/DataSophonApplicationServer.java b/datasophon-api/src/main/java/com/datasophon/api/DataSophonApplicationServer.java index 5bab4cf0..d34958a7 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/DataSophonApplicationServer.java +++ b/datasophon-api/src/main/java/com/datasophon/api/DataSophonApplicationServer.java @@ -17,6 +17,7 @@ package com.datasophon.api; +import cn.hutool.extra.spring.EnableSpringUtil; import com.datasophon.api.master.ActorUtils; import com.datasophon.common.Constants; import com.datasophon.common.cache.CacheUtils; @@ -36,6 +37,7 @@ @ServletComponentScan @ComponentScan("com.datasophon") @MapperScan("com.datasophon.dao") +@EnableSpringUtil public class DataSophonApplicationServer extends SpringBootServletInitializer { public static void main(String[] args) { diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java b/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java index 9e8075fd..204fcade 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java @@ -23,6 +23,7 @@ import akka.actor.Props; import akka.util.Timeout; import com.datasophon.api.master.alert.ServiceRoleCheckActor; +import com.datasophon.common.command.ClusterCheckCommand; import com.datasophon.common.command.HostCheckCommand; import com.datasophon.common.command.ServiceRoleCheckCommand; import com.typesafe.config.Config; @@ -70,6 +71,9 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException actorSystem.actorOf(Props.create(MasterNodeProcessingActor.class), getActorRefName(MasterNodeProcessingActor.class)); + ActorRef clusterCheckActor = + actorSystem.actorOf(Props.create(ClusterCheckActor.class), getActorRefName(ClusterCheckActor.class)); + // 节点检测 5m 检测一次 actorSystem.scheduler().schedule( FiniteDuration.apply(30L, TimeUnit.SECONDS), @@ -86,6 +90,17 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException new ServiceRoleCheckCommand(), actorSystem.dispatcher(), ActorRef.noSender()); + + // 集群检测 1m 检测一次 + actorSystem.scheduler().schedule( + FiniteDuration.apply(30L, TimeUnit.SECONDS), + FiniteDuration.apply(60L, TimeUnit.SECONDS), + clusterCheckActor, + new ClusterCheckCommand(), + actorSystem.dispatcher(), + ActorRef.noSender()); + + rand = SecureRandom.getInstanceStrong(); } diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ClusterCheckActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/ClusterCheckActor.java new file mode 100644 index 00000000..25edab97 --- /dev/null +++ b/datasophon-api/src/main/java/com/datasophon/api/master/ClusterCheckActor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datasophon.api.master; + +import akka.actor.UntypedActor; +import cn.hutool.extra.spring.SpringUtil; +import com.datasophon.api.service.ClusterInfoService; +import com.datasophon.api.service.ClusterServiceRoleInstanceService; +import com.datasophon.common.command.ClusterCheckCommand; +import com.datasophon.common.utils.Result; +import com.datasophon.dao.entity.ClusterInfoEntity; +import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; +import com.datasophon.dao.enums.ClusterState; +import com.datasophon.dao.enums.ServiceRoleState; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * 节点状态监测 + */ +public class ClusterCheckActor extends UntypedActor { + + private static final Logger logger = LoggerFactory.getLogger(ClusterCheckActor.class); + + @Override + public void onReceive(Object msg) throws Throwable { + if (msg instanceof ClusterCheckCommand) { + logger.info("start to check cluster info"); + ClusterServiceRoleInstanceService roleInstanceService = + SpringUtil.getBean(ClusterServiceRoleInstanceService.class); + ClusterInfoService clusterInfoService = + SpringUtil.getBean(ClusterInfoService.class); + + // Host or cluster + final ClusterCheckCommand hostCheckCommand = (ClusterCheckCommand) msg; + + // 获取所有集群 + Result result = clusterInfoService.getClusterList(); + List clusterList = (List) result.getData(); + + for (ClusterInfoEntity clusterInfoEntity : clusterList) { + // 获取集群上正在运行的服务 + int clusterId = clusterInfoEntity.getId(); + List roleInstanceList = roleInstanceService.getServiceRoleInstanceListByClusterId(clusterId); + if (!ClusterState.NEED_CONFIG.equals(clusterInfoEntity.getClusterState())) { + if (!roleInstanceList.isEmpty()) { + if (roleInstanceList.stream().allMatch(roleInstance -> ServiceRoleState.STOP.equals(roleInstance.getServiceRoleState()))) { + clusterInfoService.updateClusterState(clusterId, ClusterState.STOP.getValue()); + } else { + clusterInfoService.updateClusterState(clusterId, ClusterState.RUNNING.getValue()); + } + } + + } + } + } else { + unhandled(msg); + } + } +} diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java index 0d661f4a..05246286 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java @@ -21,6 +21,7 @@ import akka.actor.UntypedActor; import akka.pattern.Patterns; import akka.util.Timeout; +import cn.hutool.extra.spring.SpringUtil; import com.datasophon.api.service.ClusterHostService; import com.datasophon.api.service.ClusterInfoService; import com.datasophon.api.service.ClusterServiceRoleInstanceService; @@ -54,132 +55,132 @@ */ public class HostCheckActor extends UntypedActor { - private static final Logger logger = LoggerFactory.getLogger(HostCheckActor.class); + private static final Logger logger = LoggerFactory.getLogger(HostCheckActor.class); - @Override - public void onReceive(Object msg) throws Throwable { - if (msg instanceof HostCheckCommand) { - logger.info("start to check host info"); - ClusterHostService clusterHostService = - SpringTool.getApplicationContext().getBean(ClusterHostService.class); - ClusterServiceRoleInstanceService roleInstanceService = - SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceService.class); - ClusterInfoService clusterInfoService = - SpringTool.getApplicationContext().getBean(ClusterInfoService.class); + @Override + public void onReceive(Object msg) throws Throwable { + if (msg instanceof HostCheckCommand) { + logger.info("start to check host info"); + ClusterHostService clusterHostService = + SpringUtil.getBean(ClusterHostService.class); + ClusterServiceRoleInstanceService roleInstanceService = + SpringUtil.getBean(ClusterServiceRoleInstanceService.class); + ClusterInfoService clusterInfoService = + SpringUtil.getBean(ClusterInfoService.class); - // Host or cluster - final HostCheckCommand hostCheckCommand = (HostCheckCommand)msg; - final HostInfo hostInfo = hostCheckCommand.getHostInfo(); + // Host or cluster + final HostCheckCommand hostCheckCommand = (HostCheckCommand) msg; + final HostInfo hostInfo = hostCheckCommand.getHostInfo(); - // 获取当前安装并且正在运行的集群 - Result result = clusterInfoService.runningClusterList(); - List clusterList = (List) result.getData(); + // 获取当前安装并且正在运行的集群 + Result result = clusterInfoService.runningClusterList(); + List clusterList = (List) result.getData(); - for (ClusterInfoEntity clusterInfoEntity : clusterList) { - // 获取集群上安装的 Prometheus 服务, 从 Prometheus 获取CPU、磁盘使用量等 - ClusterServiceRoleInstanceEntity prometheusInstance = - roleInstanceService.getOneServiceRole("Prometheus", "", clusterInfoEntity.getId()); - if (Objects.nonNull(prometheusInstance)) { - // 集群正常安装了 Prometheus - List list = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId()); - String promUrl = "http://" + prometheusInstance.getHostname() + ":9090/api/v1/query"; - for (ClusterHostEntity clusterHostEntity : list) { - if(hostInfo != null && !StringUtils.equals(clusterHostEntity.getHostname(), hostInfo.getHostname())) { - // 指定了节点,直接只处理这一个节点的 - continue; - } - try { - String hostname = clusterHostEntity.getHostname(); - // 查询内存总量 - String totalMemPromQl = "node_memory_MemTotal_bytes{job=~\"node\",instance=\"" + hostname - + ":9100\"}/1024/1024/1024"; - String totalMemStr = PromInfoUtils.getSinglePrometheusMetric(promUrl, totalMemPromQl); - if (StringUtils.isNotBlank(totalMemStr)) { - int totalMem = Double.valueOf(totalMemStr).intValue(); - clusterHostEntity.setTotalMem(totalMem); - } - // 查询内存使用量 - String memAvailablePromQl = "node_memory_MemAvailable_bytes{job=~\"node\",instance=\"" - + hostname + ":9100\"}/1024/1024/1024"; - String memAvailableStr = - PromInfoUtils.getSinglePrometheusMetric(promUrl, memAvailablePromQl); - if (StringUtils.isNotBlank(memAvailableStr)) { - int memAvailable = Double.valueOf(memAvailableStr).intValue(); - Integer memUsed = clusterHostEntity.getTotalMem() - memAvailable; - clusterHostEntity.setUsedMem(memUsed); - } - // 总磁盘容量 - String totalDistPromQl = "sum(node_filesystem_size_bytes{instance=\"" + hostname - + ":9100\",fstype=~\"ext4|xfs\",mountpoint !~\".*pod.*\"})/1024/1024/1024"; - String totalDiskStr = PromInfoUtils.getSinglePrometheusMetric(promUrl, totalDistPromQl); - if (StringUtils.isNotBlank(totalDiskStr)) { - int totalDisk = Double.valueOf(totalDiskStr).intValue(); - clusterHostEntity.setTotalDisk(totalDisk); - } - // 查询磁盘使用量 - String diskUsedPromQl = "sum(node_filesystem_size_bytes{instance=\"" + hostname - + ":9100\",fstype=~\"ext.*|xfs\",mountpoint !~\".*pod.*\"}-node_filesystem_free_bytes{instance=\"" - + hostname - + ":9100\",fstype=~\"ext.*|xfs\",mountpoint !~\".*pod.*\"})/1024/1024/1024"; - String diskUsed = PromInfoUtils.getSinglePrometheusMetric(promUrl, diskUsedPromQl); - if (StringUtils.isNotBlank(diskUsed)) { - clusterHostEntity.setUsedDisk(Double.valueOf(diskUsed).intValue()); - } - // 查询cpu负载 - String cpuLoadPromQl = "node_load5{job=~\"node\",instance=\"" + hostname + ":9100\"}"; - String cpuLoad = PromInfoUtils.getSinglePrometheusMetric(promUrl, cpuLoadPromQl); - if (StringUtils.isNotBlank(cpuLoad)) { - clusterHostEntity.setAverageLoad(cpuLoad); - } - } catch (Exception e) { - logger.warn("check cluster state error, cause: {}", e.getMessage()); - } - } - if (list.size() > 0) { - clusterHostService.updateBatchById(list); - } - } else { - // 没有 Prometheus?直接获取节点,通过 rpc 检测是否启动 - List hosts = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId()); - List checkedHosts = new ArrayList<>(hosts.size()); - for (ClusterHostEntity host : hosts) { - if(hostInfo != null && !StringUtils.equals(host.getHostname(), hostInfo.getHostname())) { - // 指定了节点,直接只处理这一个节点的 - continue; - } - // copy 一个新的,只更新状态 - ClusterHostEntity checkedHost = new ClusterHostEntity(); - checkedHost.setId(host.getId()); - checkedHost.setCheckTime(new Date()); - try { - // rpc 检测 - final ActorRef pingActor = ActorUtils.getRemoteActor(host.getHostname(), "pingActor"); - PingCommand pingCommand = new PingCommand(); - pingCommand.setMessage("ping"); - Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS)); - Future execFuture = Patterns.ask(pingActor, pingCommand, timeout); - ExecResult execResult = (ExecResult) Await.result(execFuture, timeout.duration()); - if (execResult.getExecResult()) { - logger.info("ping host: {} success", host.getHostname()); - } else { - logger.warn("ping host: {} fail, reason: {}", host.getHostname(), execResult.getExecOut()); - throw new IllegalStateException("ping host: " + host.getHostname() + " failed."); - } - checkedHost.setHostState(1); - checkedHost.setManaged(MANAGED.YES); - } catch (Exception e) { - logger.warn("host: " + host.getHostname() + " rpc error, cause: " + e.getMessage()); - checkedHost.setHostState(2); - } - checkedHosts.add(checkedHost); - } - if (checkedHosts.size() > 0) { - clusterHostService.updateBatchById(checkedHosts); - } - } + for (ClusterInfoEntity clusterInfoEntity : clusterList) { + // 获取集群上安装的 Prometheus 服务, 从 Prometheus 获取CPU、磁盘使用量等 + ClusterServiceRoleInstanceEntity prometheusInstance = + roleInstanceService.getOneServiceRole("Prometheus", "", clusterInfoEntity.getId()); + if (Objects.nonNull(prometheusInstance)) { + // 集群正常安装了 Prometheus + List list = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId()); + String promUrl = "http://" + prometheusInstance.getHostname() + ":9090/api/v1/query"; + for (ClusterHostEntity clusterHostEntity : list) { + if (hostInfo != null && !StringUtils.equals(clusterHostEntity.getHostname(), hostInfo.getHostname())) { + // 指定了节点,直接只处理这一个节点的 + continue; } + try { + String hostname = clusterHostEntity.getHostname(); + // 查询内存总量 + String totalMemPromQl = "node_memory_MemTotal_bytes{job=~\"node\",instance=\"" + hostname + + ":9100\"}/1024/1024/1024"; + String totalMemStr = PromInfoUtils.getSinglePrometheusMetric(promUrl, totalMemPromQl); + if (StringUtils.isNotBlank(totalMemStr)) { + int totalMem = Double.valueOf(totalMemStr).intValue(); + clusterHostEntity.setTotalMem(totalMem); + } + // 查询内存使用量 + String memAvailablePromQl = "node_memory_MemAvailable_bytes{job=~\"node\",instance=\"" + + hostname + ":9100\"}/1024/1024/1024"; + String memAvailableStr = + PromInfoUtils.getSinglePrometheusMetric(promUrl, memAvailablePromQl); + if (StringUtils.isNotBlank(memAvailableStr)) { + int memAvailable = Double.valueOf(memAvailableStr).intValue(); + Integer memUsed = clusterHostEntity.getTotalMem() - memAvailable; + clusterHostEntity.setUsedMem(memUsed); + } + // 总磁盘容量 + String totalDistPromQl = "sum(node_filesystem_size_bytes{instance=\"" + hostname + + ":9100\",fstype=~\"ext4|xfs\",mountpoint !~\".*pod.*\"})/1024/1024/1024"; + String totalDiskStr = PromInfoUtils.getSinglePrometheusMetric(promUrl, totalDistPromQl); + if (StringUtils.isNotBlank(totalDiskStr)) { + int totalDisk = Double.valueOf(totalDiskStr).intValue(); + clusterHostEntity.setTotalDisk(totalDisk); + } + // 查询磁盘使用量 + String diskUsedPromQl = "sum(node_filesystem_size_bytes{instance=\"" + hostname + + ":9100\",fstype=~\"ext.*|xfs\",mountpoint !~\".*pod.*\"}-node_filesystem_free_bytes{instance=\"" + + hostname + + ":9100\",fstype=~\"ext.*|xfs\",mountpoint !~\".*pod.*\"})/1024/1024/1024"; + String diskUsed = PromInfoUtils.getSinglePrometheusMetric(promUrl, diskUsedPromQl); + if (StringUtils.isNotBlank(diskUsed)) { + clusterHostEntity.setUsedDisk(Double.valueOf(diskUsed).intValue()); + } + // 查询cpu负载 + String cpuLoadPromQl = "node_load5{job=~\"node\",instance=\"" + hostname + ":9100\"}"; + String cpuLoad = PromInfoUtils.getSinglePrometheusMetric(promUrl, cpuLoadPromQl); + if (StringUtils.isNotBlank(cpuLoad)) { + clusterHostEntity.setAverageLoad(cpuLoad); + } + } catch (Exception e) { + logger.warn("check cluster state error, cause: {}", e.getMessage()); + } + } + if (list.size() > 0) { + clusterHostService.updateBatchById(list); + } } else { - unhandled(msg); + // 没有 Prometheus?直接获取节点,通过 rpc 检测是否启动 + List hosts = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId()); + List checkedHosts = new ArrayList<>(hosts.size()); + for (ClusterHostEntity host : hosts) { + if (hostInfo != null && !StringUtils.equals(host.getHostname(), hostInfo.getHostname())) { + // 指定了节点,直接只处理这一个节点的 + continue; + } + // copy 一个新的,只更新状态 + ClusterHostEntity checkedHost = new ClusterHostEntity(); + checkedHost.setId(host.getId()); + checkedHost.setCheckTime(new Date()); + try { + // rpc 检测 + final ActorRef pingActor = ActorUtils.getRemoteActor(host.getHostname(), "pingActor"); + PingCommand pingCommand = new PingCommand(); + pingCommand.setMessage("ping"); + Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS)); + Future execFuture = Patterns.ask(pingActor, pingCommand, timeout); + ExecResult execResult = (ExecResult) Await.result(execFuture, timeout.duration()); + if (execResult.getExecResult()) { + logger.info("ping host: {} success", host.getHostname()); + } else { + logger.warn("ping host: {} fail, reason: {}", host.getHostname(), execResult.getExecOut()); + throw new IllegalStateException("ping host: " + host.getHostname() + " failed."); + } + checkedHost.setHostState(1); + checkedHost.setManaged(MANAGED.YES); + } catch (Exception e) { + logger.warn("host: " + host.getHostname() + " rpc error, cause: " + e.getMessage()); + checkedHost.setHostState(2); + } + checkedHosts.add(checkedHost); + } + if (checkedHosts.size() > 0) { + clusterHostService.updateBatchById(checkedHosts); + } } + } + } else { + unhandled(msg); } + } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ServiceCommandActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/ServiceCommandActor.java index 4ff75b00..8de9f095 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/ServiceCommandActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/ServiceCommandActor.java @@ -100,20 +100,20 @@ public void onReceive(Object msg) throws Throwable { message.getCommandHostId()); Integer totalProgress = service.getHostCommandTotalProgressByHostnameAndCommandHostId(message.getHostname(), message.getCommandHostId()); - Integer progress = totalProgress / size; + int progress = totalProgress / size; commandHost.setCommandProgress(progress); if (progress == 100) { List list = service.findFailedHostCommand(message.getHostname(), message.getCommandHostId()); - if (list.size() > 0) { + if (!list.isEmpty()) { commandHost.setCommandState(CommandState.FAILED); } else { commandHost.setCommandState(CommandState.SUCCESS); } List cancelList = service.findCanceledHostCommand(message.getHostname(), message.getCommandHostId()); - if (cancelList.size() > 0) { + if (!cancelList.isEmpty()) { commandHost.setCommandState(CommandState.CANCEL); } } @@ -121,7 +121,7 @@ public void onReceive(Object msg) throws Throwable { .eq(Constants.COMMAND_HOST_ID, message.getCommandHostId())); Integer size1 = commandHostService.getCommandHostSizeByCommandId(message.getCommandId()); Integer totalProgress1 = commandHostService.getCommandHostTotalProgressByCommandId(message.getCommandId()); - Integer progress1 = totalProgress1 / size1; + int progress1 = totalProgress1 / size1; ClusterServiceCommandEntity command = commandService.lambdaQuery() .eq(ClusterServiceCommandEntity::getCommandId, message.getCommandId()).one(); command.setCommandProgress(progress1); @@ -132,7 +132,7 @@ public void onReceive(Object msg) throws Throwable { String serviceName = command.getServiceName(); ClusterInfoEntity clusterInfo = clusterInfoService.getById(command.getClusterId()); - if (command.getCommandType() == 4 && HDFS.equals(serviceName.toLowerCase())) { + if (command.getCommandType() == 4 && HDFS.equalsIgnoreCase(serviceName)) { //update web ui updateHDFSWebUi(clusterInfo.getId(), command.getServiceInstanceId()); } @@ -140,12 +140,12 @@ public void onReceive(Object msg) throws Throwable { // update cluster state if (command.getCommandType() == 1) { - if (clusterInfo.getClusterState().equals(ClusterState.NEED_CONFIG)) { + if (ClusterState.NEED_CONFIG.equals(clusterInfo.getClusterState())) { clusterInfo.setClusterState(ClusterState.RUNNING); clusterInfoService.updateById(clusterInfo); } - if (HDFS.equals(serviceName.toLowerCase())) { + if (HDFS.equalsIgnoreCase(serviceName)) { ActorRef hdfsECActor = ActorUtils.getLocalActor(HdfsECActor.class, ActorUtils.getActorRefName(HdfsECActor.class)); HdfsEcCommand hdfsEcCommand = new HdfsEcCommand(); @@ -156,7 +156,7 @@ public void onReceive(Object msg) throws Throwable { logger.info("start to generate prometheus config"); ActorRef prometheusActor = ActorUtils.getLocalActor(PrometheusActor.class, ActorUtils.getActorRefName(PrometheusActor.class)); - if (STARROCKS.equals(serviceName.toLowerCase()) || DORIS.equals(serviceName.toLowerCase())) { + if (STARROCKS.equalsIgnoreCase(serviceName) || DORIS.equalsIgnoreCase(serviceName)) { GenerateSRPromConfigCommand prometheusConfigCommand = new GenerateSRPromConfigCommand(); prometheusConfigCommand.setServiceInstanceId(command.getServiceInstanceId()); prometheusConfigCommand.setClusterFrame(clusterInfo.getClusterFrame()); @@ -175,14 +175,14 @@ public void onReceive(Object msg) throws Throwable { } List list = commandHostService.findFailedCommandHost(message.getCommandId()); - if (list.size() > 0) { + if (!list.isEmpty()) { command.setCommandState(CommandState.FAILED); command.setEndTime(new Date()); } List cancelList = commandHostService.findCanceledCommandHost(message.getCommandId()); - if (cancelList.size() > 0) { + if (!cancelList.isEmpty()) { command.setCommandState(CommandState.CANCEL); command.setEndTime(new Date()); } @@ -196,7 +196,7 @@ private void enableAlertConfig(String serviceName, Integer clusterId) { ClusterAlertQuotaService alertQuotaService = SpringTool.getApplicationContext().getBean(ClusterAlertQuotaService.class); List list = alertQuotaService.listAlertQuotaByServiceName(serviceName); - List ids = list.stream().map(e -> e.getId()).collect(Collectors.toList()); + List ids = list.stream().map(ClusterAlertQuota::getId).collect(Collectors.toList()); String alertQuotaIds = StringUtils.join(ids, ","); alertQuotaService.start(clusterId, alertQuotaIds); } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceRoleInstanceService.java b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceRoleInstanceService.java index afddae55..68e20492 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceRoleInstanceService.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceRoleInstanceService.java @@ -50,6 +50,8 @@ Result listAll(Integer serviceInstanceId, String hostname, Integer serviceRoleSt List getServiceRoleInstanceListByServiceId(int id); + List getServiceRoleInstanceListByClusterId(int clusterId); + Result deleteServiceRole(List idList); List getServiceRoleInstanceListByClusterIdAndRoleName(Integer clusterId, @@ -77,4 +79,6 @@ List getStoppedRoleInstanceOnHost(Integer clus ClusterServiceRoleInstanceEntity getServiceRoleInsByHostAndName(String hostName, String serviceRoleName); List listRoleIns(String hostname, String serviceName); + + } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java index 670181b7..9db4be46 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java @@ -95,15 +95,14 @@ public class ClusterInfoServiceImpl extends ServiceImpl list = this .list(new QueryWrapper().eq(Constants.CLUSTER_CODE, clusterInfo.getClusterCode())); - if (Objects.nonNull(list) && list.size() >= 1) { + if (Objects.nonNull(list) && !list.isEmpty()) { return Result.error(Status.CLUSTER_CODE_EXISTS.getMsg()); } clusterInfo.setCreateTime(new Date()); @@ -170,11 +169,15 @@ public Result runningClusterList() { @Override public Result updateClusterState(Integer clusterId, Integer clusterState) { ClusterInfoEntity clusterInfo = this.getById(clusterId); - if (clusterState == 2) { - clusterInfo.setClusterState(ClusterState.RUNNING); + ClusterState state = ClusterState.of(clusterState); + if (state != null) { + clusterInfo.setClusterState(state); + this.updateById(clusterInfo); + return Result.success(); + } + else { + return Result.error("未知状态"); } - this.updateById(clusterInfo); - return Result.success(); } @Override @@ -187,7 +190,7 @@ public Result updateCluster(ClusterInfoEntity clusterInfo) { // 集群编码判重 List list = this .list(new QueryWrapper().eq(Constants.CLUSTER_CODE, clusterInfo.getClusterCode())); - if (Objects.nonNull(list) && list.size() >= 1) { + if (Objects.nonNull(list) && !list.isEmpty()) { ClusterInfoEntity clusterInfoEntity = list.get(0); if (!clusterInfoEntity.getId().equals(clusterInfo.getId())) { return Result.error(Status.CLUSTER_CODE_EXISTS.getMsg()); diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java index 5e22a497..d44ca77c 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java @@ -129,7 +129,7 @@ public Result listAll(Integer clusterId) { List totalRoleList = roleInstanceService.lambdaQuery() .eq(ClusterServiceRoleInstanceEntity::getServiceId, serviceInstance.getId()) .list(); - if (Objects.nonNull(totalRoleList) && totalRoleList.size() == 0) { + if (Objects.nonNull(totalRoleList) && totalRoleList.isEmpty()) { serviceInstance.setServiceState(ServiceState.WAIT_INSTALL); needUpdate = true; } @@ -139,7 +139,7 @@ public Result listAll(Integer clusterId) { .eq(ClusterServiceRoleInstanceEntity::getServiceId, serviceInstance.getId()) .eq(ClusterServiceRoleInstanceEntity::getServiceRoleState, ServiceRoleState.STOP) .list(); - if (Objects.nonNull(roleList) && roleList.size() > 0) { + if (Objects.nonNull(roleList) && !roleList.isEmpty()) { if (!ServiceState.EXISTS_EXCEPTION.equals(serviceInstance.getServiceState())) { serviceInstance.setServiceState(ServiceState.EXISTS_EXCEPTION); needUpdate = true; @@ -157,7 +157,7 @@ public Result listAll(Integer clusterId) { .eq(ClusterServiceRoleInstanceEntity::getServiceId, serviceInstance.getId()) .eq(ClusterServiceRoleInstanceEntity::getServiceRoleState, ServiceRoleState.EXISTS_ALARM) .list(); - if (Objects.nonNull(alarmRoleList) && alarmRoleList.size() > 0) { + if (Objects.nonNull(alarmRoleList) && !alarmRoleList.isEmpty()) { if (!ServiceState.EXISTS_ALARM.equals(serviceInstance.getServiceState()) && !ServiceState.EXISTS_EXCEPTION.equals(serviceInstance.getServiceState())) { serviceInstance.setServiceState(ServiceState.EXISTS_ALARM); @@ -173,7 +173,7 @@ public Result listAll(Integer clusterId) { // 查询是否进行了配置更新 List obsoleteRoleList = roleInstanceService.getObsoleteService(serviceInstance.getId()); - if (Objects.nonNull(obsoleteRoleList) && obsoleteRoleList.size() == 0 + if (Objects.nonNull(obsoleteRoleList) && obsoleteRoleList.isEmpty() && serviceInstance.getNeedRestart() == NeedRestart.YES) { serviceInstance.setNeedRestart(NeedRestart.NO); needUpdate = true; @@ -237,7 +237,7 @@ public Result delServiceInstance(Integer serviceInstanceId) { } List roleGroups = roleGroupService.listRoleGroupByServiceInstanceId(serviceInstanceId); - List roleGroupIds = roleGroups.stream().map(e -> e.getId()).collect(Collectors.toList()); + List roleGroupIds = roleGroups.stream().map(ClusterServiceInstanceRoleGroup::getId).collect(Collectors.toList()); List roleGroupConfigList = roleGroupConfigService.listRoleGroupConfigsByRoleGroupIds(roleGroupIds); List roleInstanceList = @@ -247,9 +247,9 @@ public Result delServiceInstance(Integer serviceInstanceId) { roleGroupService.removeByIds(roleGroupIds); // del role group config roleGroupConfigService - .removeByIds(roleGroupConfigList.stream().map(e -> e.getId()).collect(Collectors.toList())); + .removeByIds(roleGroupConfigList.stream().map(ClusterServiceRoleGroupConfig::getId).collect(Collectors.toList())); // del service role instance - if (roleInstanceList.size() > 0) { + if (!roleInstanceList.isEmpty()) { List roleInsIds = roleInstanceList.stream().map(e -> e.getId().toString()).collect(Collectors.toList()); roleInstanceService.deleteServiceRole(roleInsIds); @@ -272,9 +272,6 @@ public List listRunningServiceInstance(Integer clu private boolean hasRunningRoleInstance(Integer serviceInstanceId) { List list = roleInstanceService.getRunningServiceRoleInstanceListByServiceId(serviceInstanceId); - if (list.size() > 0) { - return true; - } - return false; + return !list.isEmpty(); } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java index 3d19486e..bd397796 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java @@ -147,7 +147,7 @@ public ClusterServiceRoleInstanceEntity getOneServiceRole(String name, String ho @Override public Result listAll(Integer serviceInstanceId, String hostname, Integer serviceRoleState, String serviceRoleName, Integer roleGroupId, Integer page, Integer pageSize) { - Integer offset = (page - 1) * pageSize; + int offset = (page - 1) * pageSize; LambdaQueryChainWrapper wrapper = this.lambdaQuery() .eq(ClusterServiceRoleInstanceEntity::getServiceId, serviceInstanceId) @@ -213,6 +213,11 @@ public List getServiceRoleInstanceListByServic return this.lambdaQuery().eq(ClusterServiceRoleInstanceEntity::getServiceId, id).list(); } + @Override + public List getServiceRoleInstanceListByClusterId(int clusterId) { + return this.lambdaQuery().eq(ClusterServiceRoleInstanceEntity::getClusterId, clusterId).list(); + } + @Override public Result deleteServiceRole(List idList) { Collection list = this.listByIds(idList); @@ -226,7 +231,7 @@ public Result deleteServiceRole(List idList) { needRemoveList.add(instance.getId()); } } - if (needRemoveList.size() > 0) { + if (!needRemoveList.isEmpty()) { alertHistoryService.removeAlertByRoleInstanceIds(needRemoveList); this.removeByIds(needRemoveList); // delete if there is a webui @@ -239,9 +244,8 @@ public Result deleteServiceRole(List idList) { @Override public List getServiceRoleInstanceListByClusterIdAndRoleName(Integer clusterId, String roleName) { - List list = this.list(new QueryWrapper() + return this.list(new QueryWrapper() .eq(Constants.CLUSTER_ID, clusterId).eq(Constants.SERVICE_ROLE_NAME, roleName)); - return list; } @Override @@ -258,7 +262,7 @@ public Result restartObsoleteService(Integer roleGroupId) { List list = this.list(new QueryWrapper() .eq(Constants.ROLE_GROUP_ID, roleGroupId) .eq(Constants.NEET_RESTART, NeedRestart.YES)); - if (Objects.nonNull(list) && list.size() > 0) { + if (Objects.nonNull(list) && !list.isEmpty()) { List ids = list.stream().map(e -> e.getId() + "").collect(Collectors.toList()); commandService.generateServiceRoleCommand(roleGroup.getClusterId(), CommandType.RESTART_SERVICE, roleGroup.getServiceInstanceId(), ids); @@ -300,7 +304,7 @@ public Result decommissionNode(String serviceRoleInstanceIds, String serviceName type = "nmexclude"; roleName = "ResourceManager"; } - if (hosts.size() > 0) { + if (!hosts.isEmpty()) { ProcessUtils.hdfsEcMethond(serviceInstanceId, this, hosts, "blacklist", roleName); } return Result.success(); diff --git a/datasophon-common/src/main/java/com/datasophon/common/command/ClusterCheckCommand.java b/datasophon-common/src/main/java/com/datasophon/common/command/ClusterCheckCommand.java new file mode 100644 index 00000000..ba19f6c0 --- /dev/null +++ b/datasophon-common/src/main/java/com/datasophon/common/command/ClusterCheckCommand.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datasophon.common.command; + +import java.io.Serializable; +import lombok.Data; + +@Data +public class ClusterCheckCommand implements Serializable { + + public ClusterCheckCommand() { + } +} diff --git a/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterInfoEntity.java b/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterInfoEntity.java index 9bcab86f..abea9e08 100644 --- a/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterInfoEntity.java +++ b/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterInfoEntity.java @@ -65,7 +65,7 @@ public class ClusterInfoEntity implements Serializable { */ private String frameVersion; /** - * 集群状态 1:待配置2:正在运行 + * 集群状态 1:待配置 2:正在运行 3: 已停止 */ private ClusterState clusterState; /** diff --git a/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterServiceRoleInstanceEntity.java b/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterServiceRoleInstanceEntity.java index 45b7f78c..e297c108 100644 --- a/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterServiceRoleInstanceEntity.java +++ b/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterServiceRoleInstanceEntity.java @@ -50,7 +50,7 @@ public class ClusterServiceRoleInstanceEntity implements Serializable { */ private String hostname; /** - * 服务角色状态 1:正在运行2:存在告警3:存在异常4:需要重启 + * 服务角色状态 1:正在运行 2:停止 3:存在告警 4:退役中 5:已退役 */ private ServiceRoleState serviceRoleState; diff --git a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java index 5580dacc..77d34d63 100644 --- a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java +++ b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java @@ -19,9 +19,13 @@ import com.baomidou.mybatisplus.annotation.EnumValue; import com.fasterxml.jackson.annotation.JsonValue; +import java.util.Arrays; +import java.util.Optional; public enum ClusterState { + STOP(3, "停止"), + RUNNING(2, "正在运行"), NEED_CONFIG(1, "待配置"); @@ -52,6 +56,10 @@ public void setDesc(String desc) { this.desc = desc; } + public static ClusterState of(int value) { + return Arrays.stream(values()).filter(state -> state.getValue() == value).findAny().orElse(null); + } + @Override public String toString() { return this.desc; diff --git a/datasophon-ui/src/pages/colonyManage/list.vue b/datasophon-ui/src/pages/colonyManage/list.vue index 0dbcbc29..067ec724 100644 --- a/datasophon-ui/src/pages/colonyManage/list.vue +++ b/datasophon-ui/src/pages/colonyManage/list.vue @@ -16,12 +16,12 @@
- +
{{ item.clusterName }}
- + {{item.clusterState}}
From 52b528be73bd239d04842ce7220fb1b3c910aefa Mon Sep 17 00:00:00 2001 From: ruanwe <88fantasy@gmail.com> Date: Wed, 9 Aug 2023 09:30:11 +0800 Subject: [PATCH 2/7] [fix][api] delete cluster MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增集群状态[删除中][已删除],当删除集群时会切换为[删除中],并发送命令将数据文件夹重命名,最后移除集群主机并置[已删除] --- .../com/datasophon/api/master/ActorUtils.java | 22 +- .../datasophon/api/master/ClusterActor.java | 191 ++++++++++++++++++ .../api/master/ClusterCheckActor.java | 77 ------- .../api/master/MasterServiceActor.java | 18 +- .../api/master/WorkerServiceActor.java | 2 +- .../api/service/ClusterHostService.java | 6 +- .../service/impl/ClusterHostServiceImpl.java | 12 +- .../service/impl/ClusterInfoServiceImpl.java | 41 ++-- .../ClusterServiceInstanceServiceImpl.java | 1 - .../datasophon/api/utils/ProcessUtils.java | 102 ++++++---- .../java/com/datasophon/common/Constants.java | 2 + ...rCheckCommand.java => ClusterCommand.java} | 20 +- .../common/enums/ClusterCommandType.java | 8 + .../dao/entity/ClusterInfoEntity.java | 2 +- .../datasophon/dao/enums/ClusterState.java | 5 + .../handler/ConfigureServiceHandler.java | 19 ++ 16 files changed, 353 insertions(+), 175 deletions(-) create mode 100644 datasophon-api/src/main/java/com/datasophon/api/master/ClusterActor.java delete mode 100644 datasophon-api/src/main/java/com/datasophon/api/master/ClusterCheckActor.java rename datasophon-common/src/main/java/com/datasophon/common/command/{ClusterCheckCommand.java => ClusterCommand.java} (66%) create mode 100644 datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java b/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java index 204fcade..5fedb26e 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java @@ -23,9 +23,10 @@ import akka.actor.Props; import akka.util.Timeout; import com.datasophon.api.master.alert.ServiceRoleCheckActor; -import com.datasophon.common.command.ClusterCheckCommand; +import com.datasophon.common.command.ClusterCommand; import com.datasophon.common.command.HostCheckCommand; import com.datasophon.common.command.ServiceRoleCheckCommand; +import com.datasophon.common.enums.ClusterCommandType; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.commons.lang.StringUtils; @@ -72,7 +73,7 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException getActorRefName(MasterNodeProcessingActor.class)); ActorRef clusterCheckActor = - actorSystem.actorOf(Props.create(ClusterCheckActor.class), getActorRefName(ClusterCheckActor.class)); + actorSystem.actorOf(Props.create(ClusterActor.class), getActorRefName(ClusterActor.class)); // 节点检测 5m 检测一次 actorSystem.scheduler().schedule( @@ -93,12 +94,12 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException // 集群检测 1m 检测一次 actorSystem.scheduler().schedule( - FiniteDuration.apply(30L, TimeUnit.SECONDS), - FiniteDuration.apply(60L, TimeUnit.SECONDS), - clusterCheckActor, - new ClusterCheckCommand(), - actorSystem.dispatcher(), - ActorRef.noSender()); + FiniteDuration.apply(30L, TimeUnit.SECONDS), + FiniteDuration.apply(60L, TimeUnit.SECONDS), + clusterCheckActor, + new ClusterCommand(ClusterCommandType.CHECK), + actorSystem.dispatcher(), + ActorRef.noSender()); rand = SecureRandom.getInstanceStrong(); @@ -156,10 +157,11 @@ public static ActorRef getRemoteActor(String hostname, String actorName) { * shutdown */ public static void shutdown() { - if(actorSystem != null) { + if (actorSystem != null) { try { actorSystem.shutdown(); - } catch (Exception ignore){} + } catch (Exception ignore) { + } actorSystem = null; } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ClusterActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/ClusterActor.java new file mode 100644 index 00000000..47609c3a --- /dev/null +++ b/datasophon-api/src/main/java/com/datasophon/api/master/ClusterActor.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datasophon.api.master; + +import akka.actor.UntypedActor; +import cn.hutool.core.date.DateUtil; +import cn.hutool.extra.spring.SpringUtil; +import com.datasophon.api.service.ClusterHostService; +import com.datasophon.api.service.ClusterInfoService; +import com.datasophon.api.service.ClusterServiceRoleGroupConfigService; +import com.datasophon.api.service.ClusterServiceRoleInstanceService; +import com.datasophon.api.service.ServiceInstallService; +import com.datasophon.api.utils.ProcessUtils; +import com.datasophon.common.Constants; +import com.datasophon.common.command.ClusterCommand; +import com.datasophon.common.enums.ClusterCommandType; +import com.datasophon.common.model.Generators; +import com.datasophon.common.model.ServiceConfig; +import com.datasophon.common.utils.ExecResult; +import com.datasophon.common.utils.Result; +import com.datasophon.dao.entity.ClusterHostEntity; +import com.datasophon.dao.entity.ClusterInfoEntity; +import com.datasophon.dao.entity.ClusterServiceRoleGroupConfig; +import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; +import com.datasophon.dao.enums.ClusterState; +import com.datasophon.dao.enums.ServiceRoleState; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * 节点状态监测 + */ +public class ClusterActor extends UntypedActor { + + private static final Logger logger = LoggerFactory.getLogger(ClusterActor.class); + + private static final String DEPRECATED = "Deprecated"; + + @Override + public void onReceive(Object msg) throws Throwable { + if (msg instanceof ClusterCommand) { + logger.info("start to check cluster info"); + ClusterServiceRoleInstanceService roleInstanceService = + SpringUtil.getBean(ClusterServiceRoleInstanceService.class); + ClusterInfoService clusterInfoService = + SpringUtil.getBean(ClusterInfoService.class); + + // Host or cluster + final ClusterCommand clusterCommand = (ClusterCommand) msg; + + if (ClusterCommandType.CHECK.equals(clusterCommand.getCommandType())) { + // 获取所有集群 + Result result = clusterInfoService.getClusterList(); + List clusterList = (List) result.getData(); + + for (ClusterInfoEntity clusterInfoEntity : clusterList) { + // 获取集群上正在运行的服务 + int clusterId = clusterInfoEntity.getId(); + List roleInstanceList = roleInstanceService.getServiceRoleInstanceListByClusterId(clusterId); + if (!ClusterState.NEED_CONFIG.equals(clusterInfoEntity.getClusterState())) { + if (!roleInstanceList.isEmpty()) { + if (roleInstanceList.stream().allMatch(roleInstance -> ServiceRoleState.STOP.equals(roleInstance.getServiceRoleState()))) { + clusterInfoService.updateClusterState(clusterId, ClusterState.STOP.getValue()); + } else { + clusterInfoService.updateClusterState(clusterId, ClusterState.RUNNING.getValue()); + } + } + + } + } + + } else if (ClusterCommandType.UNINSTALL.equals(clusterCommand.getCommandType())) { + Integer clusterId = clusterCommand.getClusterId(); + if (Objects.nonNull(clusterId)) { + ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId); + if (Objects.nonNull(clusterInfo)) { + ClusterHostService clusterHostService = SpringUtil.getBean(ClusterHostService.class); + ClusterServiceRoleInstanceService clusterServiceRoleInstanceService = SpringUtil.getBean(ClusterServiceRoleInstanceService.class); + ClusterServiceRoleGroupConfigService clusterServiceRoleGroupConfigService = SpringUtil.getBean(ClusterServiceRoleGroupConfigService.class); + ServiceInstallService serviceInstallService = SpringUtil.getBean(ServiceInstallService.class); + + // 检查服务实例配置与目录 + List roleInstanceList = clusterServiceRoleInstanceService.getServiceRoleInstanceListByClusterId(clusterId); + for (ClusterServiceRoleInstanceEntity roleInstance : roleInstanceList) { + String roleName = roleInstance.getServiceRoleName(); + String hostname = roleInstance.getHostname(); + ClusterServiceRoleGroupConfig config = clusterServiceRoleGroupConfigService.getConfigByRoleGroupId(roleInstance.getRoleGroupId()); + List oldConfigs = ProcessUtils.getServiceConfig(config); + Map> configFileMap = new ConcurrentHashMap<>(); + ProcessUtils.generateConfigFileMap(configFileMap, config); + Predicate filter = c -> Constants.PATH.equals(c.getConfigType()) && !((String) c.getValue()).contains(DEPRECATED); + Consumer peeker = c -> { + String oldPath = (String) c.getValue(); + String newPath = String.format("%s_%s_%s_%s", oldPath, DEPRECATED, clusterId, DateUtil.today()); + c.setDefaultValue(oldPath); + c.setValue(newPath); + }; + + for (Map.Entry> configFile : configFileMap.entrySet()) { + List serviceConfigs = configFile.getValue().stream() + .filter(filter) + .peek(c -> { + peeker.accept(c); + c.setConfigType(Constants.MV_PATH); + }) + .collect(Collectors.toList()); + if (!serviceConfigs.isEmpty()) { + configFileMap.replace(configFile.getKey(), serviceConfigs); + } else { + configFileMap.remove(configFile.getKey()); + } + } + + if (!configFileMap.isEmpty()) { + // 分发重命名命令 + ExecResult execResult = new ExecResult(); + try { + logger.info( + "start to uninstall {} in host {}", + roleName, + hostname); + execResult = ProcessUtils.configServiceRoleInstance(clusterInfo, configFileMap, roleInstance); + if (Objects.nonNull(execResult) && execResult.getExecResult()) { + logger.info( + "{} uninstall success in {}", + roleName, + hostname); + //保存配置 + serviceInstallService.saveServiceConfig(clusterId, + roleInstance.getServiceName(), + oldConfigs.stream().peek(c -> { + if (filter.test(c)) { + peeker.accept(c); + } + }).collect(Collectors.toList()), + roleInstance.getRoleGroupId()); + + } else { + logger.info( + "{} uninstall failed in {}", + roleName, + hostname); + return; + } + + } catch (Exception e) { + logger.info( + "{} uninstall failed in {}", + roleName, + hostname); + logger.error(ProcessUtils.getExceptionMessage(e)); + return; + } + } + } + List hostList = clusterHostService.getHostListByClusterId(clusterId); + clusterHostService.deleteHosts(hostList.stream().map(h -> String.valueOf(h.getId())).collect(Collectors.joining(Constants.COMMA))); + clusterInfoService.updateClusterState(clusterId, ClusterState.UNINSTALLED.getValue()); + } + } + } + } else { + unhandled(msg); + } + } +} diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ClusterCheckActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/ClusterCheckActor.java deleted file mode 100644 index 25edab97..00000000 --- a/datasophon-api/src/main/java/com/datasophon/api/master/ClusterCheckActor.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.datasophon.api.master; - -import akka.actor.UntypedActor; -import cn.hutool.extra.spring.SpringUtil; -import com.datasophon.api.service.ClusterInfoService; -import com.datasophon.api.service.ClusterServiceRoleInstanceService; -import com.datasophon.common.command.ClusterCheckCommand; -import com.datasophon.common.utils.Result; -import com.datasophon.dao.entity.ClusterInfoEntity; -import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; -import com.datasophon.dao.enums.ClusterState; -import com.datasophon.dao.enums.ServiceRoleState; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * 节点状态监测 - */ -public class ClusterCheckActor extends UntypedActor { - - private static final Logger logger = LoggerFactory.getLogger(ClusterCheckActor.class); - - @Override - public void onReceive(Object msg) throws Throwable { - if (msg instanceof ClusterCheckCommand) { - logger.info("start to check cluster info"); - ClusterServiceRoleInstanceService roleInstanceService = - SpringUtil.getBean(ClusterServiceRoleInstanceService.class); - ClusterInfoService clusterInfoService = - SpringUtil.getBean(ClusterInfoService.class); - - // Host or cluster - final ClusterCheckCommand hostCheckCommand = (ClusterCheckCommand) msg; - - // 获取所有集群 - Result result = clusterInfoService.getClusterList(); - List clusterList = (List) result.getData(); - - for (ClusterInfoEntity clusterInfoEntity : clusterList) { - // 获取集群上正在运行的服务 - int clusterId = clusterInfoEntity.getId(); - List roleInstanceList = roleInstanceService.getServiceRoleInstanceListByClusterId(clusterId); - if (!ClusterState.NEED_CONFIG.equals(clusterInfoEntity.getClusterState())) { - if (!roleInstanceList.isEmpty()) { - if (roleInstanceList.stream().allMatch(roleInstance -> ServiceRoleState.STOP.equals(roleInstance.getServiceRoleState()))) { - clusterInfoService.updateClusterState(clusterId, ClusterState.STOP.getValue()); - } else { - clusterInfoService.updateClusterState(clusterId, ClusterState.RUNNING.getValue()); - } - } - - } - } - } else { - unhandled(msg); - } - } -} diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/MasterServiceActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/MasterServiceActor.java index 8ca53407..2441fbf9 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/MasterServiceActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/MasterServiceActor.java @@ -102,12 +102,12 @@ public void onReceive(Object message) { (Integer) CacheUtils.get("UseRoleGroup_" + serviceInstanceId); ClusterServiceRoleGroupConfig config = roleGroupConfigService.getConfigByRoleGroupId(roleGroupId); - buildConfigFileMap(configFileMap, config); + ProcessUtils.generateConfigFileMap(configFileMap, config); } else if (serviceRoleInstance.getNeedRestart() == NeedRestart.YES) { ClusterServiceRoleGroupConfig config = roleGroupConfigService.getConfigByRoleGroupId( serviceRoleInstance.getRoleGroupId()); - buildConfigFileMap(configFileMap, config); + ProcessUtils.generateConfigFileMap(configFileMap, config); needReConfig = true; } logger.info("enable ranger plugin is {}", enableRangerPlugin); @@ -117,7 +117,7 @@ public void onReceive(Object message) { case INSTALL_SERVICE: try { logger.info( - "start to install {} int host {}", + "start to install {} in host {}", serviceRoleInfo.getName(), serviceRoleInfo.getHostname()); @@ -312,16 +312,4 @@ private boolean isEnableRangerPlugin(Integer clusterId, String serviceName) { return false; } - // generate configFileMap - private void buildConfigFileMap( - HashMap> configFileMap, - ClusterServiceRoleGroupConfig config) { - Map map = - JSONObject.parseObject(config.getConfigFileJson(), Map.class); - for (JSONObject fileJson : map.keySet()) { - Generators generators = fileJson.toJavaObject(Generators.class); - List serviceConfigs = map.get(fileJson).toJavaList(ServiceConfig.class); - configFileMap.put(generators, serviceConfigs); - } - } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/WorkerServiceActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/WorkerServiceActor.java index 0beb6515..997ba37d 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/WorkerServiceActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/WorkerServiceActor.java @@ -62,7 +62,7 @@ public void onReceive(Object message) throws Throwable { serviceRoleInfo.getName(), serviceRoleInfo.getHostname(), serviceRoleInfo.getClusterId()); - HashMap> configFileMap = new HashMap<>(); + Map> configFileMap = new HashMap<>(); boolean needReConfig = false; if (executeServiceRoleCommand.getCommandType() == CommandType.INSTALL_SERVICE) { Integer roleGroupId = (Integer) CacheUtils.get("UseRoleGroup_" + serviceInstanceId); diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterHostService.java b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterHostService.java index 8ca2ea31..76a5016d 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterHostService.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterHostService.java @@ -24,7 +24,7 @@ import java.util.List; /** - * 集群主机表 + * 集群主机表 * * @author gaodayu * @email gaodayu2022@163.com @@ -42,12 +42,12 @@ Result listByPage(Integer clusterId, String hostname, String ip, String cpuArchi Result getRoleListByHostname(Integer clusterId, String hostname); - /** * 批量删除主机。 * 删除主机,首先停止主机上的服务 * 其次删除主机 worker,同时移除 Prometheus hosts * 然后删除主机运行的实例 + * * @param hostIds * @return */ @@ -55,7 +55,7 @@ Result listByPage(Integer clusterId, String hostname, String ip, String cpuArchi Result getRack(Integer clusterId); - void deleteHostByClusterId(Integer id); + void removeHostByClusterId(Integer id); void updateBatchNodeLabel(List hostIds, String nodeLabel); diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterHostServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterHostServiceImpl.java index eba51d3e..349060a0 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterHostServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterHostServiceImpl.java @@ -137,7 +137,7 @@ public Result getRoleListByHostname(Integer clusterId, String hostname) { @Transactional public Result deleteHosts(String hostIds) { // 批量移除 - List ids = Arrays.asList(hostIds.split(",")); + String[] ids = hostIds.split(Constants.COMMA); for (String hostId : ids) { ClusterHostEntity host = this.getById(hostId); // 获取主机上安装的服务 @@ -147,9 +147,9 @@ public Result deleteHosts(String hostIds) { .eq(Constants.HOSTNAME, host.getHostname()) .eq(Constants.SERVICE_ROLE_STATE, ServiceRoleState.RUNNING) .ne(Constants.ROLE_TYPE, RoleType.CLIENT)); - List roles = list.stream().map(e -> e.getServiceRoleName()).collect(Collectors.toList()); - if (Objects.nonNull(list) && list.size() > 0) { - return Result.error(host.getHostname() + Status.HOST_EXIT_ONE_RUNNING_ROLE.getMsg() + roles.toString()); + List roles = list.stream().map(ClusterServiceRoleInstanceEntity::getServiceRoleName).collect(Collectors.toList()); + if (!list.isEmpty()) { + return Result.error(host.getHostname() + Status.HOST_EXIT_ONE_RUNNING_ROLE.getMsg() + roles); } ClusterInfoEntity clusterInfo = clusterInfoService.getById(host.getClusterId()); String clusterCode = clusterInfo.getClusterCode(); @@ -160,7 +160,7 @@ public Result deleteHosts(String hostIds) { this.removeById(hostId); - if(host.getHostState().intValue() != 2){ + if (host.getHostState() != 2) { //stop the worker on this host ActorRef execCmdActor = ActorUtils.getRemoteActor(host.getHostname(), "executeCmdActor"); ExecuteCmdCommand command = new ExecuteCmdCommand(); @@ -213,7 +213,7 @@ public Result getRack(Integer clusterId) { } @Override - public void deleteHostByClusterId(Integer clusterId) { + public void removeHostByClusterId(Integer clusterId) { this.remove(new QueryWrapper().eq(Constants.CLUSTER_ID, clusterId)); } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java index 9db4be46..05441e94 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java @@ -17,34 +17,27 @@ package com.datasophon.api.service.impl; +import akka.actor.ActorRef; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.datasophon.api.configuration.ConfigBean; import com.datasophon.api.enums.Status; import com.datasophon.api.load.GlobalVariables; -import com.datasophon.api.service.AlertGroupService; -import com.datasophon.api.service.ClusterAlertGroupMapService; -import com.datasophon.api.service.ClusterHostService; -import com.datasophon.api.service.ClusterInfoService; -import com.datasophon.api.service.ClusterNodeLabelService; -import com.datasophon.api.service.ClusterQueueCapacityService; -import com.datasophon.api.service.ClusterRackService; -import com.datasophon.api.service.ClusterRoleUserService; -import com.datasophon.api.service.ClusterYarnSchedulerService; -import com.datasophon.api.service.FrameServiceService; +import com.datasophon.api.master.ActorUtils; +import com.datasophon.api.master.ClusterActor; +import com.datasophon.api.service.*; import com.datasophon.api.utils.PackageUtils; import com.datasophon.api.utils.ProcessUtils; import com.datasophon.api.utils.SecurityUtils; import com.datasophon.common.Constants; import com.datasophon.common.cache.CacheUtils; +import com.datasophon.common.command.ClusterCommand; +import com.datasophon.common.enums.ClusterCommandType; import com.datasophon.common.utils.Result; -import com.datasophon.dao.entity.AlertGroupEntity; -import com.datasophon.dao.entity.ClusterAlertGroupMap; -import com.datasophon.dao.entity.ClusterInfoEntity; -import com.datasophon.dao.entity.FrameServiceEntity; -import com.datasophon.dao.entity.UserInfoEntity; +import com.datasophon.dao.entity.*; import com.datasophon.dao.enums.ClusterState; import com.datasophon.dao.mapper.ClusterInfoMapper; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -54,12 +47,14 @@ import java.util.List; import java.util.Objects; +@Slf4j @Service("clusterInfoService") @Transactional public class ClusterInfoServiceImpl extends ServiceImpl implements ClusterInfoService { + @Autowired private ClusterInfoMapper clusterInfoMapper; @@ -95,7 +90,7 @@ public class ClusterInfoServiceImpl extends ServiceImpl ids) { Integer id = ids.get(0); ClusterInfoEntity clusterInfo = this.getById(id); - this.removeByIds(ids); + + ActorUtils.getLocalActor( + ClusterActor.class, "clusterActor") + .tell(new ClusterCommand(ClusterCommandType.UNINSTALL, id), ActorRef.noSender()); + + this.updateClusterState(id, ClusterState.UNINSTALLING.getValue()); + // delete host - clusterHostService.deleteHostByClusterId(id); +// clusterHostService.removeHostByClusterId(id); } + } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java index d44ca77c..f4a45325 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java @@ -110,7 +110,6 @@ public Result listAll(Integer clusterId) { Map globalVariables = GlobalVariables.get(clusterId); List list = this.list(new QueryWrapper() .eq(Constants.CLUSTER_ID, clusterId).orderByAsc(Constants.SORT_NUM)); - ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId); for (ClusterServiceInstanceEntity serviceInstance : list) { serviceInstance.setServiceStateCode(serviceInstance.getServiceState().getValue()); boolean needUpdate = false; diff --git a/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java b/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java index 1e0a6ac8..bbf8db8a 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java +++ b/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java @@ -17,6 +17,19 @@ package com.datasophon.api.utils; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Props; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.IdUtil; +import cn.hutool.crypto.SecureUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.datasophon.api.load.GlobalVariables; import com.datasophon.api.load.ServiceConfigMap; import com.datasophon.api.master.ActorUtils; @@ -40,9 +53,9 @@ import com.datasophon.common.utils.PropertyUtils; import com.datasophon.dao.entity.*; import com.datasophon.dao.enums.*; - import org.apache.commons.lang.StringUtils; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -54,24 +67,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; - -import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.Props; -import akka.pattern.Patterns; -import akka.util.Timeout; - -import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.util.IdUtil; -import cn.hutool.crypto.SecureUtil; - public class ProcessUtils { private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class); @@ -282,7 +277,7 @@ public static ClusterServiceCommandHostCommandEntity handleCommandResult(String ActorRef commandActor = ActorUtils.getLocalActor(ServiceCommandActor.class, "commandActor"); ActorUtils.actorSystem.scheduler().scheduleOnce(FiniteDuration.apply( - 1L, TimeUnit.SECONDS), + 1L, TimeUnit.SECONDS), commandActor, message, ActorUtils.actorSystem.dispatcher(), ActorRef.noSender()); @@ -291,19 +286,19 @@ public static ClusterServiceCommandHostCommandEntity handleCommandResult(String } public static void buildExecuteServiceRoleCommand( - Integer clusterId, - CommandType commandType, - String clusterCode, - DAGGraph dag, - Map activeTaskList, - Map errorTaskList, - Map readyToSubmitTaskList, - Map completeTaskList, - String node, - List masterRoles, - ServiceRoleInfo workerRole, - ActorRef serviceActor, - ServiceRoleType serviceRoleType) { + Integer clusterId, + CommandType commandType, + String clusterCode, + DAGGraph dag, + Map activeTaskList, + Map errorTaskList, + Map readyToSubmitTaskList, + Map completeTaskList, + String node, + List masterRoles, + ServiceRoleInfo workerRole, + ActorRef serviceActor, + ServiceRoleType serviceRoleType) { ExecuteServiceRoleCommand executeServiceRoleCommand = new ExecuteServiceRoleCommand(clusterId, node, masterRoles); executeServiceRoleCommand.setServiceRoleType(serviceRoleType); @@ -452,7 +447,7 @@ public static void createServiceActor(ClusterInfoEntity clusterInfo) { logger.info("create {} actor", clusterInfo.getClusterCode() + "-serviceActor-" + frameServiceEntity.getServiceName()); ActorUtils.actorSystem.actorOf(Props.create(MasterServiceActor.class) - .withDispatcher("my-forkjoin-dispatcher"), + .withDispatcher("my-forkjoin-dispatcher"), clusterInfo.getClusterCode() + "-serviceActor-" + frameServiceEntity.getServiceName()); } } @@ -507,12 +502,39 @@ public static ExecResult startInstallService(ServiceRoleInfo serviceRoleInfo) th return execResult; } + public static ExecResult configServiceRoleInstance(ClusterInfoEntity clusterInfo, + Map> configFileMap, + ClusterServiceRoleInstanceEntity roleInstanceEntity) throws Exception { + ServiceRoleInfo serviceRoleInfo = new ServiceRoleInfo(); + serviceRoleInfo.setName(roleInstanceEntity.getServiceRoleName()); + serviceRoleInfo.setParentName(roleInstanceEntity.getServiceName()); + serviceRoleInfo.setConfigFileMap(configFileMap); + serviceRoleInfo.setDecompressPackageName(PackageUtils.getServiceDcPackageName(clusterInfo.getClusterFrame(), "YARN")); + serviceRoleInfo.setHostname(roleInstanceEntity.getHostname()); + ServiceConfigureHandler configureHandler = new ServiceConfigureHandler(); + return configureHandler.handlerRequest(serviceRoleInfo); + } + + public static void asyncConfigServiceRoleInstance(ClusterInfoEntity clusterInfo, + Map> configFileMap, + ClusterServiceRoleInstanceEntity roleInstanceEntity, + OnComplete onComplete) { + ServiceRoleInfo serviceRoleInfo = new ServiceRoleInfo(); + serviceRoleInfo.setName(roleInstanceEntity.getServiceRoleName()); + serviceRoleInfo.setParentName(roleInstanceEntity.getServiceName()); + serviceRoleInfo.setConfigFileMap(configFileMap); + serviceRoleInfo.setDecompressPackageName(PackageUtils.getServiceDcPackageName(clusterInfo.getClusterFrame(), "YARN")); + serviceRoleInfo.setHostname(roleInstanceEntity.getHostname()); + ServiceConfigureAsyncHandler configureAsyncHandler = new ServiceConfigureAsyncHandler(onComplete); + configureAsyncHandler.handlerRequest(serviceRoleInfo); + } + /** * @param configFileMap * @param config * @Description: 生成configFileMap */ - public static void generateConfigFileMap(HashMap> configFileMap, + public static void generateConfigFileMap(Map> configFileMap, ClusterServiceRoleGroupConfig config) { Map map = JSONObject.parseObject(config.getConfigFileJson(), Map.class); for (JSONObject fileJson : map.keySet()) { @@ -522,6 +544,10 @@ public static void generateConfigFileMap(HashMap } } + public static List getServiceConfig(ClusterServiceRoleGroupConfig config) { + return JSONObject.parseArray(config.getConfigJson(), ServiceConfig.class); + } + public static ServiceConfig createServiceConfig(String configName, Object configValue, String type) { ServiceConfig serviceConfig = new ServiceConfig(); serviceConfig.setName(configName); @@ -535,8 +561,7 @@ public static ServiceConfig createServiceConfig(String configName, Object config public static ClusterInfoEntity getClusterInfo(Integer clusterId) { ClusterInfoService clusterInfoService = SpringTool.getApplicationContext().getBean(ClusterInfoService.class); - ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId); - return clusterInfo; + return clusterInfoService.getById(clusterId); } /** @@ -678,4 +703,5 @@ public static void saveAlert(ClusterServiceRoleInstanceEntity roleInstanceEntity roleInstanceService.updateById(roleInstanceEntity); } + } diff --git a/datasophon-common/src/main/java/com/datasophon/common/Constants.java b/datasophon-common/src/main/java/com/datasophon/common/Constants.java index c2fe8090..6de4c347 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/Constants.java +++ b/datasophon-common/src/main/java/com/datasophon/common/Constants.java @@ -88,6 +88,8 @@ public final class Constants { public static final String MULTIPLE = "multiple"; public static final String CLUSTER_STATE = "cluster_state"; public static final String PATH = "path"; + + public static final String MV_PATH = "mv_path"; public static final String SERVICE_INSTANCE_ID = "service_instance_id"; public static final String IS_ENABLED = "is_enabled"; public static final String SORT_NUM = "sort_num"; diff --git a/datasophon-common/src/main/java/com/datasophon/common/command/ClusterCheckCommand.java b/datasophon-common/src/main/java/com/datasophon/common/command/ClusterCommand.java similarity index 66% rename from datasophon-common/src/main/java/com/datasophon/common/command/ClusterCheckCommand.java rename to datasophon-common/src/main/java/com/datasophon/common/command/ClusterCommand.java index ba19f6c0..b8b47627 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/command/ClusterCheckCommand.java +++ b/datasophon-common/src/main/java/com/datasophon/common/command/ClusterCommand.java @@ -17,12 +17,26 @@ package com.datasophon.common.command; +import com.datasophon.common.enums.ClusterCommandType; + import java.io.Serializable; + import lombok.Data; @Data -public class ClusterCheckCommand implements Serializable { +public class ClusterCommand implements Serializable { + + private final ClusterCommandType commandType; + + private Integer clusterId; + + public ClusterCommand(ClusterCommandType commandType) { + this.commandType = commandType; + } + + public ClusterCommand(ClusterCommandType commandType, Integer clusterId) { + this.commandType = commandType; + this.clusterId = clusterId; + } - public ClusterCheckCommand() { - } } diff --git a/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java b/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java new file mode 100644 index 00000000..aaf1cd92 --- /dev/null +++ b/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java @@ -0,0 +1,8 @@ +package com.datasophon.common.enums; + +public enum ClusterCommandType { + + CHECK, + + UNINSTALL; +} diff --git a/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterInfoEntity.java b/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterInfoEntity.java index abea9e08..69794d40 100644 --- a/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterInfoEntity.java +++ b/datasophon-dao/src/main/java/com/datasophon/dao/entity/ClusterInfoEntity.java @@ -65,7 +65,7 @@ public class ClusterInfoEntity implements Serializable { */ private String frameVersion; /** - * 集群状态 1:待配置 2:正在运行 3: 已停止 + * 集群状态 1:待配置 2:正在运行 3: 停止 4: 删除中 5: 已删除 */ private ClusterState clusterState; /** diff --git a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java index 77d34d63..58032823 100644 --- a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java +++ b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java @@ -19,11 +19,16 @@ import com.baomidou.mybatisplus.annotation.EnumValue; import com.fasterxml.jackson.annotation.JsonValue; + import java.util.Arrays; import java.util.Optional; public enum ClusterState { + UNINSTALLED(5, "已删除"), + + UNINSTALLING(4, "删除中"), + STOP(3, "停止"), RUNNING(2, "正在运行"), diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java b/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java index 064da032..bbaadf61 100644 --- a/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java +++ b/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java @@ -104,6 +104,9 @@ public ExecResult configure(Map> cofigFileMap, if (Constants.PATH.equals(config.getConfigType())) { createPath(config, runAs); } + if (Constants.MV_PATH.equals(config.getConfigType())) { + movePath(config, runAs); + } if (Constants.CUSTOM.equals(config.getConfigType())) { addToCustomList(iterator, customConfList, config); } @@ -208,6 +211,22 @@ private void createPath(ServiceConfig config, RunAs runAs) { } } + private void movePath(ServiceConfig config, RunAs runAs) { + String oldPath = (String) config.getDefaultValue(); + String newPath = (String) config.getValue(); + if(FileUtil.exist(oldPath) && !FileUtil.exist(newPath)) { + if (newPath.contains(Constants.COMMA)) { + for (String dir : newPath.split(Constants.COMMA)) { + mkdir(dir, runAs); + } + } else { + mkdir(newPath, runAs); + } + FileUtil.move(new File(oldPath), new File(newPath), false); + logger.info("move path {} to {}", oldPath, newPath); + } + } + private void addToCustomList(Iterator iterator, ArrayList customConfList, ServiceConfig config) { List list = (List) config.getValue(); From 83f3cf8ea991ede418c417815d3e8fe434b7b1db Mon Sep 17 00:00:00 2001 From: ruanwe <88fantasy@gmail.com> Date: Wed, 9 Aug 2023 09:35:57 +0800 Subject: [PATCH 3/7] [fix][api] delete cluster MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增集群状态[删除中][已删除],当删除集群时会切换为[删除中],并发送命令将数据文件夹重命名,最后移除集群主机并置[已删除] --- .../com/datasophon/api/master/ActorUtils.java | 2 +- .../datasophon/api/master/HostCheckActor.java | 5 +- .../api/master/MasterServiceActor.java | 22 ++---- .../service/ServiceConfigureAsyncHandler.java | 76 +++++++++++++++++++ .../service/ServiceConfigureHandler.java | 2 +- .../service/impl/ClusterHostServiceImpl.java | 2 +- .../ClusterServiceInstanceServiceImpl.java | 19 +---- ...ClusterServiceRoleInstanceServiceImpl.java | 4 +- .../datasophon/api/utils/ProcessUtils.java | 6 +- .../common/enums/ClusterCommandType.java | 2 +- .../datasophon/dao/enums/ClusterState.java | 1 - .../handler/ConfigureServiceHandler.java | 2 +- 12 files changed, 94 insertions(+), 49 deletions(-) create mode 100644 datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java b/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java index 5fedb26e..0277f24b 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/ActorUtils.java @@ -147,7 +147,7 @@ public static ActorRef getRemoteActor(String hostname, String actorName) { try { actorRef = Await.result(future, Duration.create(30, TimeUnit.SECONDS)); } catch (Exception e) { - e.printStackTrace(); + logger.error(e.getMessage(), e); } return actorRef; diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java index 05246286..9ab902a1 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/HostCheckActor.java @@ -25,7 +25,6 @@ import com.datasophon.api.service.ClusterHostService; import com.datasophon.api.service.ClusterInfoService; import com.datasophon.api.service.ClusterServiceRoleInstanceService; -import com.datasophon.api.utils.SpringTool; import com.datasophon.common.command.HostCheckCommand; import com.datasophon.common.command.PingCommand; import com.datasophon.common.model.HostInfo; @@ -136,7 +135,7 @@ public void onReceive(Object msg) throws Throwable { logger.warn("check cluster state error, cause: {}", e.getMessage()); } } - if (list.size() > 0) { + if (!list.isEmpty()) { clusterHostService.updateBatchById(list); } } else { @@ -174,7 +173,7 @@ public void onReceive(Object msg) throws Throwable { } checkedHosts.add(checkedHost); } - if (checkedHosts.size() > 0) { + if (!checkedHosts.isEmpty()) { clusterHostService.updateBatchById(checkedHosts); } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/MasterServiceActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/MasterServiceActor.java index 2441fbf9..4e18f4b2 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/MasterServiceActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/MasterServiceActor.java @@ -19,6 +19,7 @@ package com.datasophon.api.master; +import akka.actor.UntypedActor; import com.datasophon.api.load.GlobalVariables; import com.datasophon.api.master.handler.service.ServiceHandler; import com.datasophon.api.master.handler.service.ServiceStopHandler; @@ -39,20 +40,10 @@ import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; import com.datasophon.dao.enums.NeedRestart; import com.datasophon.dao.enums.ServiceRoleState; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; - -import akka.actor.UntypedActor; +import java.util.*; public class MasterServiceActor extends UntypedActor { @@ -79,7 +70,7 @@ public void onReceive(Object message) { List serviceRoleInfoList = executeServiceRoleCommand.getMasterRoles(); Collections.sort(serviceRoleInfoList); - Integer successNum = 0; + int successNum = 0; for (ServiceRoleInfo serviceRoleInfo : serviceRoleInfoList) { logger.info( "{} service role size is {}", @@ -305,11 +296,8 @@ public void onReceive(Object message) { private boolean isEnableRangerPlugin(Integer clusterId, String serviceName) { Map globalVariables = GlobalVariables.get(clusterId); - if (globalVariables.containsKey("${enable" + serviceName + "Plugin}") - && "true".equals(globalVariables.get("${enable" + serviceName + "Plugin}"))) { - return true; - } - return false; + return globalVariables.containsKey("${enable" + serviceName + "Plugin}") + && "true".equals(globalVariables.get("${enable" + serviceName + "Plugin}")); } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java b/datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java new file mode 100644 index 00000000..25e7e297 --- /dev/null +++ b/datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datasophon.api.master.handler.service; + +import akka.actor.ActorSelection; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.datasophon.api.master.ActorUtils; +import com.datasophon.common.cache.CacheUtils; +import com.datasophon.common.command.GenerateServiceConfigCommand; +import com.datasophon.common.model.ServiceRoleInfo; +import com.datasophon.common.utils.ExecResult; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +public class ServiceConfigureAsyncHandler extends ServiceHandler { + + private final OnComplete function; + + public ServiceConfigureAsyncHandler(OnComplete function) { + this.function = function; + } + + @Override + public ExecResult handlerRequest(ServiceRoleInfo serviceRoleInfo) { + ExecResult execResult = new ExecResult(); + execResult.setExecResult(true); + // config + GenerateServiceConfigCommand generateServiceConfigCommand = new GenerateServiceConfigCommand(); + generateServiceConfigCommand.setServiceName(serviceRoleInfo.getParentName()); + generateServiceConfigCommand.setCofigFileMap(serviceRoleInfo.getConfigFileMap()); + generateServiceConfigCommand.setDecompressPackageName(serviceRoleInfo.getDecompressPackageName()); + generateServiceConfigCommand.setRunAs(serviceRoleInfo.getRunAs()); + if ("zkserver".equalsIgnoreCase(serviceRoleInfo.getName())) { + generateServiceConfigCommand.setMyid((Integer) CacheUtils.get("zkserver_" + serviceRoleInfo.getHostname())); + } + generateServiceConfigCommand.setServiceRoleName(serviceRoleInfo.getName()); + ActorSelection configActor = ActorUtils.actorSystem.actorSelection( + "akka.tcp://datasophon@" + serviceRoleInfo.getHostname() + ":2552/user/worker/configureServiceActor"); + + Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS)); + final Future configureFuture = Patterns.ask(configActor, generateServiceConfigCommand, timeout); + configureFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object success) throws Throwable { + if (failure != null) { + function.onComplete(failure, success); + } else { + ExecResult configResult = (ExecResult) success; + if (Objects.nonNull(configResult) && configResult.getExecResult() && Objects.nonNull(getNext())) { + getNext().handlerRequest(serviceRoleInfo); + } + } + } + }, ActorUtils.actorSystem.dispatcher()); + return execResult; + } +} diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java b/datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java index 8e476b42..820750e3 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureHandler.java @@ -44,7 +44,7 @@ public ExecResult handlerRequest(ServiceRoleInfo serviceRoleInfo) throws Excepti generateServiceConfigCommand.setCofigFileMap(serviceRoleInfo.getConfigFileMap()); generateServiceConfigCommand.setDecompressPackageName(serviceRoleInfo.getDecompressPackageName()); generateServiceConfigCommand.setRunAs(serviceRoleInfo.getRunAs()); - if ("zkserver".equals(serviceRoleInfo.getName().toLowerCase())) { + if ("zkserver".equalsIgnoreCase(serviceRoleInfo.getName())) { generateServiceConfigCommand.setMyid((Integer) CacheUtils.get("zkserver_" + serviceRoleInfo.getHostname())); } generateServiceConfigCommand.setServiceRoleName(serviceRoleInfo.getName()); diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterHostServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterHostServiceImpl.java index 349060a0..534671f9 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterHostServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterHostServiceImpl.java @@ -81,7 +81,7 @@ public ClusterHostEntity getClusterHostByHostname(String hostname) { @Override public Result listByPage(Integer clusterId, String hostname, String ip, String cpuArchitecture, Integer hostState, String orderField, String orderType, Integer page, Integer pageSize) { - Integer offset = (page - 1) * pageSize; + int offset = (page - 1) * pageSize; List list = this.list(new QueryWrapper().eq(Constants.CLUSTER_ID, clusterId) .eq(Constants.MANAGED, 1) diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java index f4a45325..b34ec72f 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java @@ -22,27 +22,12 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.datasophon.api.enums.Status; import com.datasophon.api.load.GlobalVariables; -import com.datasophon.api.service.ClusterAlertHistoryService; -import com.datasophon.api.service.ClusterInfoService; -import com.datasophon.api.service.ClusterServiceDashboardService; -import com.datasophon.api.service.ClusterServiceInstanceRoleGroupService; -import com.datasophon.api.service.ClusterServiceInstanceService; -import com.datasophon.api.service.ClusterServiceRoleGroupConfigService; -import com.datasophon.api.service.ClusterServiceRoleInstanceService; -import com.datasophon.api.service.ClusterServiceRoleInstanceWebuisService; -import com.datasophon.api.service.FrameServiceRoleService; +import com.datasophon.api.service.*; import com.datasophon.common.Constants; import com.datasophon.common.model.SimpleServiceConfig; import com.datasophon.common.utils.PlaceholderUtils; import com.datasophon.common.utils.Result; -import com.datasophon.dao.entity.ClusterAlertHistory; -import com.datasophon.dao.entity.ClusterInfoEntity; -import com.datasophon.dao.entity.ClusterServiceDashboard; -import com.datasophon.dao.entity.ClusterServiceInstanceEntity; -import com.datasophon.dao.entity.ClusterServiceInstanceRoleGroup; -import com.datasophon.dao.entity.ClusterServiceRoleGroupConfig; -import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; -import com.datasophon.dao.entity.FrameServiceRoleEntity; +import com.datasophon.dao.entity.*; import com.datasophon.dao.enums.NeedRestart; import com.datasophon.dao.enums.ServiceRoleState; import com.datasophon.dao.enums.ServiceState; diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java index bd397796..cc81382f 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java @@ -138,7 +138,7 @@ public ClusterServiceRoleInstanceEntity getOneServiceRole(String name, String ho .eq(Constants.SERVICE_ROLE_NAME, name) .eq(StringUtils.isNotBlank(hostname), Constants.HOSTNAME, hostname) .eq(Constants.CLUSTER_ID, id)); - if (Objects.nonNull(list) && list.size() > 0) { + if (Objects.nonNull(list) && !list.isEmpty()) { return list.get(0); } return null; @@ -274,7 +274,7 @@ public Result restartObsoleteService(Integer roleGroupId) { @Override public Result decommissionNode(String serviceRoleInstanceIds, String serviceName) throws Exception { - TreeSet hosts = new TreeSet(); + TreeSet hosts = new TreeSet<>(); Integer serviceInstanceId = null; String serviceRoleName = ""; for (String str : serviceRoleInstanceIds.split(",")) { diff --git a/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java b/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java index bbf8db8a..84c7f104 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java +++ b/datasophon-api/src/main/java/com/datasophon/api/utils/ProcessUtils.java @@ -135,7 +135,7 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) { roleInstance.setRoleGroupId(roleGroup.getId()); roleInstance.setNeedRestart(NeedRestart.NO); serviceRoleInstanceService.save(roleInstance); - if (Constants.ZKSERVER.equals(roleInstance.getServiceRoleName().toLowerCase())) { + if (Constants.ZKSERVER.equalsIgnoreCase(roleInstance.getServiceRoleName())) { ClusterZkService clusterZkService = SpringTool.getApplicationContext().getBean(ClusterZkService.class); ClusterZk clusterZk = new ClusterZk(); clusterZk.setMyid((Integer) CacheUtils.get("zkserver_" + serviceRoleInfo.getHostname())); @@ -586,9 +586,7 @@ public static List addAll(List left, List iter = res.iterator(); - while (iter.hasNext()) { - ServiceConfig item = iter.next(); + for (ServiceConfig item : res) { // 如果set中包含id则remove if (!set.contains(item.getName())) { left.add(item); diff --git a/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java b/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java index aaf1cd92..8d461615 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java +++ b/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java @@ -4,5 +4,5 @@ public enum ClusterCommandType { CHECK, - UNINSTALL; + UNINSTALL } diff --git a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java index 58032823..1130bb48 100644 --- a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java +++ b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonValue; import java.util.Arrays; -import java.util.Optional; public enum ClusterState { diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java b/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java index bbaadf61..e06c7809 100644 --- a/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java +++ b/datasophon-worker/src/main/java/com/datasophon/worker/handler/ConfigureServiceHandler.java @@ -148,7 +148,7 @@ public ExecResult configure(Map> cofigFileMap, customConfList.add(serviceConfig); } configs.addAll(customConfList); - if (configs.size() > 0) { + if (!configs.isEmpty()) { // extra app, package: META, templates File extTemplateDir = new File(Constants.INSTALL_PATH + File.separator + decompressPackageName, "templates"); From 8d2856d9007a02558c7c48212cb27b077e74bc9f Mon Sep 17 00:00:00 2001 From: ruanwe <88fantasy@gmail.com> Date: Wed, 9 Aug 2023 10:32:06 +0800 Subject: [PATCH 4/7] [fix][api] delete cluster merge dev --- .../src/main/java/com/datasophon/api/master/ClusterActor.java | 0 .../api/master/handler/service/ServiceConfigureAsyncHandler.java | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {datasophon-api => datasophon-service}/src/main/java/com/datasophon/api/master/ClusterActor.java (100%) rename {datasophon-api => datasophon-service}/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java (100%) diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/ClusterActor.java b/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java similarity index 100% rename from datasophon-api/src/main/java/com/datasophon/api/master/ClusterActor.java rename to datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java b/datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java similarity index 100% rename from datasophon-api/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java rename to datasophon-service/src/main/java/com/datasophon/api/master/handler/service/ServiceConfigureAsyncHandler.java From ad49de42cd4bd2432a0591f37f47da0115fa715e Mon Sep 17 00:00:00 2001 From: ruanwe <88fantasy@gmail.com> Date: Thu, 10 Aug 2023 08:28:56 +0800 Subject: [PATCH 5/7] [fix][api] delete cluster rename --- .../java/com/datasophon/common/enums/ClusterCommandType.java | 2 +- .../src/main/java/com/datasophon/dao/enums/ClusterState.java | 4 ++-- .../src/main/java/com/datasophon/api/master/ClusterActor.java | 4 ++-- .../datasophon/api/service/impl/ClusterInfoServiceImpl.java | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java b/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java index 8d461615..d7bd696e 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java +++ b/datasophon-common/src/main/java/com/datasophon/common/enums/ClusterCommandType.java @@ -4,5 +4,5 @@ public enum ClusterCommandType { CHECK, - UNINSTALL + DELETE } diff --git a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java index 1130bb48..5480577a 100644 --- a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java +++ b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java @@ -24,9 +24,9 @@ public enum ClusterState { - UNINSTALLED(5, "已删除"), + DELETED(5, "已删除"), - UNINSTALLING(4, "删除中"), + DELETING(4, "删除中"), STOP(3, "停止"), diff --git a/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java b/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java index 47609c3a..4c1efd55 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java +++ b/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java @@ -94,7 +94,7 @@ public void onReceive(Object msg) throws Throwable { } } - } else if (ClusterCommandType.UNINSTALL.equals(clusterCommand.getCommandType())) { + } else if (ClusterCommandType.DELETE.equals(clusterCommand.getCommandType())) { Integer clusterId = clusterCommand.getClusterId(); if (Objects.nonNull(clusterId)) { ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId); @@ -180,7 +180,7 @@ public void onReceive(Object msg) throws Throwable { } List hostList = clusterHostService.getHostListByClusterId(clusterId); clusterHostService.deleteHosts(hostList.stream().map(h -> String.valueOf(h.getId())).collect(Collectors.joining(Constants.COMMA))); - clusterInfoService.updateClusterState(clusterId, ClusterState.UNINSTALLED.getValue()); + clusterInfoService.updateClusterState(clusterId, ClusterState.DELETED.getValue()); } } } diff --git a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java index 6b34c853..3d89878c 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java +++ b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java @@ -205,9 +205,9 @@ public void deleteCluster(List ids) { ActorUtils.getLocalActor( ClusterActor.class, "clusterActor") - .tell(new ClusterCommand(ClusterCommandType.UNINSTALL, id), ActorRef.noSender()); + .tell(new ClusterCommand(ClusterCommandType.DELETE, id), ActorRef.noSender()); - this.updateClusterState(id, ClusterState.UNINSTALLING.getValue()); + this.updateClusterState(id, ClusterState.DELETING.getValue()); // delete host // clusterHostService.removeHostByClusterId(id); From b61702cafbabe213387d069f7295b8acce8ce915 Mon Sep 17 00:00:00 2001 From: ruanwe <88fantasy@gmail.com> Date: Thu, 10 Aug 2023 15:29:12 +0800 Subject: [PATCH 6/7] [fix][api] delete cluster MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修改为删除所有服务及集群 --- .../ClusterServiceInstanceController.java | 2 +- .../com/datasophon/common/utils/Result.java | 4 ++ .../datasophon/dao/enums/ClusterState.java | 2 - .../datasophon/api/master/ClusterActor.java | 44 +++++-------------- .../ClusterServiceInstanceService.java | 4 +- .../service/impl/ClusterInfoServiceImpl.java | 19 +++++--- .../ClusterServiceInstanceServiceImpl.java | 6 +-- 7 files changed, 37 insertions(+), 44 deletions(-) diff --git a/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterServiceInstanceController.java b/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterServiceInstanceController.java index b6126d22..0b953feb 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterServiceInstanceController.java +++ b/datasophon-api/src/main/java/com/datasophon/api/controller/ClusterServiceInstanceController.java @@ -39,7 +39,7 @@ public class ClusterServiceInstanceController { */ @RequestMapping("/list") public Result list(Integer clusterId) { - return clusterServiceInstanceService.listAll(clusterId); + return Result.success(clusterServiceInstanceService.listAll(clusterId)); } /** diff --git a/datasophon-common/src/main/java/com/datasophon/common/utils/Result.java b/datasophon-common/src/main/java/com/datasophon/common/utils/Result.java index 4092d977..cf71d91c 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/utils/Result.java +++ b/datasophon-common/src/main/java/com/datasophon/common/utils/Result.java @@ -68,6 +68,10 @@ public Object getData() { return this.get(Constants.DATA); } + public boolean isSuccess() { + return this.getCode() == 200; + } + public static Result success(Object data) { Result result = new Result(); result.put(Constants.CODE, 200); diff --git a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java index 5480577a..437868ea 100644 --- a/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java +++ b/datasophon-dao/src/main/java/com/datasophon/dao/enums/ClusterState.java @@ -24,8 +24,6 @@ public enum ClusterState { - DELETED(5, "已删除"), - DELETING(4, "删除中"), STOP(3, "停止"), diff --git a/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java b/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java index 4c1efd55..375aaf83 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java +++ b/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java @@ -20,11 +20,7 @@ import akka.actor.UntypedActor; import cn.hutool.core.date.DateUtil; import cn.hutool.extra.spring.SpringUtil; -import com.datasophon.api.service.ClusterHostService; -import com.datasophon.api.service.ClusterInfoService; -import com.datasophon.api.service.ClusterServiceRoleGroupConfigService; -import com.datasophon.api.service.ClusterServiceRoleInstanceService; -import com.datasophon.api.service.ServiceInstallService; +import com.datasophon.api.service.*; import com.datasophon.api.utils.ProcessUtils; import com.datasophon.common.Constants; import com.datasophon.common.command.ClusterCommand; @@ -33,10 +29,7 @@ import com.datasophon.common.model.ServiceConfig; import com.datasophon.common.utils.ExecResult; import com.datasophon.common.utils.Result; -import com.datasophon.dao.entity.ClusterHostEntity; -import com.datasophon.dao.entity.ClusterInfoEntity; -import com.datasophon.dao.entity.ClusterServiceRoleGroupConfig; -import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; +import com.datasophon.dao.entity.*; import com.datasophon.dao.enums.ClusterState; import com.datasophon.dao.enums.ServiceRoleState; @@ -100,9 +93,9 @@ public void onReceive(Object msg) throws Throwable { ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId); if (Objects.nonNull(clusterInfo)) { ClusterHostService clusterHostService = SpringUtil.getBean(ClusterHostService.class); + ClusterServiceInstanceService clusterServiceInstanceService = SpringUtil.getBean(ClusterServiceInstanceService.class); ClusterServiceRoleInstanceService clusterServiceRoleInstanceService = SpringUtil.getBean(ClusterServiceRoleInstanceService.class); ClusterServiceRoleGroupConfigService clusterServiceRoleGroupConfigService = SpringUtil.getBean(ClusterServiceRoleGroupConfigService.class); - ServiceInstallService serviceInstallService = SpringUtil.getBean(ServiceInstallService.class); // 检查服务实例配置与目录 List roleInstanceList = clusterServiceRoleInstanceService.getServiceRoleInstanceListByClusterId(clusterId); @@ -110,22 +103,16 @@ public void onReceive(Object msg) throws Throwable { String roleName = roleInstance.getServiceRoleName(); String hostname = roleInstance.getHostname(); ClusterServiceRoleGroupConfig config = clusterServiceRoleGroupConfigService.getConfigByRoleGroupId(roleInstance.getRoleGroupId()); - List oldConfigs = ProcessUtils.getServiceConfig(config); Map> configFileMap = new ConcurrentHashMap<>(); ProcessUtils.generateConfigFileMap(configFileMap, config); Predicate filter = c -> Constants.PATH.equals(c.getConfigType()) && !((String) c.getValue()).contains(DEPRECATED); - Consumer peeker = c -> { - String oldPath = (String) c.getValue(); - String newPath = String.format("%s_%s_%s_%s", oldPath, DEPRECATED, clusterId, DateUtil.today()); - c.setDefaultValue(oldPath); - c.setValue(newPath); - }; - for (Map.Entry> configFile : configFileMap.entrySet()) { List serviceConfigs = configFile.getValue().stream() .filter(filter) .peek(c -> { - peeker.accept(c); + String oldPath = (String) c.getValue(); + String newPath = String.format("%s_%s_%s_%s", oldPath, DEPRECATED, clusterId, DateUtil.today()); + c.setValue(newPath); c.setConfigType(Constants.MV_PATH); }) .collect(Collectors.toList()); @@ -150,16 +137,6 @@ public void onReceive(Object msg) throws Throwable { "{} uninstall success in {}", roleName, hostname); - //保存配置 - serviceInstallService.saveServiceConfig(clusterId, - roleInstance.getServiceName(), - oldConfigs.stream().peek(c -> { - if (filter.test(c)) { - peeker.accept(c); - } - }).collect(Collectors.toList()), - roleInstance.getRoleGroupId()); - } else { logger.info( "{} uninstall failed in {}", @@ -178,9 +155,12 @@ public void onReceive(Object msg) throws Throwable { } } } - List hostList = clusterHostService.getHostListByClusterId(clusterId); - clusterHostService.deleteHosts(hostList.stream().map(h -> String.valueOf(h.getId())).collect(Collectors.joining(Constants.COMMA))); - clusterInfoService.updateClusterState(clusterId, ClusterState.DELETED.getValue()); + List serviceInstanceList = clusterServiceInstanceService.listAll(clusterId); + if(serviceInstanceList.stream().allMatch(instance -> clusterServiceInstanceService.delServiceInstance(instance.getId()).isSuccess())) { + List hostList = clusterHostService.getHostListByClusterId(clusterId); + clusterHostService.deleteHosts(hostList.stream().map(h -> String.valueOf(h.getId())).collect(Collectors.joining(Constants.COMMA))); + clusterInfoService.removeById(clusterId); + } } } } diff --git a/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java b/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java index 7b7d74e6..b52d3c2d 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java +++ b/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java @@ -36,7 +36,7 @@ public interface ClusterServiceInstanceService extends IService listAll(Integer clusterId); Result downloadClientConfig(Integer clusterId, String serviceName); @@ -47,4 +47,6 @@ public interface ClusterServiceInstanceService extends IService listRunningServiceInstance(Integer clusterId); + + public boolean hasRunningRoleInstance(Integer serviceInstanceId); } diff --git a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java index 3d89878c..2741ff27 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java +++ b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterInfoServiceImpl.java @@ -20,8 +20,8 @@ import akka.actor.ActorRef; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.datasophon.api.load.ConfigBean; import com.datasophon.api.enums.Status; +import com.datasophon.api.load.ConfigBean; import com.datasophon.api.load.GlobalVariables; import com.datasophon.api.master.ActorUtils; import com.datasophon.api.master.ClusterActor; @@ -88,6 +88,9 @@ public class ClusterInfoServiceImpl extends ServiceImpl ids) { Integer id = ids.get(0); ClusterInfoEntity clusterInfo = this.getById(id); - ActorUtils.getLocalActor( - ClusterActor.class, "clusterActor") - .tell(new ClusterCommand(ClusterCommandType.DELETE, id), ActorRef.noSender()); + if (ClusterState.STOP.equals(clusterInfo.getClusterState())) { + List serviceInstanceList = clusterServiceInstanceService.listAll(id); + if (serviceInstanceList.stream().noneMatch(instance -> clusterServiceInstanceService.hasRunningRoleInstance(instance.getId()))) { + ActorUtils.getLocalActor( + ClusterActor.class, "clusterActor") + .tell(new ClusterCommand(ClusterCommandType.DELETE, id), ActorRef.noSender()); + + this.updateClusterState(id, ClusterState.DELETING.getValue()); + } + } - this.updateClusterState(id, ClusterState.DELETING.getValue()); // delete host // clusterHostService.removeHostByClusterId(id); diff --git a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java index b34ec72f..e5c61903 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java +++ b/datasophon-service/src/main/java/com/datasophon/api/service/impl/ClusterServiceInstanceServiceImpl.java @@ -91,7 +91,7 @@ public String getServiceConfigByClusterIdAndServiceName(Integer clusterId, Strin } @Override - public Result listAll(Integer clusterId) { + public List listAll(Integer clusterId) { Map globalVariables = GlobalVariables.get(clusterId); List list = this.list(new QueryWrapper() .eq(Constants.CLUSTER_ID, clusterId).orderByAsc(Constants.SORT_NUM)); @@ -166,7 +166,7 @@ public Result listAll(Integer clusterId) { this.updateById(serviceInstance); } } - return Result.success(list); + return list; } @Override @@ -253,7 +253,7 @@ public List listRunningServiceInstance(Integer clu .eq(Constants.SERVICE_STATE, ServiceState.RUNNING)); } - private boolean hasRunningRoleInstance(Integer serviceInstanceId) { + public boolean hasRunningRoleInstance(Integer serviceInstanceId) { List list = roleInstanceService.getRunningServiceRoleInstanceListByServiceId(serviceInstanceId); return !list.isEmpty(); From 1ab6d83e6f9ed71e57b20f3c687dee131d193ea2 Mon Sep 17 00:00:00 2001 From: ruanwe <88fantasy@gmail.com> Date: Thu, 10 Aug 2023 15:30:20 +0800 Subject: [PATCH 7/7] [fix][api] delete cluster code clean --- .../main/java/com/datasophon/api/master/ClusterActor.java | 6 ++---- .../api/service/ClusterServiceInstanceService.java | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java b/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java index 375aaf83..3aaf82ab 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java +++ b/datasophon-service/src/main/java/com/datasophon/api/master/ClusterActor.java @@ -32,18 +32,16 @@ import com.datasophon.dao.entity.*; import com.datasophon.dao.enums.ClusterState; import com.datasophon.dao.enums.ServiceRoleState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * 节点状态监测 diff --git a/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java b/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java index b52d3c2d..9a2548b5 100644 --- a/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java +++ b/datasophon-service/src/main/java/com/datasophon/api/service/ClusterServiceInstanceService.java @@ -48,5 +48,5 @@ public interface ClusterServiceInstanceService extends IService listRunningServiceInstance(Integer clusterId); - public boolean hasRunningRoleInstance(Integer serviceInstanceId); + boolean hasRunningRoleInstance(Integer serviceInstanceId); }