Skip to content

Commit

Permalink
[Bugfix][zeta] Fix the serialization issue of GetMetricsOperation dur…
Browse files Browse the repository at this point in the history
…ing multi-node operation. (#5206)
  • Loading branch information
ic4y authored Aug 9, 2023
1 parent 9d3c3de commit faeab89
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID;
import static org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap;

public class CoordinatorService {
Expand Down Expand Up @@ -568,17 +567,7 @@ public Map<Long, JobMetrics> getRunningJobMetrics() {
(RawJobMetrics)
NodeEngineUtil.sendOperationToMemberNode(
nodeEngine,
new GetMetricsOperation(
dis ->
(dis.tagValue(JOB_ID)
!= null
&& runningJobIds
.contains(
Long
.parseLong(
dis
.tagValue(
JOB_ID))))),
new GetMetricsOperation(runningJobIds),
address)
.get();
metrics.add(rawJobMetrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,22 @@
import com.hazelcast.spi.impl.operationservice.Operation;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;

public class GetMetricsOperation extends Operation implements IdentifiedDataSerializable {
import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID;

private Predicate<MetricDescriptor> metricDescriptorPredicate;
public class GetMetricsOperation extends Operation implements IdentifiedDataSerializable {
private RawJobMetrics response;
private Set<Long> runningJobIds;

public GetMetricsOperation() {}

public GetMetricsOperation(Predicate<MetricDescriptor> metricDescriptorPredicate) {
this.metricDescriptorPredicate = metricDescriptorPredicate;
public GetMetricsOperation(Set<Long> runningJobIds) {
this.runningJobIds = runningJobIds;
}

@Override
Expand All @@ -60,6 +65,10 @@ public void run() {
+ " because it is not master. Master is: "
+ masterAddress);
}
Predicate<MetricDescriptor> metricDescriptorPredicate =
dis ->
(dis.tagValue(JOB_ID) != null
&& runningJobIds.contains(Long.parseLong(dis.tagValue(JOB_ID))));

ZetaMetricsCollector metricsRenderer =
new ZetaMetricsCollector(
Expand All @@ -71,13 +80,15 @@ public void run() {
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
out.writeObject(metricDescriptorPredicate);
out.writeLongArray(runningJobIds.stream().mapToLong(Long::longValue).toArray());
}

@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
this.metricDescriptorPredicate = in.readObject();
this.runningJobIds =
Arrays.stream(Objects.requireNonNull(in.readLongArray()))
.collect(HashSet::new, HashSet::add, HashSet::addAll);
}

@Override
Expand Down

0 comments on commit faeab89

Please sign in to comment.