Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventMesh function connector runtime #4858

Merged
merged 18 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
3 changes: 2 additions & 1 deletion .github/workflows/stale.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v9
- uses: actions/stale@v8
with:
days-before-issue-stale: 90
days-before-pr-stale: 60
Expand All @@ -47,3 +47,4 @@ jobs:
exempt-issue-labels: 'pinned,discussion,help wanted,WIP,weopen-star,GLCC,summer of code'
exempt-pr-labels: 'help wanted,dependencies'
exempt-all-milestones: true # Exempt all issues/PRs with milestones from stale
operations-per-run: 30
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ subprojects {
dependency "org.projectlombok:lombok:1.18.22"
dependency "com.github.seancfoley:ipaddress:5.3.3"
dependency "javax.annotation:javax.annotation-api:1.3.2"
dependency "com.alibaba:fastjson:1.2.83"
dependency "com.alibaba.fastjson2:fastjson2:2.0.48"

dependency "software.amazon.awssdk:s3:2.20.29"
dependency "com.github.rholder:guava-retrying:2.0.0"
Expand Down
2 changes: 2 additions & 0 deletions eventmesh-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ dependencies {
api "io.cloudevents:cloudevents-core"
api "io.cloudevents:cloudevents-json-jackson"

api "com.alibaba.fastjson2:fastjson2"

implementation "org.apache.logging.log4j:log4j-api"
implementation "org.apache.logging.log4j:log4j-core"
implementation "org.apache.logging.log4j:log4j-slf4j2-impl"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public EventMeshThreadFactory(final String threadNamePrefix) {
public Thread newThread(@Nonnull final Runnable runnable) {

StringBuilder threadName = new StringBuilder(threadNamePrefix);
if (null != threadIndex) {
if (threadIndex != null) {
threadName.append("-").append(threadIndex.incrementAndGet());
}
Thread thread = new Thread(runnable, threadName.toString());
thread.setDaemon(daemon);
if (null != priority) {
if (priority != null) {
thread.setPriority(priority);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,94 +34,85 @@
@Config(prefix = "eventMesh")
public class CommonConfiguration {

@ConfigFiled(field = "sysid", beNumber = true, notEmpty = true)
@ConfigField(field = "sysid", beNumber = true, notEmpty = true)
private String sysID = "5477";

@ConfigFiled(field = "server.env", notEmpty = true)
@ConfigField(field = "server.env", notEmpty = true)
private String eventMeshEnv = "P";

@ConfigFiled(field = "server.idc", notEmpty = true)
@ConfigField(field = "server.idc", notEmpty = true)
private String eventMeshIDC = "FT";

@ConfigFiled(field = "server.name", notEmpty = true)
@ConfigField(field = "server.name", notEmpty = true)
private String eventMeshName = "";

@ConfigFiled(field = "server.cluster", notEmpty = true)
@ConfigField(field = "server.cluster", notEmpty = true)
private String eventMeshCluster = "LS";

@ConfigFiled(field = "server.hostIp", reload = true)
@ConfigField(field = "server.hostIp", reload = true)
private String eventMeshServerIp = null;

@ConfigFiled(field = "metaStorage.plugin.server-addr", notEmpty = true)
@ConfigField(field = "metaStorage.plugin.server-addr", notEmpty = true)
private String metaStorageAddr = "";

@ConfigFiled(field = "metaStorage.plugin.type", notEmpty = true)
@ConfigField(field = "metaStorage.plugin.type", notEmpty = true)
private String eventMeshMetaStoragePluginType = "nacos";

@ConfigFiled(field = "metaStorage.plugin.username")
@ConfigField(field = "metaStorage.plugin.username")
private String eventMeshMetaStoragePluginUsername = "";

@ConfigFiled(field = "metaStorage.plugin.password")
@ConfigField(field = "metaStorage.plugin.password")
private String eventMeshMetaStoragePluginPassword = "";

@ConfigFiled(field = "metaStorage.plugin.metaStorageIntervalInMills")
private Integer eventMeshMetaStorageIntervalInMills = 10 * 1000;

@ConfigFiled(field = "metaStorage.plugin.fetchMetaStorageAddrIntervalInMills")
private Integer eventMeshFetchMetaStorageAddrInterval = 10 * 1000;

@ConfigFiled(field = "metaStorage.plugin.enabled")
@ConfigField(field = "metaStorage.plugin.enabled")
private boolean eventMeshServerMetaStorageEnable = false;

@ConfigFiled(field = "trace.plugin", notEmpty = true)
@ConfigField(field = "trace.plugin", notEmpty = true)
private String eventMeshTracePluginType;

@ConfigFiled(field = "metrics.plugin", notEmpty = true)
@ConfigField(field = "metrics.plugin", notEmpty = true)
private List<String> eventMeshMetricsPluginType;

@ConfigFiled(field = "security.plugin.type", notEmpty = true)
@ConfigField(field = "security.plugin.type", notEmpty = true)
private String eventMeshSecurityPluginType = "security";

@ConfigFiled(field = "connector.plugin.type", notEmpty = true)
private String eventMeshConnectorPluginType = "rocketmq";

@ConfigFiled(field = "storage.plugin.type", notEmpty = true)
private String eventMeshStoragePluginType = "rocketmq";
@ConfigField(field = "storage.plugin.type", notEmpty = true)
private String eventMeshStoragePluginType = "standalone";

@ConfigFiled(field = "security.validation.type.token", notEmpty = true)
@ConfigField(field = "security.validation.type.token", notEmpty = true)
private boolean eventMeshSecurityValidateTypeToken = false;

@ConfigFiled(field = "server.trace.enabled")
@ConfigField(field = "server.trace.enabled")
private boolean eventMeshServerTraceEnable = false;

@ConfigFiled(field = "server.security.enabled")
@ConfigField(field = "server.security.enabled")
private boolean eventMeshServerSecurityEnable = false;

@ConfigFiled(field = "security.publickey")
@ConfigField(field = "security.publickey")
private String eventMeshSecurityPublickey = "";

@ConfigFiled(field = "server.provide.protocols", reload = true)
@ConfigField(field = "server.provide.protocols", reload = true)
private List<String> eventMeshProvideServerProtocols;

@ConfigFiled(reload = true)
@ConfigField(reload = true)
private String eventMeshWebhookOrigin;

@ConfigFiled(reload = true)
@ConfigField(reload = true)
private String meshGroup;

@ConfigFiled(field = "server.retry.plugin.type")
@ConfigField(field = "server.retry.plugin.type")
private String eventMeshRetryPluginType = Constants.DEFAULT;

@ConfigFiled(field = "registry.plugin.server-addr", notEmpty = true)
@ConfigField(field = "registry.plugin.server-addr", notEmpty = true)
private String registryAddr = "";

@ConfigFiled(field = "registry.plugin.type", notEmpty = true)
@ConfigField(field = "registry.plugin.type", notEmpty = true)
private String eventMeshRegistryPluginType = "nacos";

@ConfigFiled(field = "registry.plugin.username")
@ConfigField(field = "registry.plugin.username")
private String eventMeshRegistryPluginUsername = "";

@ConfigFiled(field = "registry.plugin.password")
@ConfigField(field = "registry.plugin.password")
private String eventMeshRegistryPluginPassword = "";

public void reload() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.FIELD})
public @interface ConfigFiled {
public @interface ConfigField {

/**
* @return The key name of the configuration file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.common.config.convert;

import org.apache.eventmesh.common.config.ConfigFiled;
import org.apache.eventmesh.common.config.ConfigField;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -44,7 +44,7 @@ default boolean canHandleNullValue() {
/**
* @return The value converter needs
*/
default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigFiled configFiled) {
default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigField configField) {
Properties properties = convertInfo.getProperties();
String value = properties.getProperty(key);

Expand All @@ -54,14 +54,14 @@ default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigFile

value = value.trim();

boolean findEnv = configFiled.findEnv();
String fieldName = configFiled.field();
boolean findEnv = configField.findEnv();
String fieldName = configField.field();

if (StringUtils.isBlank(value) && !StringUtils.isBlank(fieldName) && findEnv) {
value = Optional.ofNullable(System.getProperty(fieldName)).orElse(System.getenv(fieldName));
}

if (StringUtils.isBlank(value) && configFiled.notEmpty()) {
if (StringUtils.isBlank(value) && configField.notEmpty()) {
throw new RuntimeException(key + " can't be empty!");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.common.config.convert;

import org.apache.eventmesh.common.config.ConfigFiled;
import org.apache.eventmesh.common.config.ConfigField;
import org.apache.eventmesh.common.config.convert.converter.BaseDataTypeConverter;
import org.apache.eventmesh.common.config.convert.converter.DateConverter;
import org.apache.eventmesh.common.config.convert.converter.EnumConverter;
Expand Down Expand Up @@ -96,9 +96,9 @@ public static void register(ConvertValue<?> convertValue, Class<?>... clazzs) {
*/
public static ConvertValue<?> getFieldConverter(Field field) {
Class<?> clazz = field.getType();
ConfigFiled configFiled = field.getAnnotation(ConfigFiled.class);
ConfigField configField = field.getAnnotation(ConfigField.class);

Class<?> converter1 = configFiled.converter();
Class<?> converter1 = configField.converter();
if (!converter1.equals(ConvertValue.DefaultConverter.class)) {
if (!classToConverter.containsKey(converter1)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.eventmesh.common.config.convert.converter;

import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.common.config.ConfigFiled;
import org.apache.eventmesh.common.config.ConfigField;
import org.apache.eventmesh.common.config.ConfigInfo;
import org.apache.eventmesh.common.config.convert.ConvertInfo;
import org.apache.eventmesh.common.config.convert.ConvertValue;
Expand Down Expand Up @@ -110,26 +110,26 @@ private void setValue() throws Exception {
field.setAccessible(true);

ConvertInfo convertInfo = this.convertInfo;
ConfigFiled configFiled = field.getAnnotation(ConfigFiled.class);
if (Objects.isNull(configFiled)) {
ConfigField configField = field.getAnnotation(ConfigField.class);
if (Objects.isNull(configField)) {
continue;
}

String key = this.buildKey(configFiled);
needReload = this.checkNeedReload(needReload, configFiled);
String key = this.buildKey(configField);
needReload = this.checkNeedReload(needReload, configField);

ConvertValue<?> convertValue = ConverterMap.getFieldConverter(field);
Object fieldValue = convertValue.processFieldValue(convertInfo, key, configFiled);
Object fieldValue = convertValue.processFieldValue(convertInfo, key, configField);

if (!checkFieldValueBefore(configFiled, key, convertValue, fieldValue)) {
if (!checkFieldValueBefore(configField, key, convertValue, fieldValue)) {
continue;
}
convertInfo.setValue(fieldValue);
convertInfo.setField(field);
convertInfo.setKey(key);
Object convertedValue = convertValue.convert(convertInfo);

if (!checkFieldValueAfter(configFiled, key, convertedValue)) {
if (!checkFieldValueAfter(configField, key, convertedValue)) {
continue;
}
field.set(object, convertedValue);
Expand All @@ -155,16 +155,16 @@ private void reloadConfigIfNeed(boolean needReload) throws NoSuchMethodException
}
}

private boolean checkFieldValueAfter(ConfigFiled configFiled, String key, Object convertedValue) {
private boolean checkFieldValueAfter(ConfigField configField, String key, Object convertedValue) {
if (Objects.isNull(convertedValue)) {
if (configFiled.notNull()) {
if (configField.notNull()) {
throw new RuntimeException(key + " can not be null!");
}

return false;
}

if (configFiled.beNumber()) {
if (configField.beNumber()) {
if (!StringUtils.isNumeric(String.valueOf(convertedValue))) {
throw new RuntimeException(key + " must be number!");
}
Expand All @@ -173,9 +173,9 @@ private boolean checkFieldValueAfter(ConfigFiled configFiled, String key, Object
return true;
}

private boolean checkFieldValueBefore(ConfigFiled configFiled, String key, ConvertValue<?> convertValue, Object fieldValue) {
private boolean checkFieldValueBefore(ConfigField configField, String key, ConvertValue<?> convertValue, Object fieldValue) {
if (Objects.isNull(fieldValue) && !convertValue.canHandleNullValue()) {
if (configFiled.notNull()) {
if (configField.notNull()) {
throw new RuntimeException(key + " can not be null!");
}

Expand All @@ -185,8 +185,8 @@ private boolean checkFieldValueBefore(ConfigFiled configFiled, String key, Conve
return true;
}

private boolean checkNeedReload(boolean needReload, ConfigFiled configFiled) {
if (!needReload && configFiled != null && configFiled.reload()) {
private boolean checkNeedReload(boolean needReload, ConfigField configField) {
if (!needReload && configField != null && configField.reload()) {
needReload = Boolean.TRUE;
}

Expand All @@ -201,14 +201,14 @@ private boolean checkNeedReload(boolean needReload, ConfigFiled configFiled) {
return needReload;
}

private String buildKey(ConfigFiled configFiled) {
private String buildKey(ConfigField configField) {
String key;
StringBuilder keyPrefix = new StringBuilder(Objects.isNull(prefix) ? "" : prefix);

if (configFiled == null || configFiled.field().isEmpty() && keyPrefix.length() > 0) {
if (configField == null || configField.field().isEmpty() && keyPrefix.length() > 0) {
key = keyPrefix.deleteCharAt(keyPrefix.length() - 1).toString();
} else {
key = keyPrefix.append(configFiled.field()).toString();
key = keyPrefix.append(configField.field()).toString();
}

return key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.common.config.convert.converter;

import org.apache.eventmesh.common.config.ConfigFiled;
import org.apache.eventmesh.common.config.ConfigField;
import org.apache.eventmesh.common.config.convert.ConvertInfo;
import org.apache.eventmesh.common.config.convert.ConvertValue;
import org.apache.eventmesh.common.utils.PropertiesUtils;
Expand All @@ -41,7 +41,7 @@ public Properties convert(ConvertInfo convertInfo) {
}

@Override
public Object processFieldValue(ConvertInfo convertInfo, String prefix, ConfigFiled configFiled) {
public Object processFieldValue(ConvertInfo convertInfo, String prefix, ConfigField configField) {
Properties properties = convertInfo.getProperties();

if (StringUtils.isBlank(prefix)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public void testGetCommonConfiguration() {
Assertions.assertEquals("cluster-succeed!!!", config.getEventMeshCluster());
Assertions.assertEquals("name-succeed!!!", config.getEventMeshName());
Assertions.assertEquals("816", config.getSysID());
// Assertions.assertEquals("connector-succeed!!!", config.getEventMeshConnectorPluginType());
Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType());
Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType());
Assertions.assertEquals("security-succeed!!!", config.getEventMeshSecurityPluginType());
Expand All @@ -55,9 +54,6 @@ public void testGetCommonConfiguration() {
Assertions.assertEquals("username-succeed!!!", config.getEventMeshMetaStoragePluginUsername());
Assertions.assertEquals("password-succeed!!!", config.getEventMeshMetaStoragePluginPassword());

Assertions.assertEquals(Integer.valueOf(816), config.getEventMeshMetaStorageIntervalInMills());
Assertions.assertEquals(Integer.valueOf(1816), config.getEventMeshFetchMetaStorageAddrInterval());

List<String> list = new ArrayList<>();
list.add("metrics-succeed1!!!");
list.add("metrics-succeed2!!!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ public class SystemUtilsTest {

@Test
public void isLinuxPlatform() {
if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
Assertions.assertTrue(SystemUtils.isLinuxPlatform());
Assertions.assertFalse(SystemUtils.isWindowsPlatform());
}
}

@Test
public void isWindowsPlatform() {
if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
Assertions.assertFalse(SystemUtils.isLinuxPlatform());
Assertions.assertTrue(SystemUtils.isWindowsPlatform());
}
Expand Down
Loading
Loading