Skip to content

Commit b64606b

Browse files
xwm1992Pil0tXiacnzakiikarsonto
authored
EventMesh function connector runtime (#4858)
* [ISSUE #4812] Set up Admin Endpoints v2 (#4813) * Remove redundant overloaded methods * Simplify write() result param * Add writeJson(); Add PUT; Add JavaDoc * Rename EventHttpHandler to EventMeshHttpHandler * Correct server thread name * Clean up messy & non-hierarchical overloading * No need to set headers manually any more * Set up v1&v2 endpoints * Set up v1&v2 response dto * Introduce fastjson2 * Fix fastjson2 "level too large : 2048" error caused by IPAddress * Correct @ConfigField naming * Return properties format json key * Add format option to query string * Introduce Result * Reduce duplicate builder code * Fix all checkstyle warnings in eventmesh-runtime * Add known dependency * [ISSUE #4814] Migrate from fastjson 1.2.83 to fastjson2 (#4819) * [Enhancement] Migrate from fastjson 1.2.83 to fastjson2 #4814 * fix_dependencies_problem * fix_check * [ISSUE #4551] modify the logic of time-consumption statistics (#4822) * init connector runtime v2 * [ISSUE #4804] Fix SubStreamHandler exception loop by closeOnError (#4807) * Handle exception loop by closeOnError * Lombok optimization * some format optimization * Avoid closing multiple times * Remove redundant set null * Revert "Avoid closing multiple times" This reverts commit 767bc59. * Use synchronized latch to keep senderOnComplete called once * Use boolean to prevent latch called by somebody else * Remove the unique callee/caller close() of onCompleted() * [ISSUE #4838] Deprecate unused `eventMesh.connector.plugin.type` etc. properties (#4839) * Remove all references of `eventMesh.connector.plugin.type` * Deprecate `eventMesh.connector.plugin.type` and sort properties * Remove misconfigured & not-used `registerIntervalInMills`, `fetchRegistryAddrIntervalInMills` * Remove 'defibus' related un-used usages * Supplement #4809 for `null != object` * [ISSUE #4832] Downgrade stale bot to v8 to resolve state cache reserving error (#4833) * Revert stale bot to v8 to resolve state cache reserving error * Reduce operations-per-run to default value to ease pressure * Unify yaml to yml * [ISSUE #4820] Bug fix EventHandler not return json (#4821) * bug fix * bug fix * bug fix * update runtime v2 * update connector runtime * update connector runtime * update connector runtime * update connector runtime * update connector runtime --------- Co-authored-by: Pil0tXia <xiatian@apache.org> Co-authored-by: Zaki <91261012+cnzakii@users.noreply.github.com> Co-authored-by: Karson <karsontao@hotmail.com>
1 parent 43ef17f commit b64606b

File tree

147 files changed

+2559
-926
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

147 files changed

+2559
-926
lines changed
File renamed without changes.

.github/workflows/stale.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ jobs:
3131
stale:
3232
runs-on: ubuntu-latest
3333
steps:
34-
- uses: actions/stale@v9
34+
- uses: actions/stale@v8
3535
with:
3636
days-before-issue-stale: 90
3737
days-before-pr-stale: 60
@@ -47,3 +47,4 @@ jobs:
4747
exempt-issue-labels: 'pinned,discussion,help wanted,WIP,weopen-star,GLCC,summer of code'
4848
exempt-pr-labels: 'help wanted,dependencies'
4949
exempt-all-milestones: true # Exempt all issues/PRs with milestones from stale
50+
operations-per-run: 30

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ subprojects {
569569
dependency "org.projectlombok:lombok:1.18.22"
570570
dependency "com.github.seancfoley:ipaddress:5.3.3"
571571
dependency "javax.annotation:javax.annotation-api:1.3.2"
572-
dependency "com.alibaba:fastjson:1.2.83"
572+
dependency "com.alibaba.fastjson2:fastjson2:2.0.48"
573573

574574
dependency "software.amazon.awssdk:s3:2.20.29"
575575
dependency "com.github.rholder:guava-retrying:2.0.0"

eventmesh-common/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ dependencies {
3030
api "io.cloudevents:cloudevents-core"
3131
api "io.cloudevents:cloudevents-json-jackson"
3232

33+
api "com.alibaba.fastjson2:fastjson2"
34+
3335
implementation "org.apache.logging.log4j:log4j-api"
3436
implementation "org.apache.logging.log4j:log4j-core"
3537
implementation "org.apache.logging.log4j:log4j-slf4j2-impl"

eventmesh-common/src/main/java/org/apache/eventmesh/common/EventMeshThreadFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@ public EventMeshThreadFactory(final String threadNamePrefix) {
6767
public Thread newThread(@Nonnull final Runnable runnable) {
6868

6969
StringBuilder threadName = new StringBuilder(threadNamePrefix);
70-
if (null != threadIndex) {
70+
if (threadIndex != null) {
7171
threadName.append("-").append(threadIndex.incrementAndGet());
7272
}
7373
Thread thread = new Thread(runnable, threadName.toString());
7474
thread.setDaemon(daemon);
75-
if (null != priority) {
75+
if (priority != null) {
7676
thread.setPriority(priority);
7777
}
7878

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java

+28-37
Original file line numberDiff line numberDiff line change
@@ -34,94 +34,85 @@
3434
@Config(prefix = "eventMesh")
3535
public class CommonConfiguration {
3636

37-
@ConfigFiled(field = "sysid", beNumber = true, notEmpty = true)
37+
@ConfigField(field = "sysid", beNumber = true, notEmpty = true)
3838
private String sysID = "5477";
3939

40-
@ConfigFiled(field = "server.env", notEmpty = true)
40+
@ConfigField(field = "server.env", notEmpty = true)
4141
private String eventMeshEnv = "P";
4242

43-
@ConfigFiled(field = "server.idc", notEmpty = true)
43+
@ConfigField(field = "server.idc", notEmpty = true)
4444
private String eventMeshIDC = "FT";
4545

46-
@ConfigFiled(field = "server.name", notEmpty = true)
46+
@ConfigField(field = "server.name", notEmpty = true)
4747
private String eventMeshName = "";
4848

49-
@ConfigFiled(field = "server.cluster", notEmpty = true)
49+
@ConfigField(field = "server.cluster", notEmpty = true)
5050
private String eventMeshCluster = "LS";
5151

52-
@ConfigFiled(field = "server.hostIp", reload = true)
52+
@ConfigField(field = "server.hostIp", reload = true)
5353
private String eventMeshServerIp = null;
5454

55-
@ConfigFiled(field = "metaStorage.plugin.server-addr", notEmpty = true)
55+
@ConfigField(field = "metaStorage.plugin.server-addr", notEmpty = true)
5656
private String metaStorageAddr = "";
5757

58-
@ConfigFiled(field = "metaStorage.plugin.type", notEmpty = true)
58+
@ConfigField(field = "metaStorage.plugin.type", notEmpty = true)
5959
private String eventMeshMetaStoragePluginType = "nacos";
6060

61-
@ConfigFiled(field = "metaStorage.plugin.username")
61+
@ConfigField(field = "metaStorage.plugin.username")
6262
private String eventMeshMetaStoragePluginUsername = "";
6363

64-
@ConfigFiled(field = "metaStorage.plugin.password")
64+
@ConfigField(field = "metaStorage.plugin.password")
6565
private String eventMeshMetaStoragePluginPassword = "";
6666

67-
@ConfigFiled(field = "metaStorage.plugin.metaStorageIntervalInMills")
68-
private Integer eventMeshMetaStorageIntervalInMills = 10 * 1000;
69-
70-
@ConfigFiled(field = "metaStorage.plugin.fetchMetaStorageAddrIntervalInMills")
71-
private Integer eventMeshFetchMetaStorageAddrInterval = 10 * 1000;
72-
73-
@ConfigFiled(field = "metaStorage.plugin.enabled")
67+
@ConfigField(field = "metaStorage.plugin.enabled")
7468
private boolean eventMeshServerMetaStorageEnable = false;
7569

76-
@ConfigFiled(field = "trace.plugin", notEmpty = true)
70+
@ConfigField(field = "trace.plugin", notEmpty = true)
7771
private String eventMeshTracePluginType;
7872

79-
@ConfigFiled(field = "metrics.plugin", notEmpty = true)
73+
@ConfigField(field = "metrics.plugin", notEmpty = true)
8074
private List<String> eventMeshMetricsPluginType;
8175

82-
@ConfigFiled(field = "security.plugin.type", notEmpty = true)
76+
@ConfigField(field = "security.plugin.type", notEmpty = true)
8377
private String eventMeshSecurityPluginType = "security";
8478

85-
@ConfigFiled(field = "connector.plugin.type", notEmpty = true)
86-
private String eventMeshConnectorPluginType = "rocketmq";
87-
88-
@ConfigFiled(field = "storage.plugin.type", notEmpty = true)
89-
private String eventMeshStoragePluginType = "rocketmq";
79+
@ConfigField(field = "storage.plugin.type", notEmpty = true)
80+
private String eventMeshStoragePluginType = "standalone";
9081

91-
@ConfigFiled(field = "security.validation.type.token", notEmpty = true)
82+
@ConfigField(field = "security.validation.type.token", notEmpty = true)
9283
private boolean eventMeshSecurityValidateTypeToken = false;
9384

94-
@ConfigFiled(field = "server.trace.enabled")
85+
@ConfigField(field = "server.trace.enabled")
9586
private boolean eventMeshServerTraceEnable = false;
9687

97-
@ConfigFiled(field = "server.security.enabled")
88+
@ConfigField(field = "server.security.enabled")
9889
private boolean eventMeshServerSecurityEnable = false;
9990

100-
@ConfigFiled(field = "security.publickey")
91+
@ConfigField(field = "security.publickey")
10192
private String eventMeshSecurityPublickey = "";
10293

103-
@ConfigFiled(field = "server.provide.protocols", reload = true)
94+
@ConfigField(field = "server.provide.protocols", reload = true)
10495
private List<String> eventMeshProvideServerProtocols;
10596

106-
@ConfigFiled(reload = true)
97+
@ConfigField(reload = true)
10798
private String eventMeshWebhookOrigin;
10899

109-
@ConfigFiled(reload = true)
100+
@ConfigField(reload = true)
110101
private String meshGroup;
111102

112-
@ConfigFiled(field = "server.retry.plugin.type")
103+
@ConfigField(field = "server.retry.plugin.type")
113104
private String eventMeshRetryPluginType = Constants.DEFAULT;
114105

115-
@ConfigFiled(field = "registry.plugin.server-addr", notEmpty = true)
106+
@ConfigField(field = "registry.plugin.server-addr", notEmpty = true)
116107
private String registryAddr = "";
117108

118-
@ConfigFiled(field = "registry.plugin.type", notEmpty = true)
109+
@ConfigField(field = "registry.plugin.type", notEmpty = true)
119110
private String eventMeshRegistryPluginType = "nacos";
120111

121-
@ConfigFiled(field = "registry.plugin.username")
112+
@ConfigField(field = "registry.plugin.username")
122113
private String eventMeshRegistryPluginUsername = "";
123114

124-
@ConfigFiled(field = "registry.plugin.password")
115+
@ConfigField(field = "registry.plugin.password")
125116
private String eventMeshRegistryPluginPassword = "";
126117

127118
public void reload() {

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigFiled.java eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigField.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
*/
3030
@Retention(RetentionPolicy.RUNTIME)
3131
@Target({ElementType.TYPE, ElementType.FIELD})
32-
public @interface ConfigFiled {
32+
public @interface ConfigField {
3333

3434
/**
3535
* @return The key name of the configuration file

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/convert/ConvertValue.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.eventmesh.common.config.convert;
1919

20-
import org.apache.eventmesh.common.config.ConfigFiled;
20+
import org.apache.eventmesh.common.config.ConfigField;
2121

2222
import org.apache.commons.lang3.StringUtils;
2323

@@ -44,7 +44,7 @@ default boolean canHandleNullValue() {
4444
/**
4545
* @return The value converter needs
4646
*/
47-
default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigFiled configFiled) {
47+
default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigField configField) {
4848
Properties properties = convertInfo.getProperties();
4949
String value = properties.getProperty(key);
5050

@@ -54,14 +54,14 @@ default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigFile
5454

5555
value = value.trim();
5656

57-
boolean findEnv = configFiled.findEnv();
58-
String fieldName = configFiled.field();
57+
boolean findEnv = configField.findEnv();
58+
String fieldName = configField.field();
5959

6060
if (StringUtils.isBlank(value) && !StringUtils.isBlank(fieldName) && findEnv) {
6161
value = Optional.ofNullable(System.getProperty(fieldName)).orElse(System.getenv(fieldName));
6262
}
6363

64-
if (StringUtils.isBlank(value) && configFiled.notEmpty()) {
64+
if (StringUtils.isBlank(value) && configField.notEmpty()) {
6565
throw new RuntimeException(key + " can't be empty!");
6666
}
6767

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/convert/ConverterMap.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.eventmesh.common.config.convert;
1919

20-
import org.apache.eventmesh.common.config.ConfigFiled;
20+
import org.apache.eventmesh.common.config.ConfigField;
2121
import org.apache.eventmesh.common.config.convert.converter.BaseDataTypeConverter;
2222
import org.apache.eventmesh.common.config.convert.converter.DateConverter;
2323
import org.apache.eventmesh.common.config.convert.converter.EnumConverter;
@@ -96,9 +96,9 @@ public static void register(ConvertValue<?> convertValue, Class<?>... clazzs) {
9696
*/
9797
public static ConvertValue<?> getFieldConverter(Field field) {
9898
Class<?> clazz = field.getType();
99-
ConfigFiled configFiled = field.getAnnotation(ConfigFiled.class);
99+
ConfigField configField = field.getAnnotation(ConfigField.class);
100100

101-
Class<?> converter1 = configFiled.converter();
101+
Class<?> converter1 = configField.converter();
102102
if (!converter1.equals(ConvertValue.DefaultConverter.class)) {
103103
if (!classToConverter.containsKey(converter1)) {
104104
try {

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/convert/converter/ObjectConverter.java

+18-18
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.eventmesh.common.config.convert.converter;
1919

2020
import org.apache.eventmesh.common.config.Config;
21-
import org.apache.eventmesh.common.config.ConfigFiled;
21+
import org.apache.eventmesh.common.config.ConfigField;
2222
import org.apache.eventmesh.common.config.ConfigInfo;
2323
import org.apache.eventmesh.common.config.convert.ConvertInfo;
2424
import org.apache.eventmesh.common.config.convert.ConvertValue;
@@ -110,26 +110,26 @@ private void setValue() throws Exception {
110110
field.setAccessible(true);
111111

112112
ConvertInfo convertInfo = this.convertInfo;
113-
ConfigFiled configFiled = field.getAnnotation(ConfigFiled.class);
114-
if (Objects.isNull(configFiled)) {
113+
ConfigField configField = field.getAnnotation(ConfigField.class);
114+
if (Objects.isNull(configField)) {
115115
continue;
116116
}
117117

118-
String key = this.buildKey(configFiled);
119-
needReload = this.checkNeedReload(needReload, configFiled);
118+
String key = this.buildKey(configField);
119+
needReload = this.checkNeedReload(needReload, configField);
120120

121121
ConvertValue<?> convertValue = ConverterMap.getFieldConverter(field);
122-
Object fieldValue = convertValue.processFieldValue(convertInfo, key, configFiled);
122+
Object fieldValue = convertValue.processFieldValue(convertInfo, key, configField);
123123

124-
if (!checkFieldValueBefore(configFiled, key, convertValue, fieldValue)) {
124+
if (!checkFieldValueBefore(configField, key, convertValue, fieldValue)) {
125125
continue;
126126
}
127127
convertInfo.setValue(fieldValue);
128128
convertInfo.setField(field);
129129
convertInfo.setKey(key);
130130
Object convertedValue = convertValue.convert(convertInfo);
131131

132-
if (!checkFieldValueAfter(configFiled, key, convertedValue)) {
132+
if (!checkFieldValueAfter(configField, key, convertedValue)) {
133133
continue;
134134
}
135135
field.set(object, convertedValue);
@@ -155,16 +155,16 @@ private void reloadConfigIfNeed(boolean needReload) throws NoSuchMethodException
155155
}
156156
}
157157

158-
private boolean checkFieldValueAfter(ConfigFiled configFiled, String key, Object convertedValue) {
158+
private boolean checkFieldValueAfter(ConfigField configField, String key, Object convertedValue) {
159159
if (Objects.isNull(convertedValue)) {
160-
if (configFiled.notNull()) {
160+
if (configField.notNull()) {
161161
throw new RuntimeException(key + " can not be null!");
162162
}
163163

164164
return false;
165165
}
166166

167-
if (configFiled.beNumber()) {
167+
if (configField.beNumber()) {
168168
if (!StringUtils.isNumeric(String.valueOf(convertedValue))) {
169169
throw new RuntimeException(key + " must be number!");
170170
}
@@ -173,9 +173,9 @@ private boolean checkFieldValueAfter(ConfigFiled configFiled, String key, Object
173173
return true;
174174
}
175175

176-
private boolean checkFieldValueBefore(ConfigFiled configFiled, String key, ConvertValue<?> convertValue, Object fieldValue) {
176+
private boolean checkFieldValueBefore(ConfigField configField, String key, ConvertValue<?> convertValue, Object fieldValue) {
177177
if (Objects.isNull(fieldValue) && !convertValue.canHandleNullValue()) {
178-
if (configFiled.notNull()) {
178+
if (configField.notNull()) {
179179
throw new RuntimeException(key + " can not be null!");
180180
}
181181

@@ -185,8 +185,8 @@ private boolean checkFieldValueBefore(ConfigFiled configFiled, String key, Conve
185185
return true;
186186
}
187187

188-
private boolean checkNeedReload(boolean needReload, ConfigFiled configFiled) {
189-
if (!needReload && configFiled != null && configFiled.reload()) {
188+
private boolean checkNeedReload(boolean needReload, ConfigField configField) {
189+
if (!needReload && configField != null && configField.reload()) {
190190
needReload = Boolean.TRUE;
191191
}
192192

@@ -201,14 +201,14 @@ private boolean checkNeedReload(boolean needReload, ConfigFiled configFiled) {
201201
return needReload;
202202
}
203203

204-
private String buildKey(ConfigFiled configFiled) {
204+
private String buildKey(ConfigField configField) {
205205
String key;
206206
StringBuilder keyPrefix = new StringBuilder(Objects.isNull(prefix) ? "" : prefix);
207207

208-
if (configFiled == null || configFiled.field().isEmpty() && keyPrefix.length() > 0) {
208+
if (configField == null || configField.field().isEmpty() && keyPrefix.length() > 0) {
209209
key = keyPrefix.deleteCharAt(keyPrefix.length() - 1).toString();
210210
} else {
211-
key = keyPrefix.append(configFiled.field()).toString();
211+
key = keyPrefix.append(configField.field()).toString();
212212
}
213213

214214
return key;

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/convert/converter/PropertiesConverter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.eventmesh.common.config.convert.converter;
1919

20-
import org.apache.eventmesh.common.config.ConfigFiled;
20+
import org.apache.eventmesh.common.config.ConfigField;
2121
import org.apache.eventmesh.common.config.convert.ConvertInfo;
2222
import org.apache.eventmesh.common.config.convert.ConvertValue;
2323
import org.apache.eventmesh.common.utils.PropertiesUtils;
@@ -41,7 +41,7 @@ public Properties convert(ConvertInfo convertInfo) {
4141
}
4242

4343
@Override
44-
public Object processFieldValue(ConvertInfo convertInfo, String prefix, ConfigFiled configFiled) {
44+
public Object processFieldValue(ConvertInfo convertInfo, String prefix, ConfigField configField) {
4545
Properties properties = convertInfo.getProperties();
4646

4747
if (StringUtils.isBlank(prefix)) {

eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java

-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ public void testGetCommonConfiguration() {
4545
Assertions.assertEquals("cluster-succeed!!!", config.getEventMeshCluster());
4646
Assertions.assertEquals("name-succeed!!!", config.getEventMeshName());
4747
Assertions.assertEquals("816", config.getSysID());
48-
// Assertions.assertEquals("connector-succeed!!!", config.getEventMeshConnectorPluginType());
4948
Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType());
5049
Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType());
5150
Assertions.assertEquals("security-succeed!!!", config.getEventMeshSecurityPluginType());
@@ -55,9 +54,6 @@ public void testGetCommonConfiguration() {
5554
Assertions.assertEquals("username-succeed!!!", config.getEventMeshMetaStoragePluginUsername());
5655
Assertions.assertEquals("password-succeed!!!", config.getEventMeshMetaStoragePluginPassword());
5756

58-
Assertions.assertEquals(Integer.valueOf(816), config.getEventMeshMetaStorageIntervalInMills());
59-
Assertions.assertEquals(Integer.valueOf(1816), config.getEventMeshFetchMetaStorageAddrInterval());
60-
6157
List<String> list = new ArrayList<>();
6258
list.add("metrics-succeed1!!!");
6359
list.add("metrics-succeed2!!!");

eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ public class SystemUtilsTest {
2424

2525
@Test
2626
public void isLinuxPlatform() {
27-
if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
27+
if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
2828
Assertions.assertTrue(SystemUtils.isLinuxPlatform());
2929
Assertions.assertFalse(SystemUtils.isWindowsPlatform());
3030
}
3131
}
3232

3333
@Test
3434
public void isWindowsPlatform() {
35-
if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
35+
if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
3636
Assertions.assertFalse(SystemUtils.isLinuxPlatform());
3737
Assertions.assertTrue(SystemUtils.isWindowsPlatform());
3838
}

0 commit comments

Comments
 (0)