Skip to content

Commit

Permalink
fix cluster fo
Browse files Browse the repository at this point in the history
  • Loading branch information
qingwen220 committed Jan 17, 2025
1 parent a421b12 commit bfd329f
Show file tree
Hide file tree
Showing 18 changed files with 372 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ public GeaflowHeartbeatException() {
super(MESSAGE);
}

public GeaflowHeartbeatException(Throwable cause) {
public GeaflowHeartbeatException(String message) {
super(message);
}

public GeaflowHeartbeatException(String message, Throwable cause) {
super(MESSAGE, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -112,14 +114,23 @@ public static synchronized int getProcessPid(Process p) {
return pid;
}

public static synchronized void killProcess(int pid) {
LOGGER.info("Kill -9 {}", pid);
public static void killProcess(int pid) {
execute("kill -9 " + pid);
}

public static void killProcesses(List<Integer> pidList) {
String pids = StringUtils.join(pidList, " ");
execute("kill -9 " + pids);
}

public static void execute(String cmd) {
LOGGER.info(cmd);
try {
Process process = Runtime.getRuntime().exec("kill -9 " + pid);
Process process = Runtime.getRuntime().exec(cmd);
process.waitFor();
} catch (InterruptedException | IOException e) {
LOGGER.error("Kill {} failed: {}", pid, e.getMessage());
throw new GeaflowRuntimeException(e);
LOGGER.error(" {} failed: {}", cmd, e);
throw new GeaflowRuntimeException(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.common.utils;

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertThrows;

import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import java.io.IOException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests the failure handling of {@link ProcessUtil#execute(String)}.
 *
 * <p>{@code ProcessUtil.execute} reaches the runtime through the static
 * {@code Runtime.getRuntime()} call, so a Mockito mock of {@link Runtime} held by this test class
 * would never be consulted and any stubbing on it would be dead code. Each test therefore provokes
 * the failure on the real runtime instead.
 */
public class ProcessUtilTest {

    /** Program name that is not expected to exist on any PATH, so launching it fails. */
    private static final String MISSING_COMMAND = "geaflow-process-util-test-missing-command";

    /**
     * Command that outlives the call to {@code waitFor()}, so the wait is actually entered.
     * NOTE(review): assumes a POSIX {@code sleep} binary; where it is absent the launch itself
     * fails and the test still sees a GeaflowRuntimeException, just not via the interrupt path.
     */
    private static final String LONG_RUNNING_COMMAND = "sleep 1";

    /** A command that cannot be launched must surface as a GeaflowRuntimeException. */
    @Test
    public void execute_CommandThrowsIOException_ExceptionHandled() {
        assertThrows(GeaflowRuntimeException.class, () -> ProcessUtil.execute(MISSING_COMMAND));
    }

    /** An interrupt while waiting for the child must surface as a GeaflowRuntimeException. */
    @Test
    public void execute_CommandThrowsInterruptedException_ExceptionHandled() {
        // Set the interrupt flag up front: waiting on the child process then fails with
        // InterruptedException as soon as execute() starts waiting.
        Thread.currentThread().interrupt();
        try {
            assertThrows(GeaflowRuntimeException.class,
                () -> ProcessUtil.execute(LONG_RUNNING_COMMAND));
        } finally {
            // Clear the flag whatever happened so it cannot leak into later tests on this thread.
            Thread.interrupted();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,21 @@ public void load() {
ClusterMetaStore metaStore = ClusterMetaStore.getInstance(id, name, config);
Map<Integer, String> drivers = metaStore.getDriverIds();
Map<Integer, String> containerIds = metaStore.getContainerIds();
if (drivers != null && !drivers.isEmpty() && containerIds != null && !containerIds
.isEmpty()) {
int driverNum = drivers == null ? 0 : drivers.size();
int containerNum = containerIds == null ? 0 : containerIds.size();
if (driverNum != 0 && containerNum != 0) {
this.isRecover = true;
this.driverIds = drivers;
this.containerIds = containerIds;
this.maxComponentId = metaStore.getMaxContainerId();
LOGGER.info("recover {} containers and {} drivers maxComponentId {} from metaStore",
containerIds.size(), drivers.size(), maxComponentId);
LOGGER.info("recover {} containers and {} drivers maxComponentId: {}",
containerNum, driverNum, maxComponentId);
} else {
this.isRecover = false;
this.driverIds = new ConcurrentHashMap<>();
this.containerIds = new ConcurrentHashMap<>();
this.maxComponentId = 0;
LOGGER.info("init with maxComponentId: {}", maxComponentId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,12 @@ public void run() {
Thread.currentThread().setName(this.name);
try {
this.fetch();
} catch (GeaflowRuntimeException e) {
LOGGER.error("fetcher task err with window id {} {}", this.targetWindowId, this.name, e);
throw e;
} catch (Throwable e) {
LOGGER.error("fetcher task err with window id {} {}", this.targetWindowId, this.name, e);
throw new GeaflowRuntimeException(e);
throw new GeaflowRuntimeException(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ public class HeartbeatManager implements Serializable {
private final ScheduledFuture<?> reportFuture;
private final ScheduledExecutorService checkTimeoutService;
private final ScheduledExecutorService heartbeatReportService;
private final GeaflowHeartbeatException timeoutException;
private ScheduledFuture<?> checkFuture;
private IStatsWriter statsWriter;
private final IStatsWriter statsWriter;

public HeartbeatManager(Configuration config, IClusterManager clusterManager) {
this.senderMap = new ConcurrentHashMap<>();
Expand All @@ -86,14 +85,14 @@ public HeartbeatManager(Configuration config, IClusterManager clusterManager) {
this.checkFuture = checkTimeoutService.scheduleAtFixedRate(this::checkSupervisorHealth,
heartbeatCheckMs, heartbeatCheckMs, TimeUnit.MILLISECONDS);
}

this.heartbeatReportService = new ScheduledThreadPoolExecutor(1,
ThreadUtil.namedThreadFactory(true, "heartbeat-report"));
this.reportFuture = heartbeatReportService
.scheduleAtFixedRate(this::reportHeartbeat, heartbeatReportMs, heartbeatReportMs,
TimeUnit.MILLISECONDS);

this.clusterManager = (AbstractClusterManager) clusterManager;
this.timeoutException = new GeaflowHeartbeatException();
this.statsWriter = StatsCollectorFactory.init(config).getStatsWriter();
}

Expand All @@ -107,7 +106,7 @@ public void registerMasterHeartbeat(ComponentInfo masterInfo) {
this.statsWriter.addMetric(masterInfo.getName(), masterInfo);
}

private void checkHeartBeat() {
void checkHeartBeat() {
try {
long checkTime = System.currentTimeMillis();
checkTimeout(clusterManager.getContainerIds(), checkTime);
Expand All @@ -128,8 +127,9 @@ private void checkTimeout(Map<Integer, String> map, long checkTime) {
LOGGER.warn("{} is not registered", entry.getValue());
}
} else if (checkTime > heartbeat.getTimestamp() + heartbeatTimeoutMs) {
LOGGER.error("{} heartbeat is missing", entry.getValue());
clusterManager.doFailover(componentId, timeoutException);
String message = String.format("%s heartbeat is lost", entry.getValue());
LOGGER.error(message);
doFailover(componentId, new GeaflowHeartbeatException(message));
}
}
}
Expand All @@ -147,7 +147,7 @@ private void checkSupervisorHealth() {
checkSupervisorHealth(clusterManager.getContainerIds());
checkSupervisorHealth(clusterManager.getDriverIds());
} catch (Throwable e) {
LOGGER.warn("Check container healthy error", e);
LOGGER.warn("Check container healthy error: {}", e.getMessage(), e);
}
}

Expand All @@ -157,17 +157,23 @@ private void checkSupervisorHealth(Map<Integer, String> map) {
try {
StatusResponse response = RpcClient.getInstance().querySupervisorStatus(name);
if (!response.getIsAlive()) {
LOGGER.info("Found {} is not alive and do failover", name);
clusterManager.doFailover(entry.getKey(), timeoutException);
String message = String.format("supervisor of %s is not alive", name);
LOGGER.error(message);
doFailover(entry.getKey(), new GeaflowHeartbeatException(message));
}
} catch (Throwable e) {
LOGGER.error("Try to do failover due to exception from {}: {}", name,
e.getMessage());
clusterManager.doFailover(entry.getKey(), e);
String message = String.format("%s %s", name, e.getMessage());
LOGGER.error(message, e);
doFailover(entry.getKey(), new GeaflowHeartbeatException(message, e));
}
}
}

void doFailover(int componentId, Throwable e) {
StatsCollectorFactory.getInstance().getExceptionCollector().reportException(e);
clusterManager.doFailover(componentId, e);
}

protected boolean isRegistered(int componentId) {
AbstractClusterManager cm = clusterManager;
return cm.getContainerInfos().containsKey(componentId) || cm.getDriverInfos()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,28 @@ public void startProcess() {
int code = childProcess.waitFor();
LOGGER.warn("Child process {} exits with code: {} and alive: {}", pid, code,
childProcess.isAlive());
if (code == 0) {
break;
// 0: success, 137: killed by SIGKILL, 143: killed by SIGTERM
if (code == 0 || code == 137 || code == 143) {
return;
}
if (restarts == 0) {
String errMsg = String.format("Latest process %s exits with code: %s: "
+ "Exhausted after retrying startup %s times. ", pid, code, maxRestarts + 1);
String errMsg;
if (maxRestarts == 0) {
errMsg = String.format("process exits code: %s", code);
} else {
errMsg = String.format("process exits code: %s, exhausted %s restarts",
code, maxRestarts);
}
throw new GeaflowRuntimeException(errMsg);
}
restarts--;
} while (true);
} catch (Exception e) {
} catch (GeaflowRuntimeException e) {
LOGGER.error("FATAL: start command failed: {}", command, e);
if (e instanceof GeaflowRuntimeException) {
throw (GeaflowRuntimeException) e;
}
throw new GeaflowRuntimeException(e);
throw e;
} catch (Throwable e) {
LOGGER.error("FATAL: start command failed: {}", command, e);
throw new GeaflowRuntimeException(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import com.baidu.brpc.server.RpcServerOptions;
import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -104,23 +106,29 @@ public boolean isWorkerAlive() {
return false;
}

public void stopWorker() {
stopWorker(mainRunner.getProcessId());
}

public void stopWorker(int pid) {
Preconditions.checkArgument(pid > 0, "pid should be larger than 0");
LOGGER.info("Kill process: {}", pid);
ProcessUtil.killProcess(pid);
LOGGER.info("Stop old process if exists");

List<Integer> pids = new ArrayList<>();
pids.add(pid);

Process process = mainRunner.getProcess();
if (process.isAlive()) {
int ppid = ProcessUtil.getProcessPid(process);
if (ppid <= 0) {
LOGGER.warn("NOT found live process {}", process);
int curPid = mainRunner.getProcessId();
if (curPid <= 0) {
LOGGER.warn("Process is alive but pid not found: {}", process);
return;
}
if (pid != ppid) {
LOGGER.info("Kill parent process: {}", ppid);
process.destroy();
if (pid != curPid) {
pids.add(curPid);
}
}
ProcessUtil.killProcesses(pids);
}

public void startAgent() {
Expand All @@ -139,4 +147,10 @@ public void waitForTermination() {
}
}

public void stop() {
if (rpcService != null) {
rpcService.stopService();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ClusterMetaStore {
private final String clusterId;
private final Configuration configuration;
private final IClusterMetaKVStore<String, Object> componentBackend;
private final Map<String, IClusterMetaKVStore<String, Object>> backends;
private Map<String, IClusterMetaKVStore<String, Object>> backends;

private ClusterMetaStore(int id, String name, Configuration configuration) {
this.componentId = id;
Expand Down Expand Up @@ -79,7 +79,9 @@ public static ClusterMetaStore getInstance() {
public static synchronized void close() {
LOGGER.info("close ClusterMetaStore");
if (INSTANCE != null) {
for (IClusterMetaKVStore<String, Object> backend : INSTANCE.backends.values()) {
Map<String, IClusterMetaKVStore<String, Object>> backends = INSTANCE.backends;
INSTANCE.backends = null;
for (IClusterMetaKVStore<String, Object> backend : backends.values()) {
backend.close();
}
INSTANCE = null;
Expand Down Expand Up @@ -220,6 +222,10 @@ private IClusterMetaKVStore<String, Object> getBackend(ClusterMetaKey metaKey) {
default:
return componentBackend;
}
// Cluster meta store is closed.
if (backends == null) {
return null;
}
if (!backends.containsKey(namespace)) {
synchronized (ClusterMetaStore.class) {
if (!backends.containsKey(namespace)) {
Expand Down
Loading

0 comments on commit bfd329f

Please sign in to comment.