From 512fbb6fc1d4ff82f1fff05aff5659785be4b188 Mon Sep 17 00:00:00 2001 From: loustler Date: Fri, 27 Sep 2024 20:12:50 +0900 Subject: [PATCH] Add test codes and refactor codes --- .../engine/e2e/k8s/KubernetesIT.java | 20 ++++++-- .../hazelcast-kubernetes-discovery.yaml | 46 +++++++++++++++++++ ...cast.yaml => hazelcast-tcp-discovery.yaml} | 0 .../engine/server/SeaTunnelNodeContext.java | 23 +--------- 4 files changed, 64 insertions(+), 25 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml rename seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/{hazelcast.yaml => hazelcast-tcp-discovery.yaml} (100%) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java index 2de7caeddb5..ce2b73fb10c 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/java/org/apache/seatunnel/engine/e2e/k8s/KubernetesIT.java @@ -61,7 +61,18 @@ public class KubernetesIT { private static final String podName = "seatunnel-0"; @Test - public void test() + public void testTcpDiscovery() + throws IOException, XmlPullParserException, ApiException, InterruptedException { + runDiscoveryTest("hazelcast-tcp-discovery.yaml"); + } + + @Test + public void testKubernetesDiscovery() + throws IOException, XmlPullParserException, ApiException, InterruptedException { + runDiscoveryTest("hazelcast-kubernetes-discovery.yaml"); + } + + private void runDiscoveryTest(String hazelCastConfigFile) throws IOException, XmlPullParserException, ApiException, InterruptedException { ApiClient client = Config.defaultClient(); AppsV1Api appsV1Api = new AppsV1Api(client); @@ -82,7 +93,7 @@ public void test() log.info("Docker's environmental information"); log.info(info.toString()); if (dockerClient.listImagesCmd().withImageNameFilter(tag).exec().isEmpty()) { - copyFileToCurrentResources(targetPath); + copyFileToCurrentResources(hazelCastConfigFile, targetPath); File file = new File( PROJECT_ROOT_PATH @@ -153,7 +164,8 @@ public void test() } } - private void copyFileToCurrentResources(String targetPath) throws IOException { + private void copyFileToCurrentResources(String hazelCastConfigFile, String targetPath) + throws IOException { File jarsPath = new File(targetPath + "/jars"); jarsPath.mkdirs(); File binPath = new File(targetPath + "/bin"); @@ -164,7 +176,7 @@ private void copyFileToCurrentResources(String targetPath) throws IOException { new File(PROJECT_ROOT_PATH + "/config"), new File(targetPath + "/config")); // replace hazelcast.yaml and hazelcast-client.yaml Files.copy( - Paths.get(targetPath + "/custom_config/hazelcast.yaml"), + Paths.get(targetPath + "/custom_config/" + hazelCastConfigFile), Paths.get(targetPath + "/config/hazelcast.yaml"), StandardCopyOption.REPLACE_EXISTING); Files.copy( diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml new file mode 100644 index 00000000000..d4a79cd0e47 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-kubernetes-discovery.yaml @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +hazelcast: + cluster-name: seatunnel + network: + rest-api: + enabled: true + endpoint-groups: + CLUSTER_WRITE: + enabled: true + DATA: + enabled: true + join: + multicast: + enabled: false + kubernetes: + enabled: true + service-port: 5801 + namespace: default + service-name: seatunnel + port: + auto-increment: true + port-count: 100 + port: 5801 + properties: + hazelcast.invocation.max.retry.count: 100 + hazelcast.invocation.retry.pause.millis: 1000 + hazelcast.tcp.join.port.try.count: 30 + hazelcast.slow.operation.detector.stacktrace.logging.enabled: true + hazelcast.logging.type: log4j2 + hazelcast.operation.generic.thread.count: 200 diff --git a/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml b/seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-tcp-discovery.yaml similarity index 100% rename from seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast.yaml rename to seatunnel-e2e/seatunnel-engine-e2e/seatunnel-engine-k8s-e2e/src/test/resources/custom_config/hazelcast-tcp-discovery.yaml diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java index ad7a5425b5a..60174b88645 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java @@ -24,14 +24,10 @@ import com.hazelcast.instance.impl.Node; import com.hazelcast.instance.impl.NodeExtension; import com.hazelcast.internal.cluster.Joiner; -import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig; -import static com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress; -import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED; -import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED; @Slf4j public class SeaTunnelNodeContext extends DefaultNodeContext { @@ -53,26 +49,11 @@ public Joiner createJoiner(Node node) { getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin(); join.verify(); - if (node.shouldUseMulticastJoiner(join) && node.multicastService != null) { - return super.createJoiner(node); - } else if (join.getTcpIpConfig().isEnabled()) { + if (join.getTcpIpConfig().isEnabled()) { log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery"); return new LiteNodeDropOutTcpIpJoiner(node); - } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED) - || isAnyAliasedConfigEnabled(join) - || join.isAutoDetectionEnabled()) { - return super.createJoiner(node); } - return null; - } - - private static boolean isAnyAliasedConfigEnabled(JoinConfig join) { - return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty(); - } - private boolean usePublicAddress(JoinConfig join, Node node) { - return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED) - || allUsePublicAddress( - AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join)); + return super.createJoiner(node); } }