diff --git a/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/EagerThreadPoolExecutorProxy.java b/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/EagerThreadPoolExecutorProxy.java index 622ccfe68..c40c4a036 100644 --- a/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/EagerThreadPoolExecutorProxy.java +++ b/adapter/adapter-dubbo/src/main/java/org/dromara/dynamictp/adapter/dubbo/apache/EagerThreadPoolExecutorProxy.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor; import org.apache.dubbo.common.threadpool.support.eager.TaskQueue; +import org.dromara.dynamictp.common.util.ExecutorUtil; import org.dromara.dynamictp.core.aware.AwareManager; import org.dromara.dynamictp.core.aware.RejectHandlerAware; import org.dromara.dynamictp.core.aware.TaskEnhanceAware; @@ -44,7 +45,7 @@ public class EagerThreadPoolExecutorProxy extends EagerThreadPoolExecutor implem /** * Reject handler type. */ - private final String rejectHandlerType; + private String rejectHandlerType; public EagerThreadPoolExecutorProxy(EagerThreadPoolExecutor executor) { super(executor.getCorePoolSize(), executor.getMaximumPoolSize(), @@ -65,14 +66,15 @@ public void execute(Runnable command) { @Override protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); AwareManager.beforeExecute(this, t, r); + super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); AwareManager.afterExecute(this, r, t); + ExecutorUtil.tryExecAfterExecute(r, t); } @Override @@ -89,4 +91,9 @@ public void setTaskWrappers(List taskWrappers) { public String getRejectHandlerType() { return rejectHandlerType; } + + @Override + public void setRejectHandlerType(String rejectHandlerType) { + this.rejectHandlerType = rejectHandlerType; + } } diff --git a/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/StandardThreadExecutorProxy.java b/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/StandardThreadExecutorProxy.java index 1f852d057..cfab39584 100644 --- a/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/StandardThreadExecutorProxy.java +++ b/adapter/adapter-motan/src/main/java/org/dromara/dynamictp/adapter/motan/StandardThreadExecutorProxy.java @@ -18,6 +18,7 @@ package org.dromara.dynamictp.adapter.motan; import com.weibo.api.motan.transport.netty.StandardThreadExecutor; +import org.dromara.dynamictp.common.util.ExecutorUtil; import org.dromara.dynamictp.common.util.ReflectionUtil; import org.dromara.dynamictp.core.aware.AwareManager; import org.dromara.dynamictp.core.aware.RejectHandlerAware; @@ -42,7 +43,7 @@ public class StandardThreadExecutorProxy extends StandardThreadExecutor implemen */ private List taskWrappers; - private final String rejectHandlerType; + private String rejectHandlerType; public StandardThreadExecutorProxy(StandardThreadExecutor executor) { super(executor.getCorePoolSize(), executor.getMaximumPoolSize(), @@ -70,14 +71,15 @@ public void execute(Runnable command) { @Override protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); AwareManager.beforeExecute(this, t, r); + super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); AwareManager.afterExecute(this, r, t); + ExecutorUtil.tryExecAfterExecute(r, t); } @Override @@ -94,4 +96,9 @@ public void setTaskWrappers(List taskWrappers) { public String getRejectHandlerType() { return rejectHandlerType; } + + @Override + public void setRejectHandlerType(String rejectHandlerType) { + this.rejectHandlerType = rejectHandlerType; + } } diff --git a/common/src/main/java/org/dromara/dynamictp/common/entity/DtpExecutorProps.java b/common/src/main/java/org/dromara/dynamictp/common/entity/DtpExecutorProps.java index 1bc3be82f..0cc26c980 100644 --- a/common/src/main/java/org/dromara/dynamictp/common/entity/DtpExecutorProps.java +++ b/common/src/main/java/org/dromara/dynamictp/common/entity/DtpExecutorProps.java @@ -51,30 +51,12 @@ public class DtpExecutorProps extends TpExecutorProps { private boolean fair = false; /** - * Thread name prefix. - */ - private String threadNamePrefix = "dtp"; - - /** - * Whether to wait for scheduled tasks to complete on shutdown, - * not interrupting running tasks and executing all tasks in the queue. - */ - private boolean waitForTasksToCompleteOnShutdown = true; - - /** - * The maximum number of seconds that this executor is supposed to block - * on shutdown in order to wait for remaining tasks to complete their execution - * before the rest of the container continues to shut down. - */ - private int awaitTerminationSeconds = 3; - - /** - * If enhance reject. + * Plugin names. */ - private boolean rejectEnhanced = true; + private Set pluginNames; /** - * Plugin names. + * If false, will not auto create dtpExecutor, default is true. */ - private Set pluginNames; + private boolean autoCreate = true; } diff --git a/common/src/main/java/org/dromara/dynamictp/common/entity/TpExecutorProps.java b/common/src/main/java/org/dromara/dynamictp/common/entity/TpExecutorProps.java index 08f2769bf..16ca9ae1a 100644 --- a/common/src/main/java/org/dromara/dynamictp/common/entity/TpExecutorProps.java +++ b/common/src/main/java/org/dromara/dynamictp/common/entity/TpExecutorProps.java @@ -46,9 +46,9 @@ public class TpExecutorProps { private String threadPoolAliasName; /** - * If false, will not auto create dtpExecutor, default is true. + * Thread name prefix. */ - private boolean autoCreateDtp = true; + private String threadNamePrefix = "dtp"; /** * CoreSize of ThreadPool. @@ -87,6 +87,11 @@ public class TpExecutorProps { */ private String rejectedHandlerType = RejectedTypeEnum.ABORT_POLICY.getName(); + /** + * If enhance reject. + */ + private boolean rejectEnhanced = true; + /** * If allow core thread timeout. */ @@ -127,6 +132,19 @@ public class TpExecutorProps { */ private long queueTimeout = 0; + /** + * Whether to wait for scheduled tasks to complete on shutdown, + * not interrupting running tasks and executing all tasks in the queue. + */ + private boolean waitForTasksToCompleteOnShutdown = true; + + /** + * The maximum number of seconds that this executor is supposed to block + * on shutdown in order to wait for remaining tasks to complete their execution + * before the rest of the container continues to shut down. + */ + private int awaitTerminationSeconds = 3; + /** * Task wrapper names. */ diff --git a/common/src/main/java/org/dromara/dynamictp/common/util/DtpPropertiesBinderUtil.java b/common/src/main/java/org/dromara/dynamictp/common/util/DtpPropertiesBinderUtil.java new file mode 100644 index 000000000..bf4cfaf7e --- /dev/null +++ b/common/src/main/java/org/dromara/dynamictp/common/util/DtpPropertiesBinderUtil.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.common.util; + +import cn.hutool.core.util.ReflectUtil; +import lombok.val; +import org.apache.commons.collections4.CollectionUtils; +import org.dromara.dynamictp.common.entity.DtpExecutorProps; +import org.dromara.dynamictp.common.entity.TpExecutorProps; +import org.dromara.dynamictp.common.properties.DtpProperties; +import org.springframework.core.env.Environment; + +import java.lang.reflect.Field; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWARE_NAMES; +import static org.dromara.dynamictp.common.constant.DynamicTpConst.EXECUTORS_CONFIG_PREFIX; +import static org.dromara.dynamictp.common.constant.DynamicTpConst.GLOBAL_CONFIG_PREFIX; +import static org.dromara.dynamictp.common.constant.DynamicTpConst.MAIN_PROPERTIES_PREFIX; +import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ITEMS; +import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLATFORM_IDS; + +/** + * DtpPropertiesBinderUtil related + * + * @author yanhom + * @since 1.1.0 + */ +@SuppressWarnings("unchecked") +public final class DtpPropertiesBinderUtil { + + private DtpPropertiesBinderUtil() { + } + + /** + * Assign global environment variable to property + * + * @param source environment + * @param dtpProperties dtpProperties + */ + public static void tryResetWithGlobalConfig(Object source, DtpProperties dtpProperties) { + if (Objects.isNull(dtpProperties.getGlobalExecutorProps())) { + return; + } + if (CollectionUtils.isNotEmpty(dtpProperties.getExecutors())) { + tryResetCusExecutors(dtpProperties, source); + } + tryResetAdapterExecutors(dtpProperties, source); + } + + private static void tryResetCusExecutors(DtpProperties dtpProperties, Object source) { + val dtpPropsFields = ReflectionUtil.getAllFields(DtpExecutorProps.class); + int[] idx = {0}; + dtpProperties.getExecutors().forEach(executor -> { + dtpPropsFields.forEach(field -> { + String executorFieldKey = EXECUTORS_CONFIG_PREFIX + idx[0] + "]." + field.getName(); + setBasicField(source, field, executor, executorFieldKey); + }); + setListField(dtpProperties, executor); + val globalExecutorProps = dtpProperties.getGlobalExecutorProps(); + if (CollectionUtils.isEmpty(executor.getPluginNames()) && + CollectionUtils.isNotEmpty(globalExecutorProps.getPluginNames())) { + executor.setPluginNames(globalExecutorProps.getPluginNames()); + } + idx[0]++; + }); + } + + private static void tryResetAdapterExecutors(DtpProperties dtpProperties, Object source) { + val dtpPropertiesFields = ReflectionUtil.getAllFields(DtpProperties.class); + val tpExecutorPropFields = ReflectionUtil.getAllFields(TpExecutorProps.class); + dtpPropertiesFields.forEach(dtpPropertiesField -> { + val targetObj = ReflectUtil.getFieldValue(dtpProperties, dtpPropertiesField); + if (Objects.isNull(targetObj)) { + return; + } + if (dtpPropertiesField.getType().isAssignableFrom(TpExecutorProps.class)) { + tpExecutorPropFields.forEach(tpField -> setBasicField(source, tpField, dtpPropertiesField.getName(), targetObj)); + setListField(dtpProperties, targetObj); + } else if (dtpPropertiesField.getGenericType() instanceof ParameterizedType) { + ParameterizedType paramType = (ParameterizedType) dtpPropertiesField.getGenericType(); + Type[] argTypes = paramType.getActualTypeArguments(); + if (argTypes.length == 1 && argTypes[0].equals(TpExecutorProps.class)) { + List tpExecutorProps = (List) targetObj; + if (CollectionUtils.isEmpty(tpExecutorProps)) { + return; + } + int[] idx = {0}; + tpExecutorProps.forEach(tpProp -> { + tpExecutorPropFields.forEach(tpField -> setBasicField(source, tpField, dtpPropertiesField.getName(), tpProp, idx)); + setListField(dtpProperties, tpProp); + idx[0]++; + }); + } + } + }); + } + + private static Object getProperty(String key, Object environment) { + if (environment instanceof Environment) { + Environment env = (Environment) environment; + return env.getProperty(key); + } else if (environment instanceof Map) { + Map properties = (Map) environment; + return properties.get(key); + } + return null; + } + + private static void setBasicField(Object source, Field tpPropField, String targetObjName, Object targetObj, int[] idx) { + String executorFieldKey = MAIN_PROPERTIES_PREFIX + "." + targetObjName + "[" + idx[0] + "]." + tpPropField.getName(); + setBasicField(source, tpPropField, targetObj, executorFieldKey); + } + + private static void setBasicField(Object source, Field tpPropField, String targetObjName, Object targetObj) { + String executorFieldKey = MAIN_PROPERTIES_PREFIX + "." + targetObjName + "." + tpPropField.getName(); + setBasicField(source, tpPropField, targetObj, executorFieldKey); + } + + private static void setBasicField(Object source, Field tpPropField, Object targetObj, String executorFieldKey) { + Object executorFieldVal = getProperty(executorFieldKey, source); + if (Objects.nonNull(executorFieldVal)) { + return; + } + Object globalFieldVal = getProperty(GLOBAL_CONFIG_PREFIX + tpPropField.getName(), source); + if (Objects.isNull(globalFieldVal)) { + return; + } + ReflectUtil.setFieldValue(targetObj, tpPropField.getName(), globalFieldVal); + } + + private static void setListField(DtpProperties dtpProperties, Object fieldVal) { + val globalExecutorProps = dtpProperties.getGlobalExecutorProps(); + val taskWrappers = (Collection) ReflectUtil.getFieldValue(fieldVal, "taskWrapperNames"); + if (CollectionUtils.isEmpty(taskWrappers) && + CollectionUtils.isNotEmpty(globalExecutorProps.getTaskWrapperNames())) { + ReflectUtil.setFieldValue(fieldVal, "taskWrapperNames", globalExecutorProps.getTaskWrapperNames()); + } + val platformIds = (List) ReflectUtil.getFieldValue(fieldVal, PLATFORM_IDS); + if (CollectionUtils.isEmpty(platformIds) && + CollectionUtils.isNotEmpty(globalExecutorProps.getPlatformIds())) { + ReflectUtil.setFieldValue(fieldVal, PLATFORM_IDS, globalExecutorProps.getPlatformIds()); + } + + val notifyItems = (List) ReflectUtil.getFieldValue(fieldVal, NOTIFY_ITEMS); + if (CollectionUtils.isEmpty(notifyItems) && + CollectionUtils.isNotEmpty(globalExecutorProps.getNotifyItems())) { + ReflectUtil.setFieldValue(fieldVal, NOTIFY_ITEMS, globalExecutorProps.getNotifyItems()); + } + + val awareNames = (List) ReflectUtil.getFieldValue(fieldVal, AWARE_NAMES); + if (CollectionUtils.isEmpty(awareNames) && + CollectionUtils.isNotEmpty(globalExecutorProps.getAwareNames())) { + ReflectUtil.setFieldValue(fieldVal, AWARE_NAMES, globalExecutorProps.getAwareNames()); + } + } +} diff --git a/common/src/main/java/org/dromara/dynamictp/common/util/ExecutorUtil.java b/common/src/main/java/org/dromara/dynamictp/common/util/ExecutorUtil.java new file mode 100644 index 000000000..2e5ae9ae4 --- /dev/null +++ b/common/src/main/java/org/dromara/dynamictp/common/util/ExecutorUtil.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.common.util; + +import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; + +import java.util.Objects; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID; + +/** + * ExecutorUtil related + * + * @author yanhom + * @since 1.1.9 + */ +@Slf4j +public final class ExecutorUtil { + + private ExecutorUtil() { + } + + public static void tryExecAfterExecute(Runnable r, Throwable t) { + tryPrintError(r, t); + tryClearContext(); + } + + private static void tryPrintError(Runnable r, Throwable t) { + if (Objects.nonNull(t)) { + log.error("DynamicTp execute, thread {} throw exception, traceId {}", + Thread.currentThread(), MDC.get(TRACE_ID), t); + return; + } + if (r instanceof FutureTask) { + try { + Future future = (Future) r; + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("DynamicTp execute, thread {} throw exception, traceId {}", + Thread.currentThread(), MDC.get(TRACE_ID), e); + } + } + } + + public static void tryClearContext() { + MDC.remove(TRACE_ID); + } +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java b/core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java index cb79604bb..9dde3bcdb 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java +++ b/core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java @@ -36,6 +36,7 @@ import org.dromara.dynamictp.core.aware.AwareManager; import org.dromara.dynamictp.core.converter.ExecutorConverter; import org.dromara.dynamictp.core.executor.DtpExecutor; +import org.dromara.dynamictp.core.executor.NamedThreadFactory; import org.dromara.dynamictp.core.notifier.manager.NoticeManager; import org.dromara.dynamictp.core.notifier.manager.NotifyHelper; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; @@ -45,6 +46,7 @@ import org.dromara.dynamictp.core.support.task.wrapper.TaskWrappers; import org.springframework.context.event.ContextRefreshedEvent; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -244,9 +246,8 @@ private static void doRefreshCommon(ExecutorWrapper executorWrapper, DtpExecutor String currentRejectHandlerType = executor.getRejectHandlerType(); if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) { val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType()); - executor.setRejectedExecutionHandler(rejectHandler); + executorWrapper.setRejectHandler(rejectHandler); } - executorWrapper.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads()); List taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames()); executorWrapper.setTaskWrappers(taskWrappers); @@ -255,6 +256,7 @@ private static void doRefreshCommon(ExecutorWrapper executorWrapper, DtpExecutor NotifyHelper.updateNotifyInfo(executorWrapper, props, dtpProperties.getPlatforms()); // update aware related AwareManager.refresh(executorWrapper, props); + updateWrapper(executorWrapper, props); } private static void doRefreshDtp(ExecutorWrapper executorWrapper, DtpExecutorProps props) { @@ -263,14 +265,29 @@ private static void doRefreshDtp(ExecutorWrapper executorWrapper, DtpExecutorPro if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) { executor.setThreadPoolAliasName(props.getThreadPoolAliasName()); } + executor.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads()); + if (executor.getThreadFactory() instanceof NamedThreadFactory) { + String prefix = ((NamedThreadFactory) executor.getThreadFactory()).getNamePrefix(); + if (!Objects.equals(prefix, props.getThreadNamePrefix())) { + ((NamedThreadFactory) executor.getThreadFactory()).setNamePrefix(props.getThreadNamePrefix()); + } + } + // update reject handler executor.setRejectEnhanced(props.isRejectEnhanced()); if (!Objects.equals(executor.getRejectHandlerType(), props.getRejectedHandlerType())) { executor.setRejectHandler(RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType())); } + + // update timeout related + executor.setRunTimeout(props.getRunTimeout()); + executor.setQueueTimeout(props.getQueueTimeout()); + executor.setTryInterrupt(props.isTryInterrupt()); + + // update shutdown related executor.setWaitForTasksToCompleteOnShutdown(props.isWaitForTasksToCompleteOnShutdown()); executor.setAwaitTerminationSeconds(props.getAwaitTerminationSeconds()); - executor.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads()); + List taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames()); executor.setTaskWrappers(taskWrappers); @@ -278,14 +295,18 @@ private static void doRefreshDtp(ExecutorWrapper executorWrapper, DtpExecutorPro NotifyHelper.updateNotifyInfo(executor, props, dtpProperties.getPlatforms()); // update aware related AwareManager.refresh(executorWrapper, props); - updateWrapper(executorWrapper, executor); + updateWrapper(executorWrapper, props); } - private static void updateWrapper(ExecutorWrapper executorWrapper, DtpExecutor executor) { - executorWrapper.setThreadPoolAliasName(executor.getThreadPoolAliasName()); - executorWrapper.setNotifyItems(executor.getNotifyItems()); - executorWrapper.setPlatformIds(executor.getPlatformIds()); - executorWrapper.setNotifyEnabled(executor.isNotifyEnabled()); + private static void updateWrapper(ExecutorWrapper executorWrapper, DtpExecutorProps props) { + executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName()); + executorWrapper.setNotifyItems(props.getNotifyItems()); + executorWrapper.setPlatformIds(props.getPlatformIds()); + executorWrapper.setNotifyEnabled(props.isNotifyEnabled()); + executorWrapper.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads()); + executorWrapper.setRejectEnhanced(props.isRejectEnhanced()); + executorWrapper.setWaitForTasksToCompleteOnShutdown(props.isWaitForTasksToCompleteOnShutdown()); + executorWrapper.setAwaitTerminationSeconds(props.getAwaitTerminationSeconds()); } /** @@ -336,17 +357,17 @@ private static void updateQueueProps(ExecutorAdapter executor, DtpExecutorPro @Override protected void onContextRefreshedEvent(ContextRefreshedEvent event) { val executors = Optional.ofNullable(dtpProperties.getExecutors()).orElse(Collections.emptyList()); - Set remoteExecutors = Collections.emptySet(); + val registeredExecutors = Sets.newHashSet(EXECUTOR_REGISTRY.keySet()); + Collection remoteExecutors = Collections.emptySet(); if (CollectionUtils.isNotEmpty(executors)) { - remoteExecutors = executors.stream() + remoteExecutors = CollectionUtils.intersection(executors.stream() .map(DtpExecutorProps::getThreadPoolName) - .collect(Collectors.toSet()); + .collect(Collectors.toSet()), registeredExecutors); } - val registeredExecutors = Sets.newHashSet(EXECUTOR_REGISTRY.keySet()); val localExecutors = CollectionUtils.subtract(registeredExecutors, remoteExecutors); // refresh just for non-dtp executors - val nonDtpExecutors = executors.stream().filter(e -> !e.isAutoCreateDtp()).collect(toList()); + val nonDtpExecutors = executors.stream().filter(e -> !e.isAutoCreate()).collect(toList()); if (CollectionUtils.isNotEmpty(nonDtpExecutors)) { nonDtpExecutors.forEach(DtpRegistry::refresh); } diff --git a/core/src/main/java/org/dromara/dynamictp/core/aware/RejectHandlerAware.java b/core/src/main/java/org/dromara/dynamictp/core/aware/RejectHandlerAware.java index 5766f5b9b..027da044f 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/aware/RejectHandlerAware.java +++ b/core/src/main/java/org/dromara/dynamictp/core/aware/RejectHandlerAware.java @@ -31,4 +31,13 @@ public interface RejectHandlerAware extends DtpAware { * @return reject handler type */ String getRejectHandlerType(); + + /** + * Set reject handler type. + * + * @param rejectHandlerType reject handler type + */ + default void setRejectHandlerType(String rejectHandlerType) { + + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/DtpExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/DtpExecutor.java index bcbab0e7b..2dd611c6a 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/DtpExecutor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/DtpExecutor.java @@ -23,6 +23,7 @@ import lombok.val; import org.dromara.dynamictp.common.em.NotifyItemEnum; import org.dromara.dynamictp.common.entity.NotifyItem; +import org.dromara.dynamictp.common.util.ExecutorUtil; import org.dromara.dynamictp.core.aware.AwareManager; import org.dromara.dynamictp.core.aware.TaskEnhanceAware; import org.dromara.dynamictp.core.notifier.manager.NotifyHelper; @@ -30,22 +31,16 @@ import org.dromara.dynamictp.core.spring.SpringExecutor; import org.dromara.dynamictp.core.support.ExecutorAdapter; import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper; -import org.slf4j.MDC; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRACE_ID; - /** * Dynamic ThreadPoolExecutor, extending ThreadPoolExecutor, implements some new features * @@ -197,16 +192,15 @@ public void execute(Runnable command) { @Override protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); AwareManager.beforeExecute(this, t, r); + super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); AwareManager.afterExecute(this, r, t); - tryPrintError(r, t); - clearContext(); + ExecutorUtil.tryExecAfterExecute(r, t); } @Override @@ -243,29 +237,6 @@ public void setRejectHandler(RejectedExecutionHandler handler) { setRejectedExecutionHandler(RejectHandlerGetter.getProxy(handler)); } - private void tryPrintError(Runnable r, Throwable t) { - if (Objects.nonNull(t)) { - log.error("DynamicTp execute, thread {} throw exception, traceId {}", - Thread.currentThread(), MDC.get(TRACE_ID), t); - return; - } - if (r instanceof FutureTask) { - try { - Future future = (Future) r; - future.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - log.error("DynamicTp execute, thread {} throw exception, traceId {}", - Thread.currentThread(), MDC.get(TRACE_ID), e); - } - } - } - - private void clearContext() { - MDC.remove(TRACE_ID); - } - public String getThreadPoolName() { return threadPoolName; } @@ -405,4 +376,9 @@ public void setAwaitTerminationSeconds(int awaitTerminationSeconds) { public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { allowCoreThreadTimeOut(allowCoreThreadTimeOut); } + + @Override + public void preStartAllCoreThreads() { + super.prestartAllCoreThreads(); + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/NamedThreadFactory.java b/core/src/main/java/org/dromara/dynamictp/core/executor/NamedThreadFactory.java index d5b8c5815..b101a821e 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/NamedThreadFactory.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/NamedThreadFactory.java @@ -31,9 +31,9 @@ @Slf4j public class NamedThreadFactory implements ThreadFactory { - private final ThreadGroup group; + private String namePrefix; - private final String namePrefix; + private final ThreadGroup group; /** * is daemon thread. @@ -78,4 +78,8 @@ public Thread newThread(Runnable r) { public String getNamePrefix() { return namePrefix; } + + public void setNamePrefix(String namePrefix) { + this.namePrefix = namePrefix; + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java index d9c39ee36..e149d2277 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpBeanDefinitionRegistrar.java @@ -84,12 +84,12 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B BinderHelper.bindDtpProperties(environment, dtpProperties); val executors = dtpProperties.getExecutors(); if (CollectionUtils.isEmpty(executors)) { - log.warn("DynamicTp registrar, no executors are configured."); + log.info("DynamicTp registrar, no executors are configured."); return; } executors.forEach(e -> { - if (!e.isAutoCreateDtp()) { + if (!e.isAutoCreate()) { return; } Class executorTypeClass = ExecutorType.getClass(e.getExecutorType()); diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/DtpLifecycleSupport.java b/core/src/main/java/org/dromara/dynamictp/core/support/DtpLifecycleSupport.java index 20ad0baae..ce7c95732 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/DtpLifecycleSupport.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/DtpLifecycleSupport.java @@ -18,7 +18,6 @@ package org.dromara.dynamictp.core.support; import lombok.extern.slf4j.Slf4j; -import org.dromara.dynamictp.core.executor.DtpExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.Objects; @@ -55,23 +54,15 @@ public static void initialize(ExecutorWrapper executorWrapper) { * @param executorWrapper executor wrapper */ public static void destroy(ExecutorWrapper executorWrapper) { - if (executorWrapper.isDtpExecutor()) { - destroy((DtpExecutor) executorWrapper.getExecutor()); - } else if (executorWrapper.isThreadPoolExecutor()) { - internalShutdown(((ThreadPoolExecutorAdapter) executorWrapper.getExecutor()).getOriginal(), + if (executorWrapper.isExecutorService()) { + ExecutorService executorService = (ExecutorService) executorWrapper.getExecutor().getOriginal(); + internalShutdown(executorService, executorWrapper.getThreadPoolName(), - true, - 0); + executorWrapper.isWaitForTasksToCompleteOnShutdown(), + executorWrapper.getAwaitTerminationSeconds()); } } - public static void destroy(DtpExecutor executor) { - internalShutdown(executor, - executor.getThreadPoolName(), - executor.isWaitForTasksToCompleteOnShutdown(), - executor.getAwaitTerminationSeconds()); - } - public static void shutdownGracefulAsync(ExecutorService executor, String threadPoolName, int timeout) { diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ExecutorWrapper.java b/core/src/main/java/org/dromara/dynamictp/core/support/ExecutorWrapper.java index e571b962a..71aa1b592 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/ExecutorWrapper.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ExecutorWrapper.java @@ -22,16 +22,20 @@ import org.dromara.dynamictp.common.em.NotifyItemEnum; import org.dromara.dynamictp.common.entity.NotifyItem; import org.dromara.dynamictp.core.aware.AwareManager; +import org.dromara.dynamictp.core.aware.RejectHandlerAware; import org.dromara.dynamictp.core.aware.TaskEnhanceAware; import org.dromara.dynamictp.core.executor.DtpExecutor; import org.dromara.dynamictp.core.notifier.capture.CapturedExecutor; import org.dromara.dynamictp.core.notifier.manager.AlarmManager; +import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper; import org.springframework.beans.BeanUtils; import java.util.List; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** @@ -79,15 +83,33 @@ public class ExecutorWrapper { private boolean preStartAllCoreThreads; /** - * Thread pool stat provider + * If enhance reject. */ - private ThreadPoolStatProvider threadPoolStatProvider; + private boolean rejectEnhanced = true; /** * Aware names */ private Set awareNames = Sets.newHashSet(); + /** + * Whether to wait for scheduled tasks to complete on shutdown, + * not interrupting running tasks and executing all tasks in the queue. + */ + protected boolean waitForTasksToCompleteOnShutdown = false; + + /** + * The maximum number of seconds that this executor is supposed to block + * on shutdown in order to wait for remaining tasks to complete their execution + * before the rest of the container continues to shut down. + */ + protected int awaitTerminationSeconds = 0; + + /** + * Thread pool stat provider + */ + private ThreadPoolStatProvider threadPoolStatProvider; + private ExecutorWrapper() { } @@ -97,14 +119,17 @@ private ExecutorWrapper() { * @param executor the DtpExecutor */ public ExecutorWrapper(DtpExecutor executor) { + this.executor = executor; this.threadPoolName = executor.getThreadPoolName(); this.threadPoolAliasName = executor.getThreadPoolAliasName(); - this.executor = executor; this.notifyItems = executor.getNotifyItems(); this.notifyEnabled = executor.isNotifyEnabled(); this.platformIds = executor.getPlatformIds(); this.awareNames = executor.getAwareNames(); this.preStartAllCoreThreads = executor.isPreStartAllCoreThreads(); + this.rejectEnhanced = executor.isRejectEnhanced(); + this.waitForTasksToCompleteOnShutdown = executor.isWaitForTasksToCompleteOnShutdown(); + this.awaitTerminationSeconds = executor.getAwaitTerminationSeconds(); this.threadPoolStatProvider = ThreadPoolStatProvider.of(this); } @@ -155,8 +180,7 @@ public ExecutorWrapper capture() { */ public void initialize() { if (isDtpExecutor()) { - DtpExecutor dtpExecutor = (DtpExecutor) getExecutor(); - dtpExecutor.initialize(); + ((DtpExecutor) getExecutor()).initialize(); AwareManager.register(this); } else if (isThreadPoolExecutor()) { AwareManager.register(this); @@ -175,6 +199,10 @@ public boolean isDtpExecutor() { return this.executor instanceof DtpExecutor; } + public boolean isExecutorService() { + return this.executor.getOriginal() instanceof ExecutorService; + } + /** * whether is ThreadPoolExecutor * @@ -194,4 +222,16 @@ public void setTaskWrappers(List taskWrappers) { ((TaskEnhanceAware) executor.getOriginal()).setTaskWrappers(taskWrappers); } } + + public void setRejectHandler(RejectedExecutionHandler handler) { + String rejectHandlerType = handler.getClass().getSimpleName(); + if (executor.getOriginal() instanceof RejectHandlerAware) { + ((RejectHandlerAware) executor.getOriginal()).setRejectHandlerType(rejectHandlerType); + } + if (isRejectEnhanced()) { + executor.setRejectedExecutionHandler(RejectHandlerGetter.getProxy(handler)); + } else { + executor.setRejectedExecutionHandler(handler); + } + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java b/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java index caaf57b9c..4228a7d11 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java @@ -17,11 +17,13 @@ package org.dromara.dynamictp.core.support; +import org.dromara.dynamictp.common.util.ExecutorUtil; import org.dromara.dynamictp.core.aware.AwareManager; import org.dromara.dynamictp.core.aware.RejectHandlerAware; import org.dromara.dynamictp.core.aware.TaskEnhanceAware; import org.dromara.dynamictp.core.reject.RejectHandlerGetter; import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper; + import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; @@ -45,7 +47,7 @@ public class ScheduledThreadPoolExecutorProxy extends ScheduledThreadPoolExecuto /** * Reject handler type. */ - private final String rejectHandlerType; + private String rejectHandlerType; public ScheduledThreadPoolExecutorProxy(ScheduledThreadPoolExecutor executor) { super(executor.getCorePoolSize(), executor.getThreadFactory()); @@ -89,14 +91,15 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialD @Override protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); AwareManager.beforeExecute(this, t, r); + super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); AwareManager.afterExecute(this, r, t); + ExecutorUtil.tryExecAfterExecute(r, t); } @Override @@ -104,6 +107,11 @@ public String getRejectHandlerType() { return rejectHandlerType; } + @Override + public void setRejectHandlerType(String rejectHandlerType) { + this.rejectHandlerType = rejectHandlerType; + } + @Override public List getTaskWrappers() { return taskWrappers; diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolExecutorProxy.java b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolExecutorProxy.java index 00396b077..0a3df0972 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolExecutorProxy.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolExecutorProxy.java @@ -17,6 +17,7 @@ package org.dromara.dynamictp.core.support; +import org.dromara.dynamictp.common.util.ExecutorUtil; import org.dromara.dynamictp.core.aware.AwareManager; import org.dromara.dynamictp.core.aware.RejectHandlerAware; import org.dromara.dynamictp.core.aware.TaskEnhanceAware; @@ -43,7 +44,7 @@ public class ThreadPoolExecutorProxy extends ThreadPoolExecutor implements TaskE /** * Reject handler type. */ - private final String rejectHandlerType; + private String rejectHandlerType; public ThreadPoolExecutorProxy(ThreadPoolExecutor executor) { super(executor.getCorePoolSize(), executor.getMaximumPoolSize(), @@ -63,14 +64,15 @@ public void execute(Runnable command) { @Override protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); AwareManager.beforeExecute(this, t, r); + super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); AwareManager.afterExecute(this, r, t); + ExecutorUtil.tryExecAfterExecute(r, t); } @Override @@ -87,4 +89,9 @@ public void setTaskWrappers(List taskWrappers) { public String getRejectHandlerType() { return rejectHandlerType; } + + @Override + public void setRejectHandlerType(String rejectHandlerType) { + this.rejectHandlerType = rejectHandlerType; + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/EnhancedRunnable.java b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/EnhancedRunnable.java index feeae7c65..8445d7cb3 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/EnhancedRunnable.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/task/runnable/EnhancedRunnable.java @@ -18,6 +18,7 @@ package org.dromara.dynamictp.core.support.task.runnable; import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.common.util.ExecutorUtil; import org.dromara.dynamictp.core.aware.AwareManager; import java.util.Objects; import java.util.concurrent.Executor; @@ -58,6 +59,7 @@ public void run() { throw e; } finally { AwareManager.afterExecute(executor, runnable, t); + ExecutorUtil.tryExecAfterExecute(runnable, t); } } } diff --git a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/tomcat/TomcatExecutorProxy.java b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/tomcat/TomcatExecutorProxy.java index 4fbd824c5..9e3ea026c 100644 --- a/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/tomcat/TomcatExecutorProxy.java +++ b/starter/starter-adapter/starter-adapter-webserver/src/main/java/org/dromara/dynamictp/starter/adapter/webserver/tomcat/TomcatExecutorProxy.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.tomcat.util.threads.TaskQueue; import org.apache.tomcat.util.threads.ThreadPoolExecutor; +import org.dromara.dynamictp.common.util.ExecutorUtil; import org.dromara.dynamictp.common.util.ReflectionUtil; import org.dromara.dynamictp.core.aware.AwareManager; import org.dromara.dynamictp.core.aware.RejectHandlerAware; @@ -86,14 +87,15 @@ public void execute(Runnable command) { @Override protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t, r); AwareManager.beforeExecute(this, t, r); + super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); AwareManager.afterExecute(this, r, t); + ExecutorUtil.tryExecAfterExecute(r, t); } @Override diff --git a/starter/starter-common/src/main/java/org/dromara/dynamictp/starter/common/binder/SpringBootPropertiesBinder.java b/starter/starter-common/src/main/java/org/dromara/dynamictp/starter/common/binder/SpringBootPropertiesBinder.java index b177936d0..93e6cb083 100644 --- a/starter/starter-common/src/main/java/org/dromara/dynamictp/starter/common/binder/SpringBootPropertiesBinder.java +++ b/starter/starter-common/src/main/java/org/dromara/dynamictp/starter/common/binder/SpringBootPropertiesBinder.java @@ -17,14 +17,10 @@ package org.dromara.dynamictp.starter.common.binder; -import cn.hutool.core.util.ReflectUtil; import lombok.extern.slf4j.Slf4j; -import lombok.val; -import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.dromara.dynamictp.common.entity.DtpExecutorProps; import org.dromara.dynamictp.common.properties.DtpProperties; -import org.dromara.dynamictp.common.util.ReflectionUtil; +import org.dromara.dynamictp.common.util.DtpPropertiesBinderUtil; import org.dromara.dynamictp.core.spring.PropertiesBinder; import org.springframework.beans.MutablePropertyValues; import org.springframework.beans.PropertyValues; @@ -39,10 +35,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Method; import java.util.Map; -import java.util.Objects; -import static org.dromara.dynamictp.common.constant.DynamicTpConst.EXECUTORS_CONFIG_PREFIX; -import static org.dromara.dynamictp.common.constant.DynamicTpConst.GLOBAL_CONFIG_PREFIX; import static org.dromara.dynamictp.common.constant.DynamicTpConst.MAIN_PROPERTIES_PREFIX; /** @@ -52,7 +45,6 @@ * @since 1.0.3 **/ @Slf4j -@SuppressWarnings("all") public class SpringBootPropertiesBinder implements PropertiesBinder { @Override @@ -81,7 +73,7 @@ public void bindDtpProperties(Environment environment, DtpProperties dtpProperti @Override public void afterBind(Object source, DtpProperties dtpProperties) { - tryResetWithGlobalConfig(source, dtpProperties); + DtpPropertiesBinderUtil.tryResetWithGlobalConfig(source, dtpProperties); } private void doBindIn2X(Map properties, DtpProperties dtpProperties) { @@ -132,70 +124,5 @@ private void doBindIn1X(Map properties, DtpProperties dtpProperties) { throw new RuntimeException(e); } } - - /** - * Assign global environment variable to property - * - * @param environment - * @param dtpProperties - */ - private void tryResetWithGlobalConfig(Object source, DtpProperties dtpProperties) { - if (Objects.isNull(dtpProperties.getGlobalExecutorProps()) || - CollectionUtils.isEmpty(dtpProperties.getExecutors())) { - return; - } - val fields = ReflectionUtil.getAllFields(DtpExecutorProps.class); - if (CollectionUtils.isEmpty(fields)) { - return; - } - - final int[] executorIndex = {0}; - dtpProperties.getExecutors().forEach(executor -> { - fields.forEach(field -> { - Object executorFieldVal = getProperty(EXECUTORS_CONFIG_PREFIX + executorIndex[0] + "]." + field.getName(), source); - if (Objects.nonNull(executorFieldVal)) { - return; - } - Object globalFieldVal = getProperty(GLOBAL_CONFIG_PREFIX + field.getName(), source); - if (Objects.isNull(globalFieldVal)) { - return; - } - ReflectUtil.setFieldValue(executor, field, globalFieldVal); - }); - - val globalExecutorProps = dtpProperties.getGlobalExecutorProps(); - if (CollectionUtils.isEmpty(executor.getTaskWrapperNames()) && - CollectionUtils.isNotEmpty(globalExecutorProps.getTaskWrapperNames())) { - executor.setTaskWrapperNames(globalExecutorProps.getTaskWrapperNames()); - } - if (CollectionUtils.isEmpty(executor.getPlatformIds()) && - CollectionUtils.isNotEmpty(globalExecutorProps.getPlatformIds())) { - executor.setPlatformIds(globalExecutorProps.getPlatformIds()); - } - if (CollectionUtils.isEmpty(executor.getNotifyItems()) && - CollectionUtils.isNotEmpty(globalExecutorProps.getNotifyItems())) { - executor.setNotifyItems(globalExecutorProps.getNotifyItems()); - } - if (CollectionUtils.isEmpty(executor.getAwareNames()) && - CollectionUtils.isNotEmpty(globalExecutorProps.getAwareNames())) { - executor.setAwareNames(globalExecutorProps.getAwareNames()); - } - if (CollectionUtils.isEmpty(executor.getPluginNames()) && - CollectionUtils.isNotEmpty(globalExecutorProps.getPluginNames())) { - executor.setPluginNames(globalExecutorProps.getPluginNames()); - } - executorIndex[0]++; - }); - } - - private Object getProperty(String key, Object environment) { - if (environment instanceof Environment) { - Environment env = (Environment) environment; - return env.getProperty(key); - } else if (environment instanceof Map) { - Map properties = (Map) environment; - return properties.get(key); - } - return null; - } } +