From c446b9bd6320aa93e9363640c06166320d3f53a5 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 29 May 2024 13:12:54 +0200 Subject: [PATCH] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide (#15601) The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input. Reviewers: Greg Harris , Bruno Cadonna --- .../kstream/internals/KStreamImplJoin.java | 6 +- .../kstream/internals/KStreamKStreamJoin.java | 160 +++++++++--------- .../internals/KStreamKStreamJoinLeftSide.java | 71 ++++++++ .../KStreamKStreamJoinRightSide.java | 70 ++++++++ .../state/internals/LeftOrRightValue.java | 15 -- .../internals/TimestampedKeyAndJoinSide.java | 22 ++- ...TimestampedKeyAndJoinSideDeserializer.java | 5 +- .../internals/KStreamKStreamJoinTest.java | 3 +- ...mestampedKeyAndJoinSideSerializerTest.java | 6 +- 9 files changed, 244 insertions(+), 114 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java index 6b70d2702c2bc..394c130058801 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java @@ -172,8 +172,7 @@ public KStream join(final KStream lhs, final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier(); final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows); - final KStreamKStreamJoin joinThis = new KStreamKStreamJoin<>( - true, + final KStreamKStreamJoinLeftSide joinThis = new KStreamKStreamJoinLeftSide<>( otherWindowStore.name(), internalWindows, joiner, @@ -182,8 +181,7 @@ public KStream join(final KStream lhs, sharedTimeTrackerSupplier ); - final KStreamKStreamJoin joinOther = new KStreamKStreamJoin<>( - false, + final KStreamKStreamJoinRightSide joinOther = new KStreamKStreamJoinRightSide<>( thisWindowStore.name(), internalWindows, AbstractStream.reverseJoinerWithKey(joiner), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index b8b48ff2c4df6..5646802aa9492 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -23,7 +23,6 @@ import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; import org.apache.kafka.streams.processor.api.ContextualProcessor; -import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; @@ -43,7 +42,7 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; -class KStreamKStreamJoin implements ProcessorSupplier { +abstract class KStreamKStreamJoin implements ProcessorSupplier { private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); private final String otherWindowName; @@ -55,28 +54,22 @@ class KStreamKStreamJoin implements ProcessorSupplier outerJoinWindowName; - private final ValueJoinerWithKey joiner; + private final ValueJoinerWithKey joiner; private final TimeTrackerSupplier sharedTimeTrackerSupplier; - KStreamKStreamJoin(final boolean isLeftSide, - final String otherWindowName, + KStreamKStreamJoin(final String otherWindowName, final JoinWindowsInternal windows, - final ValueJoinerWithKey joiner, + final ValueJoinerWithKey joiner, final boolean outer, final Optional outerJoinWindowName, + final long joinBeforeMs, + final long joinAfterMs, final TimeTrackerSupplier sharedTimeTrackerSupplier) { - this.isLeftSide = isLeftSide; this.otherWindowName = otherWindowName; - if (isLeftSide) { - this.joinBeforeMs = windows.beforeMs; - this.joinAfterMs = windows.afterMs; - } else { - this.joinBeforeMs = windows.afterMs; - this.joinAfterMs = windows.beforeMs; - } + this.joinBeforeMs = joinBeforeMs; + this.joinAfterMs = joinAfterMs; this.windowsAfterMs = windows.afterMs; this.windowsBeforeMs = windows.beforeMs; this.joinGraceMs = windows.gracePeriodMs(); @@ -87,15 +80,10 @@ class KStreamKStreamJoin implements ProcessorSupplier get() { - return new KStreamKStreamJoinProcessor(); - } - - private class KStreamKStreamJoinProcessor extends ContextualProcessor { - private WindowStore otherWindowStore; + protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor { + private WindowStore otherWindowStore; private Sensor droppedRecordsSensor; - private Optional, LeftOrRightValue>> outerJoinStore = Optional.empty(); + private Optional, LeftOrRightValue>> outerJoinStore = Optional.empty(); private InternalProcessorContext internalProcessorContext; private TimeTracker sharedTimeTracker; @@ -122,13 +110,10 @@ public void init(final ProcessorContext context) { } } - @SuppressWarnings("unchecked") @Override - public void process(final Record record) { + public void process(final Record record) { final long inputRecordTimestamp = record.timestamp(); - final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); - final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); @@ -144,26 +129,11 @@ public void process(final Record record) { outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record)); } - boolean needOuterJoin = outer; - try (final WindowStoreIterator iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) { - while (iter.hasNext()) { - needOuterJoin = false; - final KeyValue otherRecord = iter.next(); - final long otherRecordTimestamp = otherRecord.key; - - outerJoinStore.ifPresent(store -> { - // use putIfAbsent to first read and see if there's any values for the key, - // if yes delete the key, otherwise do not issue a put; - // we may delete some values with the same key early but since we are going - // range over all values of the same key even after failure, since the other window-store - // is only cleaned up by stream time, so this is okay for at-least-once. - store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null); - }); - - context().forward( - record.withValue(joiner.apply(record.key(), record.value(), otherRecord.value)) - .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); - } + final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); + final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); + try (final WindowStoreIterator iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) { + final boolean needOuterJoin = outer && !iter.hasNext(); + iter.forEachRemaining(otherRecord -> emitInnerJoin(record, otherRecord, inputRecordTimestamp)); if (needOuterJoin) { // The maxStreamTime contains the max time observed in both sides of the join. @@ -187,17 +157,24 @@ public void process(final Record record) { context().forward(record.withValue(joiner.apply(record.key(), record.value(), null))); } else { sharedTimeTracker.updatedMinTime(inputRecordTimestamp); - outerJoinStore.ifPresent(store -> store.put( - TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp), - LeftOrRightValue.make(isLeftSide, record.value()))); + putInOuterJoinStore(record); } } } } - private void emitNonJoinedOuterRecords( - final KeyValueStore, LeftOrRightValue> store, - final Record record) { + protected abstract TimestampedKeyAndJoinSide makeThisKey(final K key, final long inputRecordTimestamp); + + protected abstract LeftOrRightValue makeThisValue(final VThis thisValue); + + protected abstract TimestampedKeyAndJoinSide makeOtherKey(final K key, final long timestamp); + + protected abstract VThis getThisValue(final LeftOrRightValue leftOrRightValue); + + protected abstract VOther getOtherValue(final LeftOrRightValue leftOrRightValue); + + private void emitNonJoinedOuterRecords(final KeyValueStore, LeftOrRightValue> store, + final Record record) { // calling `store.all()` creates an iterator what is an expensive operation on RocksDB; // to reduce runtime cost, we try to avoid paying those cost @@ -221,26 +198,24 @@ private void emitNonJoinedOuterRecords( // reset to MAX_VALUE in case the store is empty sharedTimeTracker.minTime = Long.MAX_VALUE; - try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { + try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { TimestampedKeyAndJoinSide prevKey = null; boolean outerJoinLeftWindowOpen = false; boolean outerJoinRightWindowOpen = false; while (it.hasNext()) { + final KeyValue, LeftOrRightValue> nextKeyValue = it.next(); + final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = nextKeyValue.key; + sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp(); if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { // if windows are open for both joinSides we can break since there are no more candidates to emit break; } - final KeyValue, LeftOrRightValue> next = it.next(); - final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = next.key; - final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); - sharedTimeTracker.minTime = timestamp; // Continue with the next outer record if window for this joinSide has not closed yet // There might be an outer record for the other joinSide which window has not closed yet // We rely on the ordering of KeyValueIterator - final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); - if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { + if (isOuterJoinWindowOpen(timestampedKeyAndJoinSide)) { if (timestampedKeyAndJoinSide.isLeftSide()) { outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side } else { @@ -249,13 +224,9 @@ private void emitNonJoinedOuterRecords( // We continue with the next outer record continue; } - - final K key = timestampedKeyAndJoinSide.getKey(); - final LeftOrRightValue leftOrRightValue = next.value; - final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue); - context().forward( - record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) - ); + + final LeftOrRightValue leftOrRightValue = nextKeyValue.value; + forwardNonJoinedOuterRecords(record, timestampedKeyAndJoinSide, leftOrRightValue); if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) { // blind-delete the previous key from the outer window store now it is emitted; @@ -275,20 +246,22 @@ private void emitNonJoinedOuterRecords( } } - @SuppressWarnings("unchecked") - private VOut getNullJoinedValue( - final K key, - final LeftOrRightValue leftOrRightValue) { - // depending on the JoinSide fill in the joiner key and joiner values - if (isLeftSide) { - return joiner.apply(key, - leftOrRightValue.getLeftValue(), - leftOrRightValue.getRightValue()); - } else { - return joiner.apply(key, - (V1) leftOrRightValue.getRightValue(), - (V2) leftOrRightValue.getLeftValue()); - } + private void forwardNonJoinedOuterRecords(final Record record, + final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide, + final LeftOrRightValue leftOrRightValue) { + final K key = timestampedKeyAndJoinSide.getKey(); + final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); + final VThis thisValue = getThisValue(leftOrRightValue); + final VOther otherValue = getOtherValue(leftOrRightValue); + final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue); + context().forward( + record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp) + ); + } + + private boolean isOuterJoinWindowOpen(final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide) { + final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); + return sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime; } private long getOuterJoinLookBackTimeMs( @@ -301,6 +274,31 @@ private long getOuterJoinLookBackTimeMs( } } + private void emitInnerJoin(final Record thisRecord, final KeyValue otherRecord, + final long inputRecordTimestamp) { + outerJoinStore.ifPresent(store -> { + // use putIfAbsent to first read and see if there's any values for the key, + // if yes delete the key, otherwise do not issue a put; + // we may delete some values with the same key early but since we are going + // range over all values of the same key even after failure, since the other window-store + // is only cleaned up by stream time, so this is okay for at-least-once. + final TimestampedKeyAndJoinSide otherKey = makeOtherKey(thisRecord.key(), otherRecord.key); + store.putIfAbsent(otherKey, null); + }); + + context().forward( + thisRecord.withValue(joiner.apply(thisRecord.key(), thisRecord.value(), otherRecord.value)) + .withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + } + + private void putInOuterJoinStore(final Record thisRecord) { + outerJoinStore.ifPresent(store -> { + final TimestampedKeyAndJoinSide thisKey = makeThisKey(thisRecord.key(), thisRecord.timestamp()); + final LeftOrRightValue thisValue = makeThisValue(thisRecord.value()); + store.put(thisKey, thisValue); + }); + } + @Override public void close() { sharedTimeTrackerSupplier.remove(context().taskId()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java new file mode 100644 index 0000000000000..3b4ee5b33930a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; + +import java.util.Optional; + +class KStreamKStreamJoinLeftSide extends KStreamKStreamJoin { + + KStreamKStreamJoinLeftSide(final String otherWindowName, + final JoinWindowsInternal windows, + final ValueJoinerWithKey joiner, + final boolean outer, + final Optional outerJoinWindowName, + final TimeTrackerSupplier sharedTimeTrackerSupplier) { + super(otherWindowName, windows, joiner, outer, outerJoinWindowName, windows.beforeMs, windows.afterMs, + sharedTimeTrackerSupplier); + } + + @Override + public Processor get() { + return new KStreamKStreamJoinLeftProcessor(); + } + + private class KStreamKStreamJoinLeftProcessor extends KStreamKStreamJoinProcessor { + + @Override + public TimestampedKeyAndJoinSide makeThisKey(final K key, final long timestamp) { + return TimestampedKeyAndJoinSide.makeLeft(key, timestamp); + } + + @Override + public LeftOrRightValue makeThisValue(final VLeft thisValue) { + return LeftOrRightValue.makeLeftValue(thisValue); + } + + @Override + public TimestampedKeyAndJoinSide makeOtherKey(final K key, final long timestamp) { + return TimestampedKeyAndJoinSide.makeRight(key, timestamp); + } + + @Override + public VLeft getThisValue(final LeftOrRightValue leftOrRightValue) { + return leftOrRightValue.getLeftValue(); + } + + @Override + public VRight getOtherValue(final LeftOrRightValue leftOrRightValue) { + return leftOrRightValue.getRightValue(); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java new file mode 100644 index 0000000000000..e4bcfe4e10517 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; + +import java.util.Optional; + +class KStreamKStreamJoinRightSide extends KStreamKStreamJoin { + + KStreamKStreamJoinRightSide(final String otherWindowName, + final JoinWindowsInternal windows, + final ValueJoinerWithKey joiner, + final boolean outer, + final Optional outerJoinWindowName, + final TimeTrackerSupplier sharedTimeTrackerSupplier) { + super(otherWindowName, windows, joiner, outer, outerJoinWindowName, windows.afterMs, windows.beforeMs, + sharedTimeTrackerSupplier); + } + + @Override + public Processor get() { + return new KStreamKStreamRightJoinProcessor(); + } + + private class KStreamKStreamRightJoinProcessor extends KStreamKStreamJoinProcessor { + @Override + public TimestampedKeyAndJoinSide makeThisKey(final K key, final long timestamp) { + return TimestampedKeyAndJoinSide.makeRight(key, timestamp); + } + + @Override + public LeftOrRightValue makeThisValue(final VRight thisValue) { + return LeftOrRightValue.makeRightValue(thisValue); + } + + @Override + public TimestampedKeyAndJoinSide makeOtherKey(final K key, final long timestamp) { + return TimestampedKeyAndJoinSide.makeLeft(key, timestamp); + } + + @Override + public VRight getThisValue(final LeftOrRightValue leftOrRightValue) { + return leftOrRightValue.getRightValue(); + } + + @Override + public VLeft getOtherValue(final LeftOrRightValue leftOrRightValue) { + return leftOrRightValue.getLeftValue(); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java index bb7b516eddc39..5f5e0ab7dd717 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java @@ -63,21 +63,6 @@ public static LeftOrRightValue makeRightValue(final V2 rightVal return new LeftOrRightValue<>(null, rightValue); } - /** - * Create a new {@link LeftOrRightValue} instance with the V value as {@code leftValue} if - * {@code isLeftSide} is True; otherwise {@code rightValue} if {@code isLeftSide} is False. - * - * @param value the V value (either V1 or V2 type) - * @param the type of the value - * @return a new {@link LeftOrRightValue} instance - */ - public static LeftOrRightValue make(final boolean isLeftSide, final V value) { - Objects.requireNonNull(value, "value is null"); - return isLeftSide - ? LeftOrRightValue.makeLeftValue(value) - : LeftOrRightValue.makeRightValue(value); - } - public V1 getLeftValue() { return leftValue; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java index c0516e101540e..3b799e815399f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java @@ -42,17 +42,25 @@ private TimestampedKeyAndJoinSide(final boolean leftSide, final K key, final lon } /** - * Create a new {@link TimestampedKeyAndJoinSide} instance if the provide {@code key} is not {@code null}. + * Create a new {@link TimestampedKeyAndJoinSide} instance for the left join side if the provide {@code key} is not {@code null}. * - * @param leftSide True if the key is part of the left join side; False if it is from the right join side * @param key the key * @param the type of the key - * @return a new {@link TimestampedKeyAndJoinSide} instance if the provide {@code key} is not {@code null} + * @return a new {@link TimestampedKeyAndJoinSide} instance for the left join side if the provide {@code key} is not {@code null} */ - public static TimestampedKeyAndJoinSide make(final boolean leftSide, final K key, final long timestamp) { - return new TimestampedKeyAndJoinSide<>(leftSide, key, timestamp); + public static TimestampedKeyAndJoinSide makeLeft(final K key, final long timestamp) { + return new TimestampedKeyAndJoinSide<>(true, key, timestamp); + } + /** + * Create a new {@link TimestampedKeyAndJoinSide} instance for the right join side if the provide {@code key} is not {@code null}. + * + * @param key the key + * @param the type of the key + * @return a new {@link TimestampedKeyAndJoinSide} instance for the right join side if the provide {@code key} is not {@code null} + */ + public static TimestampedKeyAndJoinSide makeRight(final K key, final long timestamp) { + return new TimestampedKeyAndJoinSide<>(false, key, timestamp); } - public boolean isLeftSide() { return leftSide; } @@ -89,4 +97,4 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(leftSide, key, timestamp); } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java index 9ecea46c84e78..4b035eff6d873 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideDeserializer.java @@ -55,11 +55,12 @@ public void configure(final Map configs, final boolean isKey) { @Override public TimestampedKeyAndJoinSide deserialize(final String topic, final byte[] data) { - final boolean bool = data[StateSerdes.TIMESTAMP_SIZE] == 1; + final boolean isLeft = data[StateSerdes.TIMESTAMP_SIZE] == 1; final K key = keyDeserializer.deserialize(topic, rawKey(data)); final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(data)); - return TimestampedKeyAndJoinSide.make(bool, key, timestamp); + return isLeft ? TimestampedKeyAndJoinSide.makeLeft(key, timestamp) : + TimestampedKeyAndJoinSide.makeRight(key, timestamp); } private byte[] rawTimestamp(final byte[] data) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 7fd62bb78bddd..3c602ee8d5d7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -456,8 +456,7 @@ public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() { * This test is testing something internal to [[KStreamKStreamJoin]], so we had to setup low-level api manually. */ final KStreamImplJoin.TimeTrackerSupplier tracker = new KStreamImplJoin.TimeTrackerSupplier(); - final KStreamKStreamJoin join = new KStreamKStreamJoin<>( - false, + final KStreamKStreamJoinRightSide join = new KStreamKStreamJoinRightSide<>( "other", new JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))), (key, v1, v2) -> v1 + v2, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java index 5cca8f6ba54da..8f4191fb8c6a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializerTest.java @@ -34,7 +34,7 @@ public class TimestampedKeyAndJoinSideSerializerTest { public void shouldSerializeKeyWithJoinSideAsTrue() { final String value = "some-string"; - final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = TimestampedKeyAndJoinSide.make(true, value, 10); + final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = TimestampedKeyAndJoinSide.makeLeft(value, 10); final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC, timestampedKeyAndJoinSide); @@ -51,7 +51,7 @@ public void shouldSerializeKeyWithJoinSideAsTrue() { public void shouldSerializeKeyWithJoinSideAsFalse() { final String value = "some-string"; - final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = TimestampedKeyAndJoinSide.make(false, value, 20); + final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = TimestampedKeyAndJoinSide.makeRight(value, 20); final byte[] serialized = STRING_SERDE.serializer().serialize(TOPIC, timestampedKeyAndJoinSide); @@ -67,6 +67,6 @@ public void shouldSerializeKeyWithJoinSideAsFalse() { @Test public void shouldThrowIfSerializeNullData() { assertThrows(NullPointerException.class, - () -> STRING_SERDE.serializer().serialize(TOPIC, TimestampedKeyAndJoinSide.make(true, null, 0))); + () -> STRING_SERDE.serializer().serialize(TOPIC, TimestampedKeyAndJoinSide.makeLeft(null, 0))); } }