Skip to content

Commit

Permalink
release 1.1.9.1-3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
yanhom1314 committed Oct 23, 2024
2 parents f1779da + 2fd9d72 commit 423d670
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 27 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,29 @@ protected void afterExecute(Runnable r, Throwable t);
>
> 4. 集成常用三方中间件内部线程池管理
**经过多个版本的迭代,目前最新版本 v1.1.7 具有以下特性**
**经过多个版本的迭代,目前最新版本 v1.1.9 具有以下特性**

- **代码零侵入**:我们改变了线程池以往的使用姿势,所有配置均放在配置中心,服务启动时会从配置中心拉取配置生成线程池对象放到 Spring 容器中,使用时直接从 Spring 容器中获取,对业务代码零侵入

- **轻量简单**:使用起来极其简单,引入相应依赖,接入只需简单 4 步就可完成,顺利 3 分钟搞定,相当丝滑

- **通知告警**:提供多种通知告警维度(配置变更通知、活性报警、队列容量阈值报警、拒绝触发报警、任务执行或等待超时报警),触发配置阈值实时推送告警信息,已支持企微、钉钉、飞书、邮件、云之家报警,同时提供 SPI 接口可自定义扩展实现

- **运行监控**:定时采集线程池指标数据(20 多种指标,包含线程池维度、队列维度、任务维度、tps、tp99等),支持通过 MicroMeter、JsonLog 两种方式,也可以通过 SpringBoot Endpoint 端点实时获取最新指标数据,同时提供 SPI 接口可自定义扩展实现
- **运行监控**:定时采集线程池指标数据(20 多种指标,包含线程池维度、队列维度、任务维度、tps、tpxx 等),支持通过 MicroMeter、JsonLog、JMX 三种方式定时获取,也可以通过 SpringBoot Endpoint 端点实时获取最新指标数据,同时提供 SPI 接口可自定义扩展实现

- **任务增强**:提供任务包装功能(比 Spring 线程池任务包装更强大),实现 TaskWrapper 接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper、OpenTelemetryWrapper,可以支持线程池上下文信息传递

- **多配置中心支持**:支持多种主流配置中心,包括 Nacos、Apollo、Zookeeper、Consul、Etcd、Polaris、ServiceComb,同时也提供 SPI 接口可自定义扩展实现

- **中间件线程池管理**:集成管理常用第三方组件的线程池,已集成 Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix、Grpc、Motan、Okhttp3、Brpc、Tars、SofaRpc、RabbitMq 等组件的线程池管理(调参、监控报警)

- **轻量简单**:使用起来极其简单,引入相应依赖,接入只需简单 4 步就可完成,顺利 3 分钟搞定,相当丝滑
- **中间件线程池管理**:集成管理常用第三方组件的线程池,已集成 Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix、Grpc、Motan、Okhttp3、Brpc、Tars、SofaRpc、RabbitMq、Liteflow 等组件的线程池管理(动态调参、监控、报警)

- **多模式**:提供了增强线程池 DtpExecutor,IO 密集型场景使用的线程池 EagerDtpExecutor,调度线程池 ScheduledDtpExecutor,有序线程池 OrderedDtpExecutor,可以根据业务场景选择合适的线程池

- **兼容性**:JUC 普通线程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被框架管理,@Bean 定义时加 @DynamicTp 注解即可
- **兼容性**:JUC 普通线程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被框架管理,只需@Bean 定义时加 @DynamicTp 注解即可

- **可靠性**:依靠 Spring 生命周期管理,可以做到优雅关闭线程池,在 Spring 容器关闭前尽可能多的处理队列中的任务

- **高可扩展**:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装等等
- **高可扩展**:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装、拒绝策略等等

- **线上大规模应用**:参考[美团线程池实践](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html),美团内部已经有该理论成熟的应用经验

Expand Down Expand Up @@ -200,7 +200,7 @@ protected void afterExecute(Runnable r, Throwable t);

看到这儿,**请给项目一个 star**,你的支持是我们前进的动力!

使用过程中有任何问题,或者对项目有什么想法或者建议,可以加入社群,跟 1000+ 群友一起交流讨论。
使用过程中有任何问题,或者对项目有什么想法或者建议,可以加入社群,跟 1500+ 群友一起交流讨论。

微信群已满 200 人,可以关注微信公众号,加我个人微信拉群(备注:dynamic-tp)。

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.slf4j.MDC;

import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID;
Expand Down Expand Up @@ -51,8 +50,10 @@ private static void tryPrintError(Runnable r, Throwable t) {
}
if (r instanceof FutureTask) {
try {
Future<?> future = (Future<?>) r;
future.get();
FutureTask<?> future = (FutureTask<?>) r;
if (future.isDone()) {
future.get();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public final class CollectorHandler {

private CollectorHandler() {
List<MetricsCollector> loadedCollectors = ExtensionServiceLoader.get(MetricsCollector.class);
loadedCollectors.forEach(collector -> COLLECTORS.put(collector.type(), collector));
loadedCollectors.forEach(collector -> COLLECTORS.put(collector.type().toLowerCase(), collector));

MetricsCollector microMeterCollector = new MicroMeterCollector();
LogCollector logCollector = new LogCollector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class NotifierHandler {

private NotifierHandler() {
List<DtpNotifier> loadedNotifiers = ExtensionServiceLoader.get(DtpNotifier.class);
loadedNotifiers.forEach(notifier -> NOTIFIERS.put(notifier.platform(), notifier));
loadedNotifiers.forEach(notifier -> NOTIFIERS.put(notifier.platform().toLowerCase(), notifier));

DtpNotifier dingNotifier = new DtpDingNotifier(new DingNotifier());
DtpNotifier wechatNotifier = new DtpWechatNotifier(new WechatNotifier());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,29 @@ public ScheduledThreadPoolExecutorProxy(ScheduledThreadPoolExecutor executor) {
@Override
public void execute(Runnable command) {
command = getEnhancedTask(command);
AwareManager.execute(this, command);
super.execute(command);
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
command = getEnhancedTask(command);
AwareManager.execute(this, command);
return super.schedule(command, delay, unit);
}

public <V> ScheduledFuture<V> schedule(Runnable command, V result, long delay, TimeUnit unit) {
command = getEnhancedTask(command);
AwareManager.execute(this, command);
return super.schedule(Executors.callable(command, result), delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
command = getEnhancedTask(command);
AwareManager.execute(this, command);
return super.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
command = getEnhancedTask(command);
AwareManager.execute(this, command);
return super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

Expand Down
2 changes: 1 addition & 1 deletion dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<url>https://github.com/yanhom1314/dynamic-tp</url>

<properties>
<revision>1.1.9-3.x</revision>
<revision>1.1.9.1-3.x</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<maven.compiler.source>17</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public ScheduledExecutorService scheduledDtpExecutor() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName("scheduledDtpExecutor")
.corePoolSize(2)
.dynamic(true)
.threadFactory("test-scheduled")
.rejectedExecutionHandler(CALLER_RUNS_POLICY.getName())
.buildScheduled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.dromara.dynamictp.extension.agent;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -26,9 +27,11 @@
import java.lang.ref.SoftReference;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -59,7 +62,7 @@ public String getName() {
return "agent";
}

private DtpRunnable determineDtpRunnable(List<Field> conditionalFields, Runnable r) throws IllegalAccessException {
private DtpRunnable determineDtpRunnable(List<Field> conditionalFields, Runnable r, Set<Class> visitedClass) throws IllegalAccessException {
for (Field field : conditionalFields) {
if (Objects.isNull(field)) {
continue;
Expand All @@ -69,24 +72,29 @@ private DtpRunnable determineDtpRunnable(List<Field> conditionalFields, Runnable
if (o instanceof DtpRunnable) {
return (DtpRunnable) o;
}
if (Objects.isNull(o) || CollUtil.contains(visitedClass, o.getClass())) {
return null;
} else {
visitedClass.add(o.getClass());
}
// 纵向查找
DtpRunnable dtpRunnable = getDtpRunnable(o.getClass(), o);
DtpRunnable dtpRunnable = getDtpRunnable(o.getClass(), o, visitedClass);
if (dtpRunnable != null) {
return dtpRunnable;
}
}
return null;
}

private DtpRunnable getDtpRunnable(Class<? extends Runnable> rClass, Runnable r) throws IllegalAccessException {
private DtpRunnable getDtpRunnable(Class<? extends Runnable> rClass, Runnable r, Set<Class> visitedClass) throws IllegalAccessException {
while (Runnable.class.isAssignableFrom(rClass)) {
Field[] declaredFields = rClass.getDeclaredFields();
if (ArrayUtil.isNotEmpty(declaredFields)) {
List<Field> conditionFields = Arrays.stream(declaredFields)
.filter(ele -> Runnable.class.isAssignableFrom(ele.getType()))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(conditionFields)) {
DtpRunnable dtpRunnable = determineDtpRunnable(conditionFields, r);
DtpRunnable dtpRunnable = determineDtpRunnable(conditionFields, r, visitedClass);
if (Objects.nonNull(dtpRunnable)) {
return dtpRunnable;
}
Expand All @@ -107,13 +115,13 @@ private Runnable getDtpRunnableInstance(Runnable r) {
DtpRunnable dtpRunnable = null;
Class<? extends Runnable> rClass = r.getClass();
try {
dtpRunnable = getDtpRunnable(rClass, r);
dtpRunnable = getDtpRunnable(rClass, r, new HashSet<>());
} catch (IllegalAccessException e) {
log.error("getDtpRunnable Error", e);
}
if (dtpRunnable == null) {
if (log.isWarnEnabled()) {
log.warn("DynamicTp aware [{}], can not find DtpRunnable.", getName());
if (log.isDebugEnabled()) {
log.debug("DynamicTp aware [{}], can not find DtpRunnable.", getName());
}
return r;
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<url>https://github.com/yanhom1314/dynamic-tp</url>

<properties>
<revision>1.1.9-3.x</revision>
<revision>1.1.9.1-3.x</revision>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@

package org.dromara.dynamictp.agent;

import org.dromara.dynamictp.core.support.ThreadPoolBuilder;
import org.dromara.dynamictp.extension.agent.AgentAware;
import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AgentAwareTest {

Expand Down Expand Up @@ -124,4 +130,53 @@ public void testDeepRunnable() throws InvocationTargetException, IllegalAccessEx
Assertions.assertTrue(result == dtpRunnable);
}


@Test
public void testNestRunnable() throws InvocationTargetException, IllegalAccessException {

Runnable runnable = () -> System.out.println("test");
DtpRunnable dtpRunnable = new DtpRunnable(runnable, runnable, "test");
MyAgentNestWrapper myAgentNestWrapper = new MyAgentNestWrapper(dtpRunnable);
Method getDtpRunnableInstance = ReflectionUtils.findMethod(AgentAware.class, "getDtpRunnableInstance", Runnable.class);
getDtpRunnableInstance.setAccessible(true);
Object result = getDtpRunnableInstance.invoke(new AgentAware(), myAgentNestWrapper);
Assertions.assertTrue(dtpRunnable == dtpRunnable);
}

@Test
public void testContainNestRunnable() throws InvocationTargetException, IllegalAccessException {

Runnable runnable = () -> System.out.println("test");
DtpRunnable dtpRunnable = new DtpRunnable(runnable, runnable, "test");
MyAgentNestWrapper myAgentNestWrapper = new MyAgentNestWrapper(dtpRunnable);

MyAgentContainNestWrapper myAgentContainNestWrapper = new MyAgentContainNestWrapper(myAgentNestWrapper);

Method getDtpRunnableInstance = ReflectionUtils.findMethod(AgentAware.class, "getDtpRunnableInstance", Runnable.class);
getDtpRunnableInstance.setAccessible(true);
Object result = getDtpRunnableInstance.invoke(new AgentAware(), myAgentContainNestWrapper);
Assertions.assertTrue(dtpRunnable == dtpRunnable);
}

@Test
public void testScheduledThreadPoolExecutor() throws InterruptedException {
ScheduledExecutorService scheduledExecutorService = ThreadPoolBuilder.newBuilder()
.dynamic(true)
.corePoolSize(1)
.scheduled()
.buildScheduled();

AtomicInteger count = new AtomicInteger();
CountDownLatch downLatch = new CountDownLatch(3);
scheduledExecutorService.scheduleAtFixedRate(() -> {
if (count.get() >= 3) {
throw new RuntimeException("down");
}
count.incrementAndGet();
downLatch.countDown();
}, 1, 1, TimeUnit.SECONDS);

downLatch.await();
Assert.assertEquals(3, count.get());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.dromara.dynamictp.agent;

public class MyAgentContainNestWrapper implements Runnable {

private MyAgentNestWrapper agentNestWrapper;

public MyAgentContainNestWrapper(MyAgentNestWrapper agentNestWrapper) {
this.agentNestWrapper = agentNestWrapper;
}

@Override
public void run() {
System.out.println("before");
agentNestWrapper.run();
System.out.println("after");
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.dromara.dynamictp.agent;

import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable;

public class MyAgentNestWrapper implements Runnable {

private MyAgentNestWrapper myAgentNestWrapper = this;

private DtpRunnable dtpRunnable;

public MyAgentNestWrapper(DtpRunnable dtpRunnable) {
this.dtpRunnable = dtpRunnable;
}

@Override
public void run() {
System.out.println("before");
try {
dtpRunnable.run();
} finally {
System.out.println("finally");
}
}
}

0 comments on commit 423d670

Please sign in to comment.