Skip to content

Commit

Permalink
Fix JmsIO NPE (#32489)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Sep 18, 2024
1 parent 3105260 commit ef53217
Showing 1 changed file with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
Expand Down Expand Up @@ -586,7 +587,7 @@ public boolean requiresDeduping() {
}

static class UnboundedJmsReader<T> extends UnboundedReader<T> {

private static final byte[] EMPTY = new byte[0];
private UnboundedJmsSource<T> source;
@VisibleForTesting JmsCheckpointMark.Preparer checkpointMarkPreparer;
private Connection connection;
Expand All @@ -604,7 +605,7 @@ public UnboundedJmsReader(UnboundedJmsSource<T> source, PipelineOptions options)
this.source = source;
this.checkpointMarkPreparer = JmsCheckpointMark.newPreparer();
this.currentMessage = null;
this.currentID = new byte[0];
this.currentID = EMPTY;
this.options = options;
}

Expand Down Expand Up @@ -684,18 +685,22 @@ public boolean advance() throws IOException {
currentTimestamp = new Instant(message.getJMSTimestamp());

String messageID = message.getJMSMessageID();
if (this.source.spec.isRequiresDeduping()) {
// per JMS specification, message ID has prefix "id:". The runner use it to dedup message.
// Empty or non-exist message id (possible for optimization configuration set) will induce
// data loss.
if (messageID.length() <= 3) {
throw new RuntimeException(
String.format(
"Invalid JMSMessageID %s while requiresDeduping is set. Data loss possible.",
messageID));
if (messageID != null) {
if (this.source.spec.isRequiresDeduping()) {
// per JMS specification, message ID has prefix "id:". The runner use it to dedup
// message. Empty or non-exist message id (possible for optimization configuration set)
// will cause data loss.
if (messageID.length() <= 3) {
throw new RuntimeException(
String.format(
"Invalid JMSMessageID %s while requiresDeduping is set. Data loss possible.",
messageID));
}
}
currentID = messageID.getBytes(StandardCharsets.UTF_8);
} else {
currentID = EMPTY;
}
currentID = messageID.getBytes(StandardCharsets.UTF_8);

return true;
} catch (Exception e) {
Expand Down Expand Up @@ -728,6 +733,12 @@ public Instant getCurrentTimestamp() {
public byte[] getCurrentRecordId() {
if (currentMessage == null) {
throw new NoSuchElementException();
} else if (currentID == EMPTY && this.source.spec.isRequiresDeduping()) {
LOG.warn(
"Empty JMSRecordID received when requiresDeduping enabled, runner deduplication will"
+ " not be effective");
// Return a random UUID to ensure it won't get dedup
currentID = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
}
return currentID;
}
Expand Down

0 comments on commit ef53217

Please sign in to comment.