Skip to content

Commit

Permalink
optimize stop thread
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Dec 28, 2023
1 parent 1068cae commit cccf3ef
Show file tree
Hide file tree
Showing 28 changed files with 125 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static <T> T execute(ThrowingSupplier<T, Throwable> action, int retryMaxC
if (traceId == null) {
traceId = UuidUtils.uuid32();
}
LOG.error("Execute failed, will retrying: " + (i + 1) + " | " + traceId, e);
LOG.error("Execute failed, will retrying: " + (i + 1) + ", " + traceId, e);
Thread.sleep((i + 1) * retryBackoffPeriod);
} else {
LOG.error("Execute failed, retried max count: " + traceId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ private LoggedUncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
if (e instanceof java.lang.ThreadDeath) {
LOG.warn("Uncaught exception handle, thread death: {} | {}", t.getName(), e.getMessage());
LOG.warn("Uncaught exception handle, thread death: {}, {}", t.getName(), e.getMessage());
} else if (e instanceof InterruptedException) {
LOG.warn("Uncaught exception handle, thread interrupted: {} | {}", t.getName(), e.getMessage());
LOG.warn("Uncaught exception handle, thread interrupted: {}, {}", t.getName(), e.getMessage());
} else {
LOG.error("Uncaught exception handle, occur error: " + t.getName(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ForkJoinPool;

/**
* Thread utilities
*
Expand Down Expand Up @@ -62,59 +64,14 @@ public static boolean isStopped(Thread thread) {
return thread.getState() == Thread.State.TERMINATED;
}

/**
 * Stops the thread by delegating to the four-argument overload with
 * {@code sleepCount} and {@code sleepMillis} both set to 0, so the
 * overload's sleep-and-wait loop is never entered.
 *
 * @param thread     the thread to stop
 * @param joinMillis passed through unchanged to the four-argument overload
 */
public static void stopThread(Thread thread, long joinMillis) {
stopThread(thread, joinMillis, 0, 0);
}

/**
* Stops the thread
*
* @param thread the thread
* @param joinMillis the joinMillis
* @param sleepCount the sleepCount
* @param sleepMillis the sleepMillis
*/
public static void stopThread(Thread thread, long joinMillis, int sleepCount, long sleepMillis) {
if (isStopped(thread)) {
return;
}

if (Thread.currentThread() == thread) {
LOG.warn("Call stop on self thread: {}\n{}", thread.getName(), getStackTrace());
thread.interrupt();
stopThread(thread);
return;
}

// sleep for wait the tread run method block code execute finish
LOG.info("Thread stopping: {}", thread.getName());
while (sleepCount-- > 0 && sleepMillis > 0 && !isStopped(thread)) {
try {
// Wait some time
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
LOG.error("Waiting thread terminal interrupted: " + thread.getName(), e);
Thread.currentThread().interrupt();
break;
}
}

if (isStopped(thread)) {
return;
}

// interrupt and wait joined
thread.interrupt();
if (joinMillis > 0) {
try {
thread.join(joinMillis);
} catch (InterruptedException e) {
LOG.error("Join thread terminal interrupted: " + thread.getName(), e);
Thread.currentThread().interrupt();
}
}

stopThread(thread);
/**
 * Stops the thread by delegating to the private
 * {@code stopThread(boolean, Thread, long)} overload with {@code async}
 * set to {@code false}.
 *
 * @param thread     the thread to stop
 * @param joinMillis passed through unchanged to the private overload
 */
public static void stopThread(Thread thread, long joinMillis) {
stopThread(false, thread, joinMillis);
}

public static void interruptIfNecessary(Throwable t) {
Expand Down Expand Up @@ -152,6 +109,35 @@ private static String buildStackTrace(StackTraceElement[] traces) {
return builder.toString();
}

/**
 * Interrupts the target thread, optionally waits for it to finish, and then
 * hands it to the single-argument {@code stopThread(Thread)} overload.
 *
 * <p>Nothing happens when the thread is already stopped. When the caller is
 * the target thread itself, the behaviour depends on {@code async}:
 * <ul>
 *   <li>{@code async == false}: the stop request is re-submitted to
 *       {@link ForkJoinPool#commonPool()} with {@code async == true} and a
 *       join timeout of at least 10 milliseconds, and this call returns;</li>
 *   <li>{@code async == true}: a warning with the current stack trace is
 *       logged and {@code stopThread(Thread)} is invoked directly.</li>
 * </ul>
 *
 * <p>If the join is interrupted, the error is logged, the caller's interrupt
 * flag is restored, and {@code stopThread(Thread)} is still invoked.
 *
 * @param async      whether this invocation is the re-submitted (asynchronous) one
 * @param thread     the thread to stop
 * @param joinMillis maximum milliseconds to join the thread after interrupting
 *                   it; no join is attempted unless the value is positive
 */
private static void stopThread(boolean async, Thread thread, long joinMillis) {
    if (isStopped(thread)) {
        return;
    }

    final boolean calledOnSelf = (thread == Thread.currentThread());

    if (calledOnSelf && !async) {
        // A thread cannot join itself, so retry the stop from a pool thread.
        final long asyncJoinMillis = Math.max(joinMillis, 10);
        ForkJoinPool.commonPool().execute(() -> stopThread(true, thread, asyncJoinMillis));
        return;
    }

    if (calledOnSelf) {
        // Already the re-submitted call and still running on the target thread.
        LOG.warn("Call stop on self thread: {}\n{}", thread.getName(), getStackTrace());
        stopThread(thread);
        return;
    }

    // Called from a different thread: interrupt, then wait for the target to finish.
    thread.interrupt();
    if (joinMillis > 0) {
        try {
            thread.join(joinMillis);
        } catch (InterruptedException interrupted) {
            LOG.error("Join thread terminal interrupted: " + thread.getName(), interrupted);
            Thread.currentThread().interrupt();
        }
    }

    stopThread(thread);
}

/**
* Stop the thread
*
Expand All @@ -162,6 +148,7 @@ private static void stopThread(Thread thread) {
return;
}

thread.interrupt();
try {
// 调用后,thread中正在执行的run方法内部会抛出java.lang.ThreadDeath异常
// 如果在run方法内用 try{...} catch(Throwable e){} 捕获住,则线程不会停止执行
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public static void killProcess(Long pid, Charset charset) {
Process killProcess = new ProcessBuilder("taskkill", "/PID", String.valueOf(pid), "/F", "/T").start();
waitFor(killProcess, () -> "kill process id " + pid);
if (LOG.isInfoEnabled()) {
LOG.info("Stop windows process verbose: {} | {}", pid, processVerbose(killProcess, charset));
LOG.info("Stop windows process verbose: {}, {}", pid, processVerbose(killProcess, charset));
}
destroy(killProcess);
} else if (SystemUtils.IS_OS_UNIX) {
Expand All @@ -133,7 +133,7 @@ public static void killProcess(Long pid, Charset charset) {
Process killProcess = new ProcessBuilder("kill", "-9", String.valueOf(pid)).start();
waitFor(killProcess, () -> "kill process id " + pid);
if (LOG.isInfoEnabled()) {
LOG.info("Stop unix process verbose: {} | {}", pid, processVerbose(killProcess, charset));
LOG.info("Stop unix process verbose: {}, {}", pid, processVerbose(killProcess, charset));
}
destroy(killProcess);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) t

Process process = Runtime.getRuntime().exec(commandParam.cmdarray, commandParam.envp);
this.pid = ProcessUtils.getProcessId(process);
LOG.info("Command process id: {} | {}", executingTask.getTaskId(), pid);
LOG.info("Command process id: {}, {}", executingTask.getTaskId(), pid);
return JobUtils.completeProcess(process, charset, executingTask, LOG);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) t

Process process = scriptParam.type.exec(scriptPath, scriptParam.envp);
this.pid = ProcessUtils.getProcessId(process);
LOG.info("Script process id: {} | {}", executingTask.getTaskId(), pid);
LOG.info("Script process id: {}, {}", executingTask.getTaskId(), pid);
return JobUtils.completeProcess(process, charset, executingTask, LOG);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private boolean doDispatch(List<DispatchTaskParam> params) {

try {
doDispatch(task);
log.info("Task trace [{}] dispatched: {} | {}", task.getTaskId(), task.getOperation(), task.getWorker());
log.info("Task trace [{}] dispatched: {}, {}", task.getTaskId(), task.getOperation(), task.getWorker());
} catch (Throwable t) {
// dispatch failed, delay retry
retry(param);
Expand Down Expand Up @@ -178,7 +178,7 @@ private void doDispatch(ExecuteTaskParam task) throws Exception {
boolean result;
if (timingWheel != null && task.getWorker().equals(Worker.current())) {
// if the server both is supervisor & worker: dispatch to local worker
log.info("Dispatching task to local worker {} | {} | {}", task.getTaskId(), task.getOperation(), task.getWorker());
log.info("Dispatching task to local worker {}, {}, {}", task.getTaskId(), task.getOperation(), task.getWorker());
result = timingWheel.offer(task);
} else {
// dispatch to remote worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ public boolean receive(ExecuteTaskParam param) {

Worker assignedWorker = param.getWorker();
if (!currentWorker.sameWorker(assignedWorker)) {
log.error("Received unmatched worker: {} | '{}' | '{}'", param.getTaskId(), currentWorker, assignedWorker);
log.error("Received unmatched worker: {}, '{}', '{}'", param.getTaskId(), currentWorker, assignedWorker);
return false;
}
if (!currentWorker.getWorkerId().equals(assignedWorker.getWorkerId())) {
// 当Worker宕机后又快速启动(重启)的情况,Supervisor从本地缓存(或注册中心)拿到的仍是旧的workerId,但任务却Http方式派发给新的workerId(同机器同端口)
// 这种情况:1、可以剔除掉,等待Supervisor重新派发即可;2、也可以不剔除掉,短暂时间内该Worker的压力会是正常情况的2倍(注册中心还存有旧workerId);
log.warn("Received former worker: {} | '{}' | '{}'", param.getTaskId(), currentWorker, assignedWorker);
log.warn("Received former worker: {}, '{}', '{}'", param.getTaskId(), currentWorker, assignedWorker);
}

boolean res = timingWheel.offer(param);
if (res) {
log.info("Task trace [{}] received: {} | {}", param.getTaskId(), param.getOperation(), param.getWorker());
log.info("Task trace [{}] received: {}, {}", param.getTaskId(), param.getOperation(), param.getWorker());
} else {
log.error("Received task failed {}", param);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private int registerWorkerId(int workerIdBitLength) {
Object[] args = {bizTag, serverTag, usableWorkerId, System.currentTimeMillis()};
try {
jdbcTemplateWrapper.insert(REGISTER_WORKER_SQL, args);
LOG.info("Create snowflake db worker success: {} | {} | {} | {}", args);
LOG.info("Create snowflake db worker success: {}, {}, {}, {}", args);
return usableWorkerId;
} catch (Throwable ignored) {
// ignored
Expand All @@ -180,11 +180,11 @@ private int registerWorkerId(int workerIdBitLength) {
long currentTime = System.currentTimeMillis();
long lastHeartbeatTime = current.getHeartbeatTime();
if (currentTime < lastHeartbeatTime) {
throw new ClockMovedBackwardsException(String.format("Clock moved backwards: %s | %s | %d | %d", bizTag, serverTag, currentTime, lastHeartbeatTime));
throw new ClockMovedBackwardsException(String.format("Clock moved backwards: %s, %s, %d, %d", bizTag, serverTag, currentTime, lastHeartbeatTime));
}
Object[] args = {currentTime, bizTag, serverTag, lastHeartbeatTime};
if (jdbcTemplateWrapper.update(REUSE_WORKER_SQL, args) == AFFECTED_ONE_ROW) {
LOG.info("Reuse db worker id success: {} | {} | {} | {}", args);
LOG.info("Reuse db worker id success: {}, {}, {}, {}", args);
return workerId;
}

Expand All @@ -197,9 +197,9 @@ private void heartbeat() throws Throwable {
RetryTemplate.execute(() -> {
Object[] args = {System.currentTimeMillis(), bizTag, serverTag};
if (jdbcTemplateWrapper.update(HEARTBEAT_WORKER_SQL, args) == AFFECTED_ONE_ROW) {
LOG.debug("Heartbeat db worker id success: {} | {} | {}", args);
LOG.debug("Heartbeat db worker id success: {}, {}, {}", args);
} else {
LOG.error("Heartbeat db worker id failed: {} | {} | {}", args);
LOG.error("Heartbeat db worker id failed: {}, {}, {}", args);
}
}, 5, 3000L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private int registerWorkerId(int workerIdBitLength) throws Exception {
createEphemeral(workerIdPath0, data.serialize());
isCreatedWorkerIdPath = true;
upsertEphemeral(serverTagPath, Bytes.toBytes(usableWorkerId));
LOG.info("Created snowflake zk worker success: {} | {} | {}", serverTag, usableWorkerId, currentTime);
LOG.info("Created snowflake zk worker success: {}, {}, {}", serverTag, usableWorkerId, currentTime);
return usableWorkerId;
} catch (Throwable t) {
if (isCreatedWorkerIdPath) {
Expand Down Expand Up @@ -290,12 +290,12 @@ private int registerWorkerId(int workerIdBitLength) throws Exception {
}
long currentTime = System.currentTimeMillis();
if (currentTime < data.time) {
throw new ClockMovedBackwardsException(String.format("Clock moved backwards: %s | %s | %d", serverTagPath, currentTime, data.time));
throw new ClockMovedBackwardsException(String.format("Clock moved backwards: %s, %s, %d", serverTagPath, currentTime, data.time));
}
updateData(workerIdPath, WorkerIdData.of(currentTime, serverTag).serialize());
}

LOG.info("Reuse zk worker id success: {} | {}", serverTag, workerId0);
LOG.info("Reuse zk worker id success: {}, {}", serverTag, workerId0);

return workerId0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected ServerRegistry(String namespace, char separator) {
*/
protected final synchronized void refreshDiscoveredServers(List<D> servers) {
discoveryServer.refreshServers(servers);
log.debug("Refreshed discovery servers: {} | {}", discoveryRole, servers);
log.debug("Refreshed discovery servers: {}, {}", discoveryRole, servers);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public final void register(R server) {
client.agentServiceRegister(newService, token);
}
registered.add(server);
log.info("Consul server registered: {} | {}", registryRole, server);
log.info("Consul server registered: {}, {}", registryRole, server);
}

@Override
Expand All @@ -108,7 +108,7 @@ public final void deregister(R server) {
} else {
client.agentServiceDeregister(serverId, token);
}
log.info("Consul Server deregister: {} | {}", registryRole, server);
log.info("Consul Server deregister: {}, {}", registryRole, server);
} catch (Throwable t) {
log.error("Consul server deregister error.", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public final void register(R server) {
int updateRowsAffected = update.executeUpdate();
Assert.isTrue(updateRowsAffected <= AFFECTED_ONE_ROW, () -> "Invalid update rows affected: " + updateRowsAffected);
if (updateRowsAffected == AFFECTED_ONE_ROW) {
log.info("Database register update: {} | {} | {}", namespace, registerRoleName, serialize);
log.info("Database register update: {}, {}, {}", namespace, registerRoleName, serialize);
} else {
PreparedStatement insert = psCreator.apply(REGISTER_SQL);
insert.setString(1, namespace);
Expand All @@ -164,7 +164,7 @@ public final void register(R server) {
insert.setLong(4, System.currentTimeMillis());
int insertRowsAffected = insert.executeUpdate();
Assert.isTrue(insertRowsAffected == AFFECTED_ONE_ROW, () -> "Invalid insert rows affected: " + insertRowsAffected);
log.info("Database register insert: {} | {} | {}", namespace, insertRowsAffected, serialize);
log.info("Database register insert: {}, {}, {}", namespace, insertRowsAffected, serialize);
}
});

Expand All @@ -176,7 +176,7 @@ public final void deregister(R server) {
registered.remove(server);
Object[] args = new Object[]{namespace, registerRoleName, server.serialize()};
ThrowingSupplier.doCaught(() -> jdbcTemplateWrapper.delete(DEREGISTER_SQL, args));
log.info("Server deregister: {} | {}", registryRole, server);
log.info("Server deregister: {}, {}", registryRole, server);
}

@Override
Expand Down Expand Up @@ -212,13 +212,13 @@ private void registerServers() {
RetryTemplate.executeQuietly(() -> {
Object[] updateArgs = {System.currentTimeMillis(), namespace, registerRoleName, serialize};
if (jdbcTemplateWrapper.update(HEARTBEAT_SQL, updateArgs) == AFFECTED_ONE_ROW) {
log.debug("Database heartbeat register update: {} | {} | {} | {}", updateArgs);
log.debug("Database heartbeat register update: {}, {}, {}, {}", updateArgs);
return;
}

Object[] insertArgs = {namespace, registerRoleName, serialize, System.currentTimeMillis()};
jdbcTemplateWrapper.insert(REGISTER_SQL, insertArgs);
log.debug("Database heartbeat register insert: {} | {} | {} | {}", insertArgs);
log.debug("Database heartbeat register insert: {}, {}, {}, {}", insertArgs);
}, 3, 1000L);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public final void register(R server) {
try {
client.createEphemeralKey(buildRegistryServerId(server), PLACEHOLDER_VALUE, leaseId);
registered.add(server);
log.info("Etcd server registered: {} | {}", registryRole, server);
log.info("Etcd server registered: {}, {}", registryRole, server);
} catch (Throwable e) {
throw new RegistryException("Etcd server register failed: " + server, e);
}
Expand All @@ -124,7 +124,7 @@ public final void deregister(R server) {
try {
registered.remove(server);
client.deleteKey(buildRegistryServerId(server));
log.info("Etcd server deregister: {} | {}", registryRole, server);
log.info("Etcd server deregister: {}, {}", registryRole, server);
} catch (Throwable t) {
log.error("Etcd server deregister error.", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public final void register(R server) {
try {
namingService.registerInstance(registryRootPath, groupName, instance);
registered.add(server);
log.info("Nacos server registered: {} | {}", registryRole, server);
log.info("Nacos server registered: {}, {}", registryRole, server);
} catch (Throwable e) {
throw new RegistryException("Nacos server register failed: " + server, e);
}
Expand All @@ -100,7 +100,7 @@ public final void deregister(R server) {
try {
registered.remove(server);
namingService.deregisterInstance(registryRootPath, groupName, instance);
log.info("Nacos server deregister: {} | {}", registryRole, server);
log.info("Nacos server deregister: {}, {}", registryRole, server);
} catch (Throwable t) {
log.error("Nacos server deregister error.", t);
}
Expand Down
Loading

0 comments on commit cccf3ef

Please sign in to comment.