Skip to content

Commit

Permalink
Merge pull request #479 from dromara/dev
Browse files Browse the repository at this point in the history
[ISSUE #477 #478] support reset adapter tp prop with global config, and support dynamic update more fields for ExecutorWrapper
  • Loading branch information
yanhom1314 authored Sep 22, 2024
2 parents 94b1b53 + eefda15 commit dd016f7
Show file tree
Hide file tree
Showing 18 changed files with 424 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor;
import org.apache.dubbo.common.threadpool.support.eager.TaskQueue;
import org.dromara.dynamictp.common.util.ExecutorUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
import org.dromara.dynamictp.core.aware.TaskEnhanceAware;
Expand All @@ -44,7 +45,7 @@ public class EagerThreadPoolExecutorProxy extends EagerThreadPoolExecutor implem
/**
* Reject handler type.
*/
private final String rejectHandlerType;
private String rejectHandlerType;

public EagerThreadPoolExecutorProxy(EagerThreadPoolExecutor executor) {
super(executor.getCorePoolSize(), executor.getMaximumPoolSize(),
Expand All @@ -65,14 +66,15 @@ public void execute(Runnable command) {

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
super.beforeExecute(t, r);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
AwareManager.afterExecute(this, r, t);
ExecutorUtil.tryExecAfterExecute(r, t);
}

@Override
Expand All @@ -89,4 +91,9 @@ public void setTaskWrappers(List<TaskWrapper> taskWrappers) {
public String getRejectHandlerType() {
return rejectHandlerType;
}

@Override
public void setRejectHandlerType(String rejectHandlerType) {
this.rejectHandlerType = rejectHandlerType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.dromara.dynamictp.adapter.motan;

import com.weibo.api.motan.transport.netty.StandardThreadExecutor;
import org.dromara.dynamictp.common.util.ExecutorUtil;
import org.dromara.dynamictp.common.util.ReflectionUtil;
import org.dromara.dynamictp.core.aware.AwareManager;
import org.dromara.dynamictp.core.aware.RejectHandlerAware;
Expand All @@ -42,7 +43,7 @@ public class StandardThreadExecutorProxy extends StandardThreadExecutor implemen
*/
private List<TaskWrapper> taskWrappers;

private final String rejectHandlerType;
private String rejectHandlerType;

public StandardThreadExecutorProxy(StandardThreadExecutor executor) {
super(executor.getCorePoolSize(), executor.getMaximumPoolSize(),
Expand Down Expand Up @@ -70,14 +71,15 @@ public void execute(Runnable command) {

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
AwareManager.beforeExecute(this, t, r);
super.beforeExecute(t, r);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
AwareManager.afterExecute(this, r, t);
ExecutorUtil.tryExecAfterExecute(r, t);
}

@Override
Expand All @@ -94,4 +96,9 @@ public void setTaskWrappers(List<TaskWrapper> taskWrappers) {
public String getRejectHandlerType() {
return rejectHandlerType;
}

@Override
public void setRejectHandlerType(String rejectHandlerType) {
this.rejectHandlerType = rejectHandlerType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,12 @@ public class DtpExecutorProps extends TpExecutorProps {
private boolean fair = false;

/**
* Thread name prefix.
*/
private String threadNamePrefix = "dtp";

/**
* Whether to wait for scheduled tasks to complete on shutdown,
* not interrupting running tasks and executing all tasks in the queue.
*/
private boolean waitForTasksToCompleteOnShutdown = true;

/**
* The maximum number of seconds that this executor is supposed to block
* on shutdown in order to wait for remaining tasks to complete their execution
* before the rest of the container continues to shut down.
*/
private int awaitTerminationSeconds = 3;

/**
* If enhance reject.
* Plugin names.
*/
private boolean rejectEnhanced = true;
private Set<String> pluginNames;

/**
* Plugin names.
* If false, will not auto create dtpExecutor, default is true.
*/
private Set<String> pluginNames;
private boolean autoCreate = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public class TpExecutorProps {
private String threadPoolAliasName;

/**
* If false, will not auto create dtpExecutor, default is true.
* Thread name prefix.
*/
private boolean autoCreateDtp = true;
private String threadNamePrefix = "dtp";

/**
* CoreSize of ThreadPool.
Expand Down Expand Up @@ -87,6 +87,11 @@ public class TpExecutorProps {
*/
private String rejectedHandlerType = RejectedTypeEnum.ABORT_POLICY.getName();

/**
* If enhance reject.
*/
private boolean rejectEnhanced = true;

/**
* If allow core thread timeout.
*/
Expand Down Expand Up @@ -127,6 +132,19 @@ public class TpExecutorProps {
*/
private long queueTimeout = 0;

/**
* Whether to wait for scheduled tasks to complete on shutdown,
* not interrupting running tasks and executing all tasks in the queue.
*/
private boolean waitForTasksToCompleteOnShutdown = true;

/**
* The maximum number of seconds that this executor is supposed to block
* on shutdown in order to wait for remaining tasks to complete their execution
* before the rest of the container continues to shut down.
*/
private int awaitTerminationSeconds = 3;

/**
* Task wrapper names.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.dromara.dynamictp.common.util;

import cn.hutool.core.util.ReflectUtil;
import lombok.val;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.dynamictp.common.entity.DtpExecutorProps;
import org.dromara.dynamictp.common.entity.TpExecutorProps;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.springframework.core.env.Environment;

import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWARE_NAMES;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.EXECUTORS_CONFIG_PREFIX;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.GLOBAL_CONFIG_PREFIX;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.MAIN_PROPERTIES_PREFIX;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ITEMS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLATFORM_IDS;

/**
* DtpPropertiesBinderUtil related
*
* @author yanhom
* @since 1.1.0
*/
@SuppressWarnings("unchecked")
public final class DtpPropertiesBinderUtil {

private DtpPropertiesBinderUtil() {
}

/**
* Assign global environment variable to property
*
* @param source environment
* @param dtpProperties dtpProperties
*/
public static void tryResetWithGlobalConfig(Object source, DtpProperties dtpProperties) {
if (Objects.isNull(dtpProperties.getGlobalExecutorProps())) {
return;
}
if (CollectionUtils.isNotEmpty(dtpProperties.getExecutors())) {
tryResetCusExecutors(dtpProperties, source);
}
tryResetAdapterExecutors(dtpProperties, source);
}

private static void tryResetCusExecutors(DtpProperties dtpProperties, Object source) {
val dtpPropsFields = ReflectionUtil.getAllFields(DtpExecutorProps.class);
int[] idx = {0};
dtpProperties.getExecutors().forEach(executor -> {
dtpPropsFields.forEach(field -> {
String executorFieldKey = EXECUTORS_CONFIG_PREFIX + idx[0] + "]." + field.getName();
setBasicField(source, field, executor, executorFieldKey);
});
setListField(dtpProperties, executor);
val globalExecutorProps = dtpProperties.getGlobalExecutorProps();
if (CollectionUtils.isEmpty(executor.getPluginNames()) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getPluginNames())) {
executor.setPluginNames(globalExecutorProps.getPluginNames());
}
idx[0]++;
});
}

private static void tryResetAdapterExecutors(DtpProperties dtpProperties, Object source) {
val dtpPropertiesFields = ReflectionUtil.getAllFields(DtpProperties.class);
val tpExecutorPropFields = ReflectionUtil.getAllFields(TpExecutorProps.class);
dtpPropertiesFields.forEach(dtpPropertiesField -> {
val targetObj = ReflectUtil.getFieldValue(dtpProperties, dtpPropertiesField);
if (Objects.isNull(targetObj)) {
return;
}
if (dtpPropertiesField.getType().isAssignableFrom(TpExecutorProps.class)) {
tpExecutorPropFields.forEach(tpField -> setBasicField(source, tpField, dtpPropertiesField.getName(), targetObj));
setListField(dtpProperties, targetObj);
} else if (dtpPropertiesField.getGenericType() instanceof ParameterizedType) {
ParameterizedType paramType = (ParameterizedType) dtpPropertiesField.getGenericType();
Type[] argTypes = paramType.getActualTypeArguments();
if (argTypes.length == 1 && argTypes[0].equals(TpExecutorProps.class)) {
List<TpExecutorProps> tpExecutorProps = (List<TpExecutorProps>) targetObj;
if (CollectionUtils.isEmpty(tpExecutorProps)) {
return;
}
int[] idx = {0};
tpExecutorProps.forEach(tpProp -> {
tpExecutorPropFields.forEach(tpField -> setBasicField(source, tpField, dtpPropertiesField.getName(), tpProp, idx));
setListField(dtpProperties, tpProp);
idx[0]++;
});
}
}
});
}

private static Object getProperty(String key, Object environment) {
if (environment instanceof Environment) {
Environment env = (Environment) environment;
return env.getProperty(key);
} else if (environment instanceof Map) {
Map<?, Object> properties = (Map<?, Object>) environment;
return properties.get(key);
}
return null;
}

private static void setBasicField(Object source, Field tpPropField, String targetObjName, Object targetObj, int[] idx) {
String executorFieldKey = MAIN_PROPERTIES_PREFIX + "." + targetObjName + "[" + idx[0] + "]." + tpPropField.getName();
setBasicField(source, tpPropField, targetObj, executorFieldKey);
}

private static void setBasicField(Object source, Field tpPropField, String targetObjName, Object targetObj) {
String executorFieldKey = MAIN_PROPERTIES_PREFIX + "." + targetObjName + "." + tpPropField.getName();
setBasicField(source, tpPropField, targetObj, executorFieldKey);
}

private static void setBasicField(Object source, Field tpPropField, Object targetObj, String executorFieldKey) {
Object executorFieldVal = getProperty(executorFieldKey, source);
if (Objects.nonNull(executorFieldVal)) {
return;
}
Object globalFieldVal = getProperty(GLOBAL_CONFIG_PREFIX + tpPropField.getName(), source);
if (Objects.isNull(globalFieldVal)) {
return;
}
ReflectUtil.setFieldValue(targetObj, tpPropField.getName(), globalFieldVal);
}

private static void setListField(DtpProperties dtpProperties, Object fieldVal) {
val globalExecutorProps = dtpProperties.getGlobalExecutorProps();
val taskWrappers = (Collection<?>) ReflectUtil.getFieldValue(fieldVal, "taskWrapperNames");
if (CollectionUtils.isEmpty(taskWrappers) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getTaskWrapperNames())) {
ReflectUtil.setFieldValue(fieldVal, "taskWrapperNames", globalExecutorProps.getTaskWrapperNames());
}
val platformIds = (List<?>) ReflectUtil.getFieldValue(fieldVal, PLATFORM_IDS);
if (CollectionUtils.isEmpty(platformIds) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getPlatformIds())) {
ReflectUtil.setFieldValue(fieldVal, PLATFORM_IDS, globalExecutorProps.getPlatformIds());
}

val notifyItems = (List<?>) ReflectUtil.getFieldValue(fieldVal, NOTIFY_ITEMS);
if (CollectionUtils.isEmpty(notifyItems) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getNotifyItems())) {
ReflectUtil.setFieldValue(fieldVal, NOTIFY_ITEMS, globalExecutorProps.getNotifyItems());
}

val awareNames = (List<?>) ReflectUtil.getFieldValue(fieldVal, AWARE_NAMES);
if (CollectionUtils.isEmpty(awareNames) &&
CollectionUtils.isNotEmpty(globalExecutorProps.getAwareNames())) {
ReflectUtil.setFieldValue(fieldVal, AWARE_NAMES, globalExecutorProps.getAwareNames());
}
}
}
Loading

0 comments on commit dd016f7

Please sign in to comment.