diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index ef2e356c3dd..ced00d9e660 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -76,7 +76,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID; import static org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap; public class CoordinatorService { @@ -568,17 +567,7 @@ public Map getRunningJobMetrics() { (RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode( nodeEngine, - new GetMetricsOperation( - dis -> - (dis.tagValue(JOB_ID) - != null - && runningJobIds - .contains( - Long - .parseLong( - dis - .tagValue( - JOB_ID))))), + new GetMetricsOperation(runningJobIds), address) .get(); metrics.add(rawJobMetrics); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java index 15003a641dd..8d9c5d7f987 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java @@ -31,17 +31,22 @@ import com.hazelcast.spi.impl.operationservice.Operation; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; import java.util.function.Predicate; -public class GetMetricsOperation extends Operation implements IdentifiedDataSerializable { +import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID; - private Predicate metricDescriptorPredicate; +public class GetMetricsOperation extends Operation implements IdentifiedDataSerializable { private RawJobMetrics response; + private Set runningJobIds; public GetMetricsOperation() {} - public GetMetricsOperation(Predicate metricDescriptorPredicate) { - this.metricDescriptorPredicate = metricDescriptorPredicate; + public GetMetricsOperation(Set runningJobIds) { + this.runningJobIds = runningJobIds; } @Override @@ -60,6 +65,10 @@ public void run() { + " because it is not master. Master is: " + masterAddress); } + Predicate metricDescriptorPredicate = + dis -> + (dis.tagValue(JOB_ID) != null + && runningJobIds.contains(Long.parseLong(dis.tagValue(JOB_ID)))); ZetaMetricsCollector metricsRenderer = new ZetaMetricsCollector( @@ -71,13 +80,15 @@ public void run() { @Override protected void writeInternal(ObjectDataOutput out) throws IOException { super.writeInternal(out); - out.writeObject(metricDescriptorPredicate); + out.writeLongArray(runningJobIds.stream().mapToLong(Long::longValue).toArray()); } @Override protected void readInternal(ObjectDataInput in) throws IOException { super.readInternal(in); - this.metricDescriptorPredicate = in.readObject(); + this.runningJobIds = + Arrays.stream(Objects.requireNonNull(in.readLongArray())) + .collect(HashSet::new, HashSet::add, HashSet::addAll); } @Override