diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java index 840266bf45d..9504c9cb2c1 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java @@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs { description = "Get job metrics by JobId") private String metricsJobId; + @Parameter( + names = {"--get_running_job_metrics"}, + description = "Gets metrics for running jobs") + private boolean getRunningJobMetrics = false; + @Parameter( names = {"-l", "--list"}, description = "list job status") diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index 50112cb2a6a..14b00540f22 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -97,6 +97,9 @@ public void execute() throws CommandExecuteException { if (clientCommandArgs.isListJob()) { String jobStatus = engineClient.getJobClient().listJobStatus(true); System.out.println(jobStatus); + } else if (clientCommandArgs.isGetRunningJobMetrics()) { + String runningJobMetrics = engineClient.getJobClient().getRunningJobMetrics(); + System.out.println(runningJobMetrics); } else if (null != clientCommandArgs.getJobId()) { String jobState = engineClient diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java index 8405f409ee1..c4a4f68a698 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java @@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec; @@ -117,6 +118,12 @@ public String getJobMetrics(Long jobId) { SeaTunnelGetJobMetricsCodec::decodeResponse); } + public String getRunningJobMetrics() { + return hazelcastClient.requestOnMasterAndDecodeResponse( + SeaTunnelGetRunningJobMetricsCodec.encodeRequest(), + SeaTunnelGetRunningJobMetricsCodec::decodeResponse); + } + public void savePointJob(Long jobId) { PassiveCompletableFuture cancelFuture = hazelcastClient.requestOnMasterAndGetCompletableFuture( diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index 369120673ae..85aec59c276 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -202,6 +202,8 @@ public void testGetJobMetrics() { String jobMetrics = jobClient.getJobMetrics(jobId); + System.out.println(jobMetrics); + Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT)); Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS)); Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT)); @@ -214,6 +216,58 @@ public void testGetJobMetrics() { } } + @Test + public void testGetRunningJobMetrics() throws ExecutionException, InterruptedException { + Common.setDeployMode(DeployMode.CLUSTER); + String filePath = TestUtils.getResource("/batch_fake_to_console.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("fake_to_console1"); + + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + JobClient jobClient = seaTunnelClient.getJobClient(); + + ClientJobProxy execute1 = + seaTunnelClient.createExecutionContext(filePath, jobConfig).execute(); + long jobId1 = execute1.getJobId(); + + execute1.waitForJobComplete(); + + filePath = TestUtils.getResource("streaming_fake_to_console.conf"); + jobConfig = new JobConfig(); + jobConfig.setName("fake_to_console2"); + ClientJobProxy execute2 = + seaTunnelClient.createExecutionContext(filePath, jobConfig).execute(); + ClientJobProxy execute3 = + seaTunnelClient.createExecutionContext(filePath, jobConfig).execute(); + + long jobId2 = execute2.getJobId(); + long jobId3 = execute3.getJobId(); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertTrue( + jobClient.getJobStatus(jobId1).equals("FINISHED") + && jobClient.getJobStatus(jobId2).equals("RUNNING") + && jobClient + .getJobStatus(jobId3) + .equals("RUNNING"))); + + System.out.println(jobClient.getRunningJobMetrics()); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + String runningJobMetrics = jobClient.getRunningJobMetrics(); + 
Assertions.assertTrue( + runningJobMetrics.contains(jobId2 + "") + && runningJobMetrics.contains(jobId3 + "")); + }); + + jobClient.cancelJob(jobId2); + jobClient.cancelJob(jobId3); + } + @Test public void testCancelJob() throws ExecutionException, InterruptedException { Common.setDeployMode(DeployMode.CLIENT); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console.conf new file mode 100644 index 00000000000..e8d74830fa5 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console.conf @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is an example source plugin **only for testing and demonstrating the source plugin feature** + FakeSource { + result_table_name = "fake" + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +transform { +} + +sink { + console { + source_table_name="fake" + } +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetRunningJobMetricsCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetRunningJobMetricsCodec.java new file mode 100644 index 00000000000..44b0f89222a --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetRunningJobMetricsCodec.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.core.protocol.codec; + +import com.hazelcast.client.impl.protocol.ClientMessage; +import com.hazelcast.client.impl.protocol.Generated; +import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec; + +import static com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET; +import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET; +import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET; +import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES; +import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt; + +/* + * This file is auto-generated by the Hazelcast Client Protocol Code Generator. + * To change this file, edit the templates or the protocol + * definitions on the https://github.com/hazelcast/hazelcast-client-protocol + * and regenerate it. 
/**
 * Client-protocol codec for the SeaTunnel {@code GetRunningJobMetrics} call.
 *
 * <p>The request carries no parameters; the response carries a single string. This class is
 * generated, so behavioural changes belong in the protocol definition rather than here.
 */
@Generated("2a54110c40297eed90df5f79bde1171d")
public final class SeaTunnelGetRunningJobMetricsCodec {
    // hex: 0xDE0C00
    public static final int REQUEST_MESSAGE_TYPE = 14552064;
    // hex: 0xDE0C01
    public static final int RESPONSE_MESSAGE_TYPE = 14552065;
    // The request's initial frame ends right after the int-sized partition id field.
    private static final int REQUEST_INITIAL_FRAME_SIZE =
            PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
    // The response's initial frame ends right after the byte-sized backup-acks field.
    private static final int RESPONSE_INITIAL_FRAME_SIZE =
            RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;

    // Static-only utility class; not instantiable.
    private SeaTunnelGetRunningJobMetricsCodec() {}

    /**
     * Builds the request message.
     *
     * <p>The message is marked retryable, named {@code SeaTunnel.GetRunningJobMetrics}, and its
     * single frame holds the request message type and a partition id of {@code -1}.
     *
     * @return the encoded request
     */
    public static ClientMessage encodeRequest() {
        ClientMessage clientMessage = ClientMessage.createForEncode();
        clientMessage.setRetryable(true);
        clientMessage.setOperationName("SeaTunnel.GetRunningJobMetrics");
        ClientMessage.Frame initialFrame =
                new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
        encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
        clientMessage.add(initialFrame);
        return clientMessage;
    }

    /**
     * Builds the response message: an initial frame holding the response message type,
     * followed by the string payload.
     *
     * @param response the string to send back to the client
     * @return the encoded response
     */
    public static ClientMessage encodeResponse(java.lang.String response) {
        ClientMessage clientMessage = ClientMessage.createForEncode();
        ClientMessage.Frame initialFrame =
                new ClientMessage.Frame(
                        new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
        clientMessage.add(initialFrame);

        // The payload must follow the initial frame; decodeResponse reads it in that order.
        StringCodec.encode(clientMessage, response);
        return clientMessage;
    }

    /**
     * Reads the string payload of a response message.
     *
     * @param clientMessage a message laid out as produced by {@link #encodeResponse}
     * @return the decoded string
     */
    public static java.lang.String decodeResponse(ClientMessage clientMessage) {
        ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
        // empty initial frame
        iterator.next();
        return StringCodec.decode(iterator);
    }
}
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml index 78317d82505..f14f86d389e 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml +++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml @@ -217,6 +217,22 @@ methods: retryable: true partitionIdentifier: -1 params: [] + response: + params: + - name: response + type: String + nullable: false + since: 2.0 + doc: '' + + - id: 12 + name: getRunningJobMetrics + since: 2.0 + doc: '' + request: + retryable: true + partitionIdentifier: -1 + params: [ ] response: params: - name: response diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index bf913a42452..d2931d0c37e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -18,7 +18,9 @@ package org.apache.seatunnel.engine.server; import org.apache.seatunnel.api.common.metrics.JobMetrics; +import org.apache.seatunnel.api.common.metrics.RawJobMetrics; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.common.utils.StringFormatUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.EngineConfig; @@ -45,9 +47,12 @@ import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory; import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; +import 
org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation; +import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.hazelcast.cluster.Address; +import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.internal.serialization.Data; import com.hazelcast.internal.services.MembershipServiceEvent; import com.hazelcast.logging.ILogger; @@ -55,9 +60,12 @@ import com.hazelcast.spi.impl.NodeEngineImpl; import lombok.NonNull; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -68,6 +76,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID; +import static org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap; + public class CoordinatorService { private final NodeEngineImpl nodeEngine; private final ILogger logger; @@ -530,6 +541,73 @@ public JobMetrics getJobMetrics(long jobId) { return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) : jobMetrics; } + public Map getRunningJobMetrics() { + final Set runningJobIds = runningJobMasterMap.keySet(); + + Set
addresses = new HashSet<>(); + ownedSlotProfilesIMap.forEach( + (pipelineLocation, ownedSlotProfilesIMap) -> { + if (runningJobIds.contains(pipelineLocation.getJobId())) { + ownedSlotProfilesIMap + .values() + .forEach( + ownedSlotProfile -> { + addresses.add(ownedSlotProfile.getWorker()); + }); + } + }); + + List metrics = new ArrayList<>(); + + addresses.forEach( + address -> { + try { + if (nodeEngine.getClusterService().getMember(address) != null) { + RawJobMetrics rawJobMetrics = + (RawJobMetrics) + NodeEngineUtil.sendOperationToMemberNode( + nodeEngine, + new GetMetricsOperation( + dis -> + (dis.tagValue(JOB_ID) + != null + && runningJobIds + .contains( + Long + .parseLong( + dis + .tagValue( + JOB_ID))))), + address) + .get(); + metrics.add(rawJobMetrics); + } + } + // HazelcastInstanceNotActiveException. It means that the node is + // offline, so waiting for the taskGroup to restore can be successful + catch (HazelcastInstanceNotActiveException e) { + logger.warning( + String.format( + "get metrics with exception: %s.", + ExceptionUtils.getMessage(e))); + } catch (Exception e) { + throw new SeaTunnelException(e.getMessage()); + } + }); + + Map longJobMetricsMap = toJobMetricsMap(metrics); + + longJobMetricsMap.forEach( + (jobId, jobMetrics) -> { + JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId); + if (jobMetricsImap != null) { + longJobMetricsMap.put(jobId, jobMetricsImap.merge(jobMetrics)); + } + }); + + return longJobMetricsMap; + } + public JobDAGInfo getJobInfo(long jobId) { JobDAGInfo jobInfo = jobHistoryService.getJobDAGInfo(jobId); if (jobInfo != null) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java index 722e6a5ddb0..057b0dbeb51 100644 --- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java @@ -17,6 +17,11 @@ package org.apache.seatunnel.engine.server.metrics; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.SerializationFeature; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; + import org.apache.seatunnel.api.common.metrics.JobMetrics; import org.apache.seatunnel.api.common.metrics.Measurement; import org.apache.seatunnel.api.common.metrics.MetricTags; @@ -35,10 +40,13 @@ import java.util.function.UnaryOperator; import static org.apache.seatunnel.api.common.metrics.MetricTags.ADDRESS; +import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID; import static org.apache.seatunnel.api.common.metrics.MetricTags.MEMBER; public final class JobMetricsUtil { + private static ObjectMapper OBJECTMAPPER = new ObjectMapper(); + private JobMetricsUtil() {} public static String getTaskGroupLocationFromMetricsDescriptor(MetricDescriptor descriptor) { @@ -68,6 +76,78 @@ public static JobMetrics toJobMetrics(List rawJobMetrics) { return JobMetrics.of(consumer.metrics); } + public static String toJsonString(Object o) { + OBJECTMAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + try { + return OBJECTMAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o); + } catch (JsonProcessingException e) { + ObjectNode objectNode = OBJECTMAPPER.createObjectNode(); + objectNode.put("err", "serialize JobMetrics err"); + return objectNode.toString(); + } + } + + public static Map toJobMetricsMap(List rawJobMetrics) { + metricsConsumer consumer = new metricsConsumer(); + for (RawJobMetrics 
metrics : rawJobMetrics) { + if (metrics.getBlob() == null) { + continue; + } + consumer.timestamp = metrics.getTimestamp(); + MetricsCompressor.extractMetrics(metrics.getBlob(), consumer); + } + + Map jobMetricsMap = MapUtil.createHashMap(consumer.metrics.size()); + consumer.metrics.forEach( + (jobId, metrics) -> { + jobMetricsMap.put(jobId, JobMetrics.of(metrics)); + }); + + return jobMetricsMap; + } + + private static class metricsConsumer implements MetricConsumer { + + final Map>> metrics = new HashMap<>(); + long timestamp; + + @Override + public void consumeLong(MetricDescriptor descriptor, long value) { + + String jobId = descriptor.tagValue(JOB_ID); + if (jobId == null) { + return; + } + long jobIdLong = Long.parseLong(jobId); + metrics.computeIfAbsent(jobIdLong, k -> new HashMap<>()) + .computeIfAbsent(descriptor.metric(), k -> new ArrayList<>()) + .add(measurement(descriptor, value)); + } + + @Override + public void consumeDouble(MetricDescriptor descriptor, double value) { + String jobId = descriptor.tagValue(JOB_ID); + if (jobId == null) { + return; + } + long jobIdLong = Long.parseLong(jobId); + metrics.computeIfAbsent(jobIdLong, k -> new HashMap<>()) + .computeIfAbsent(descriptor.metric(), k -> new ArrayList<>()) + .add(measurement(descriptor, value)); + } + + private Measurement measurement(MetricDescriptor descriptor, Object value) { + Map tags = MapUtil.createHashMap(descriptor.tagCount()); + for (int i = 0; i < descriptor.tagCount(); i++) { + tags.put(descriptor.tag(i), descriptor.tagValue(i)); + } + if (descriptor.discriminator() != null || descriptor.discriminatorValue() != null) { + tags.put(descriptor.discriminator(), descriptor.discriminatorValue()); + } + return Measurement.of(descriptor.metric(), value, timestamp, tags); + } + } + private static class JobMetricsConsumer implements MetricConsumer { final Map> metrics = new HashMap<>(); diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ZetaMetricsCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ZetaMetricsCollector.java new file mode 100644 index 00000000000..8a4f5401d1f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ZetaMetricsCollector.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.engine.server.metrics; + +import org.apache.seatunnel.api.common.metrics.RawJobMetrics; + +import com.hazelcast.cluster.Member; +import com.hazelcast.internal.metrics.MetricDescriptor; +import com.hazelcast.internal.metrics.collectors.MetricsCollector; +import com.hazelcast.internal.metrics.impl.MetricsCompressor; +import com.hazelcast.logging.ILogger; + +import java.util.Objects; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; + +public class ZetaMetricsCollector implements MetricsCollector { + + private final Predicate metricDescriptorPredicate; + private final MetricsCompressor compressor; + private final ILogger logger; + private final UnaryOperator addPrefixFn; + + public ZetaMetricsCollector( + Predicate metricDescriptorPredicate, Member member, ILogger logger) { + Objects.requireNonNull(member, "member"); + this.logger = Objects.requireNonNull(logger, "logger"); + + this.metricDescriptorPredicate = metricDescriptorPredicate; + this.addPrefixFn = JobMetricsUtil.addMemberPrefixFn(member); + this.compressor = new MetricsCompressor(); + } + + @Override + public void collectLong(MetricDescriptor descriptor, long value) { + if (metricDescriptorPredicate.test(descriptor)) { + compressor.addLong(addPrefixFn.apply(descriptor), value); + } + } + + @Override + public void collectDouble(MetricDescriptor descriptor, double value) { + if (metricDescriptorPredicate.test(descriptor)) { + compressor.addDouble(addPrefixFn.apply(descriptor), value); + } + } + + @Override + public void collectException(MetricDescriptor descriptor, Exception e) { + if (metricDescriptorPredicate.test(descriptor)) { + logger.warning("Exception when rendering job metrics: " + e, e); + } + } + + @Override + public void collectNoValue(MetricDescriptor descriptor) {} + + public RawJobMetrics getMetrics() { + return RawJobMetrics.of(compressor.getBlobAndReset()); + } +} diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
index 965816cd137..06686473817 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
@@ -30,6 +30,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
+
 public class GetJobMetricsOperation extends Operation
         implements IdentifiedDataSerializable, AllowedDuringPassiveState {
     private long jobId;
@@ -70,9 +72,10 @@ public void run() {
         CompletableFuture future =
                 CompletableFuture.supplyAsync(
                         () -> {
-                            return service.getCoordinatorService()
-                                    .getJobMetrics(jobId)
-                                    .toJsonString();
+                            return toJsonString(
+                                    service.getCoordinatorService()
+                                            .getJobMetrics(jobId)
+                                            .getMetrics());
                         },
                         getNodeEngine()
                                 .getExecutionService()
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
new file mode 100644
index 00000000000..9277c324f32
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
+
+/**
+ * Operation that asks the coordinator service for the metrics of the running jobs and returns
+ * them as the string produced by {@code JobMetricsUtil.toJsonString}.
+ *
+ * <p>Serialization only delegates to {@link Operation}; the operation adds no fields of its own.
+ */
+public class GetRunningJobMetricsOperation extends Operation
+        implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+
+    /** Result of {@link #run()}; stays {@code null} until the operation has run. */
+    private String response;
+
+    public GetRunningJobMetricsOperation() {}
+
+    @Override
+    public final int getFactoryId() {
+        return ClientToServerOperationDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ClientToServerOperationDataSerializerHook.GET_RUNNING_JOB_METRICS_OPERATOR;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+    }
+
+    /**
+     * Computes the response on the {@code get_running_job_metrics_operation} executor and waits
+     * for it to finish.
+     *
+     * @throws RuntimeException wrapping the {@link ExecutionException} raised by the computation,
+     *     or the {@link InterruptedException} raised while waiting; in the latter case the
+     *     thread's interrupt flag is set again before throwing
+     */
+    @Override
+    public void run() {
+        SeaTunnelServer service = getService();
+        CompletableFuture<String> future =
+                CompletableFuture.supplyAsync(
+                        () -> toJsonString(service.getCoordinatorService().getRunningJobMetrics()),
+                        getNodeEngine()
+                                .getExecutionService()
+                                .getExecutor("get_running_job_metrics_operation"));
+
+        try {
+            response = future.get();
+        } catch (InterruptedException e) {
+            // Waiting was interrupted: set the flag again so code further up the stack can
+            // still see the interruption, then fail the same way as before.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Object getResponse() {
+        return response;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
index d083a6a3aad..ea3e834f66c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
@@ -43,7 +43,7 @@ protected Operation prepareOperation() {
 
     @Override
     public String getMethodName() {
-        return "getJobStatus";
+        return "getJobMetrics";
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetRunningJobMetricsTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetRunningJobMetricsTask.java
new file mode 100644
index 00000000000..2c13e253eba
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetRunningJobMetricsTask.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec;
+import org.apache.seatunnel.engine.server.operation.GetRunningJobMetricsOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Message task for the {@link SeaTunnelGetRunningJobMetricsCodec} request: it prepares a {@link
+ * GetRunningJobMetricsOperation} and encodes the response with the same codec.
+ *
+ * <p>NOTE(review): the superclass is named here without type arguments; confirm against the
+ * declaration of {@code AbstractSeaTunnelMessageTask}.
+ */
+public class GetRunningJobMetricsTask extends AbstractSeaTunnelMessageTask {
+
+    /**
+     * Forwards the request to the superclass together with a function that maps the message to
+     * {@code null} and the codec's response encoder.
+     */
+    protected GetRunningJobMetricsTask(
+            ClientMessage clientMessage, Node node, Connection connection) {
+        super(
+                clientMessage,
+                node,
+                connection,
+                // Presumably the request decoder: getParameters() reports no parameters.
+                m -> null,
+                SeaTunnelGetRunningJobMetricsCodec::encodeResponse);
+    }
+
+    @Override
+    protected Operation prepareOperation() {
+        return new GetRunningJobMetricsOperation();
+    }
+
+    @Override
+    public String getMethodName() {
+        return "getRunningJobMetrics";
+    }
+
+    @Override
+    public Object[] getParameters() {
+        return new Object[0];
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java index 6d19ada03f5..f3c88b9ebcb 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec; +import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec; @@ -92,5 +93,9 @@ private void initFactories() { SeaTunnelGetClusterHealthMetricsCodec.REQUEST_MESSAGE_TYPE, (clientMessage, connection) -> new GetClusterHealthMetricsTask(clientMessage, node, connection)); + factories.put( + SeaTunnelGetRunningJobMetricsCodec.REQUEST_MESSAGE_TYPE, + (clientMessage, connection) -> + new GetRunningJobMetricsTask(clientMessage, node, connection)); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java index 4cfef1cd59b..188e4fe0657 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java +++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.operation.GetJobInfoOperation; import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation; import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation; +import org.apache.seatunnel.engine.server.operation.GetRunningJobMetricsOperation; import org.apache.seatunnel.engine.server.operation.PrintMessageOperation; import org.apache.seatunnel.engine.server.operation.SavePointJobOperation; import org.apache.seatunnel.engine.server.operation.SubmitJobOperation; @@ -60,6 +61,8 @@ public final class ClientToServerOperationDataSerializerHook implements DataSeri public static final int GET_CLUSTER_HEALTH_METRICS = 9; + public static final int GET_RUNNING_JOB_METRICS_OPERATOR = 10; + public static final int FACTORY_ID = FactoryIdHelper.getFactoryId( SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY, @@ -100,6 +103,8 @@ public IdentifiedDataSerializable create(int typeId) { return new SavePointJobOperation(); case GET_CLUSTER_HEALTH_METRICS: return new GetClusterHealthMetricsOperation(); + case GET_RUNNING_JOB_METRICS_OPERATOR: + return new GetRunningJobMetricsOperation(); default: throw new IllegalArgumentException("Unknown type id " + typeId); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java index 90d543cee3d..c5b9baeb0da 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java +++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -25,6 +25,7 @@
 import org.apache.seatunnel.engine.server.task.operation.CheckTaskGroupIsExecutingOperation;
 import org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation;
 import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
+import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
 import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
 import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation;
 import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
@@ -89,6 +90,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
 
     public static final int CHECK_TASKGROUP_IS_EXECUTING = 21;
 
+    public static final int GET_METRICS_OPERATION = 22;
+
     public static final int FACTORY_ID =
             FactoryIdHelper.getFactoryId(
                     SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
@@ -151,6 +154,8 @@ public IdentifiedDataSerializable create(int typeId) {
                     return new SourceReaderEventOperation();
                 case CHECK_TASKGROUP_IS_EXECUTING:
                     return new CheckTaskGroupIsExecutingOperation();
+                case GET_METRICS_OPERATION:
+                    return new GetMetricsOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java
new file mode 100644
index 00000000000..15003a641dd
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation;
+
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+import org.apache.seatunnel.engine.server.metrics.ZetaMetricsCollector;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+/**
+ * Operation that collects the local member's metrics registry with a {@link ZetaMetricsCollector}
+ * and returns the resulting {@link RawJobMetrics}.
+ *
+ * <p>Only the master may invoke it: {@link #run()} rejects any other caller address.
+ */
+public class GetMetricsOperation extends Operation implements IdentifiedDataSerializable {
+
+    /**
+     * Handed to the {@link ZetaMetricsCollector}. It is written with {@code writeObject} when the
+     * operation is serialized. NOTE(review): confirm callers pass a predicate that Hazelcast can
+     * serialize.
+     */
+    private Predicate<MetricDescriptor> metricDescriptorPredicate;
+
+    /** Result of {@link #run()}; stays {@code null} until the operation has run. */
+    private RawJobMetrics response;
+
+    public GetMetricsOperation() {}
+
+    public GetMetricsOperation(Predicate<MetricDescriptor> metricDescriptorPredicate) {
+        this.metricDescriptorPredicate = metricDescriptorPredicate;
+    }
+
+    /**
+     * Collects the metrics of this member on behalf of the master.
+     *
+     * @throws IllegalStateException if the caller address differs from the master address
+     */
+    @Override
+    public void run() {
+        ILogger logger = getLogger();
+        NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
+
+        Address callerAddress = getCallerAddress();
+        Address masterAddress = nodeEngine.getMasterAddress();
+        if (!callerAddress.equals(masterAddress)) {
+            throw new IllegalStateException(
+                    "Caller "
+                            + callerAddress
+                            + " cannot get metrics"
+                            + " because it is not master. Master is: "
+                            + masterAddress);
+        }
+
+        ZetaMetricsCollector collector =
+                new ZetaMetricsCollector(
+                        metricDescriptorPredicate, nodeEngine.getLocalMember(), logger);
+        nodeEngine.getMetricsRegistry().collect(collector);
+        response = collector.getMetrics();
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeObject(metricDescriptorPredicate);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        this.metricDescriptorPredicate = in.readObject();
+    }
+
+    @Override
+    public Object getResponse() {
+        return response;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.GET_METRICS_OPERATION;
+    }
+}