diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index 9a706f1756dc1..35ba2865eeaad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -44,8 +44,6 @@ public class FileStateHandle implements StreamStateHandle { /** The size of the state in the file. */ private final long stateSize; - private final PhysicalStateHandleID physicalID; - /** * Creates a new file state for the given file path. * @@ -55,7 +53,6 @@ public FileStateHandle(Path filePath, long stateSize) { checkArgument(stateSize >= -1); this.filePath = checkNotNull(filePath); this.stateSize = stateSize; - this.physicalID = new PhysicalStateHandleID(filePath.toUri().toString()); } /** @@ -79,7 +76,7 @@ public Optional asBytesIfInMemory() { @Override public PhysicalStateHandleID getStreamStateHandleID() { - return physicalID; + return new PhysicalStateHandleID(filePath.toUri().toString()); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index d64ed7093bb13..6f73c18d4d97e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -41,13 +41,10 @@ public class ByteStreamStateHandle implements StreamStateHandle { */ private final String handleName; - private final PhysicalStateHandleID physicalID; - /** Creates a new ByteStreamStateHandle containing the given data. */ public ByteStreamStateHandle(String handleName, byte[] data) { this.handleName = Preconditions.checkNotNull(handleName); this.data = Preconditions.checkNotNull(data); - this.physicalID = new PhysicalStateHandleID(handleName); } @Override @@ -62,7 +59,7 @@ public Optional asBytesIfInMemory() { @Override public PhysicalStateHandleID getStreamStateHandleID() { - return physicalID; + return new PhysicalStateHandleID(handleName); } public byte[] getData() {