From 81d32f6d67918f9821f77e7929c5eab04faa1177 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 30 Dec 2024 17:55:00 +0800 Subject: [PATCH] Pipe: Fixed the NPE for pipe heartbeat when there are nodes shutting down (#14584) --- .../pipe/coordinator/runtime/PipeRuntimeCoordinator.java | 4 +--- .../pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java | 8 +++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java index d960c3fe88a4..5b6369a2c3c9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java @@ -29,8 +29,6 @@ import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat; import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler; -import javax.validation.constraints.NotNull; - import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ExecutorService; @@ -106,7 +104,7 @@ public void stopPipeHeartbeat() { public void parseHeartbeat( final int dataNodeId, - @NotNull final List pipeMetaByteBufferListFromDataNode, + /* @Nullable */ final List pipeMetaByteBufferListFromDataNode, /* @Nullable */ final List pipeCompletedListFromAgent, /* @Nullable */ final List pipeRemainingEventCountListFromAgent, /* @Nullable */ final List pipeRemainingTimeListFromAgent) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java index 02ed8cca2fca..547310ce49ca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java @@ -22,8 +22,6 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; -import javax.validation.constraints.NotNull; - import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; @@ -37,10 +35,14 @@ public class PipeHeartbeat { private final Map remainingTimeMap = new HashMap<>(); public PipeHeartbeat( - @NotNull final List pipeMetaByteBufferListFromAgent, + /* @Nullable */ final List pipeMetaByteBufferListFromAgent, /* @Nullable */ final List pipeCompletedListFromAgent, /* @Nullable */ final List pipeRemainingEventCountListFromAgent, /* @Nullable */ final List pipeRemainingTimeListFromAgent) { + // Pipe meta may be null for nodes shutting down, return empty heartbeat + if (Objects.isNull(pipeMetaByteBufferListFromAgent)) { + return; + } for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) { final PipeMeta pipeMeta = PipeMeta.deserialize4TaskAgent(pipeMetaByteBufferListFromAgent.get(i));