-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide #15601
Conversation
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @raminqaf Thanks for picking up this ticket.
I'm not very familiar with Streams, so my comments aren't blocking, and I'll let a Streams committer take a look at this before merge.
Cool idea to use an enum for the refactor!
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
Show resolved
Hide resolved
...ams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueDeserializer.java
Outdated
Show resolved
Hide resolved
This PR looks like it'll have merge conflicts with #15510. Since that is fixing a bug it should probably have higher priority than this refactor, can you rebase on top of their changes? Alternatively we can wait for that to land first before returning to this PR. |
@gharris1727 I have broken down the KStreamKstreamJoin class into two classes. For now, I just moved the code (+the fix in #15510) to see if all the tests pass and if I am going in the correct direction. I managed to get rid of the unsafe type casts appropriately. I changed the PR into a draft because it depends on #15510. Please have a look and let me know if this is going in the correct direction. |
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamRightJoin.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @raminqaf I think this is in the right direction. I have some high-level concerns that I'll mention here instead of pointing out individual problems:
Can you please revert changes where things are re-ordered or have spacing changes, but don't have any semantic changes? I saw this for imports, constructor arguments, and some multi-line statements. It would be easier to review and generate less merge conflicts if these lines did not change.
I think the current refactor of the process and emitNonJoinedOuterRecords methods introduces too much duplication. I think you can keep most of the implementation inside of KStreamKStreamJoinProcessor if you give it generic arguments, and have small abstract methods whenever the generics are insufficient. I pointed out two simplifications for the outerJoinStore and getNullJoinedValue that I think demonstrate the idea, but there are certainly other abstract methods needed.
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java
Outdated
Show resolved
Hide resolved
dc60885
to
0e3cf15
Compare
…/KAFKA-16362 # Conflicts: # streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @raminqaf this is looking really good! I like how there is a separation between the this/other used in the joiner and Processor signatures, and left/right used in the outerJoinStore.
I really only had nits, as this accomplishes the type-safety I was looking for.
I do think there are still a lot of non-semantic changes to ordering and indenting that I think could be addressed before a Streams committer takes a look at this, and would make their review faster and easier.
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java
Outdated
Show resolved
Hide resolved
@gharris1727 Thanks for the feedback! I reverted all the changes you requested and reverted a couple of other indentation problems that caused a diff. I can even go further and revert & inline the introduce private methods (i.e., |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'm reaching the end of my useful review comments, and this is nearly ready for a streams reviewer. @mjsax @ableegoldman @guozhangwang do any of you have time to take a look at this?
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
Show resolved
Hide resolved
Thanks Greg. If no one's had time to look at this by next week, we'll assign a reviewer during the next Kafka Streams hangout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@raminqaf Thanks for the PR!
I did a quick pass. I will go more into details in the coming days.
@@ -43,7 +42,7 @@ | |||
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX; | |||
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; | |||
|
|||
class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> { | |||
abstract class KStreamKStreamJoin<K, VL, VR, VOut, VThis, VOther> implements ProcessorSupplier<K, VThis, K, VOut> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not get the difference between VL
and VThis
as well as VR
and VOther
. Could you please elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also a better name for VL
and VR
would be VLeft
and VRight
, respectively. VR
is sometimes used for result value type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We needed a way to detect the type of the side being called dynamically so that the super class can handle them based on the caller. This also ensures type safety.
When this class is called by the left side of the join VThis
would be left left type (VLeft
) and VOther
would be right type (VRight
).
class KStreamKStreamLeftJoinSide<K, VLeft, VRight, VOut> extends KStreamKStreamJoin<K, VLeft, VRight, VOut, VLeft, VRight> {
--^-- --^---
VThis VOther
Whereas for the right side of the join VThis
would be right left type (VRight
) and VOther
would be left type (VLeft
).
class KStreamKStreamRightJoinSide<K, VLeft, VRight, VOut> extends KStreamKStreamJoin<K, VLeft, VRight, VOut, VRight, VLeft>
--^-- --^---
VThis VOther
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is more tangible if we look at the abstract methods:
protected abstract VThis getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);
protected abstract VOther getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue);
Depending on which join side implements these methods, VThis
and VOther
can change. For instance, the getThisValue()
method on the left side of the join would be:
@Override
public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getLeftValue();
}
Respectively, on the right side we have
@Override
public VRight getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getRightValue();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to throw my own explanation in here:
VLeft and VRight are "absolute" in that they are the left and right types of the overall join. Both sides of the join have equivalent VLeft and VRight types, because they share a common outerJoinStore instance.
VThis and VOther are "relative" in that they are the type of records entering "this" side of the join, and the "other" side of the join, and this is necessarily swapped for the other side of the join.
|
||
import java.util.Optional; | ||
|
||
class KStreamKStreamLeftJoin<K, VL, VR, VOut> extends KStreamKStreamJoin<K, VL, VR, VOut, VL, VR> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean that this is the left side of the join or that this is a left join? That is a bit confusing. Maybe a better name might be KStreamKStreamJoinLeftSide
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. What do you think about KStreamKStreamLeftJoinSide
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would still prefer KStreamKStreamJoinLeftSide
. It is the left side of a stream-stream join. The processor are also named KStreamKStreamJoinLeftProcessor
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, @raminqaf!
Here my feedback!
private void emitNonJoinedOuterRecords( | ||
final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store, | ||
final Record<K, V1> record) { | ||
final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> store, | ||
final Record<K, ?> record) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
private void emitNonJoinedOuterRecords(final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> store,
final Record<K, ?> record) {
private void emitNonJoinedOuterRecords( | ||
final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store, | ||
final Record<K, V1> record) { | ||
final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> store, | ||
final Record<K, ?> record) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't that be
final Record<K, VThis> record
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Avoiding the wildcard here makes it easier to read and understand. I have changed it!
(V1) leftOrRightValue.getRightValue(), | ||
(V2) leftOrRightValue.getLeftValue()); | ||
} | ||
private void forwardNonJoinedOuterRecords(final Record<K, ?> record, final KeyValue<? extends TimestampedKeyAndJoinSide<K>, ? extends LeftOrRightValue<VLeft, VRight>> nextKeyValue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be simpler to define this method as:
private void forwardNonJoinedOuterRecords(final Record<K, VThis> record,
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide,
final LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
It makes the code a bit simpler and shorter.
Also here, why not Record<K, VThis> record
. That is exactly the type used at the call site.
|
||
private class KStreamKStreamJoinLeftProcessor extends KStreamKStreamJoinProcessor { | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@raminqaf Could you please rebase this PR on current trunk to get the new build setup? We need to restart the builds because one was red. |
Done! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, @raminqaf !
LGTM!
commit cc269b0 Author: Antoine Pourchet <antoine@responsive.dev> Date: Wed May 29 14:15:02 2024 -0600 KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmentComputed (apache#16123) This PR adds the logic and wiring necessary to make the callback to TaskAssignor::onAssignmentComputed with the necessary parameters. We also fixed some log statements in the actual assignment error computation, as well as modified the ApplicationState::allTasks method to return a Map instead of a Set of TaskInfos. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org> commit 862ea12 Author: Eugene Mitskevich <emitskevich@bloomberg.net> Date: Wed May 29 16:14:37 2024 -0400 MINOR: Fix rate metric spikes (apache#15889) Rate reports value in the form of sumOrCount/monitoredWindowSize. It has a bug in monitoredWindowSize calculation, which leads to spikes in result values. Reviewers: Jun Rao <junrao@gmail.com> commit 0f0c9ec Author: gongxuanzhang <gongxuanzhangmelt@gmail.com> Date: Thu May 30 01:08:17 2024 +0800 KAFKA-16771 First log directory printed twice when formatting storage (apache#16010) Reviewers: Chia-Ping Tsai <chia7712@gmail.com> commit 2d9994e Author: Andrew Schofield <aschofield@confluent.io> Date: Wed May 29 16:31:52 2024 +0100 KAFKA-16722: Introduce ConsumerGroupPartitionAssignor interface (apache#15998) KIP-932 introduces share groups to go alongside consumer groups. Both kinds of group use server-side assignors but it is unlikely that a single assignor class would be suitable for both. As a result, the KIP introduces specific interfaces for consumer group and share group partition assignors. This PR introduces only the consumer group interface, `o.a.k.coordinator.group.assignor.ConsumerGroupPartitionAssignor`. The share group interface will come in a later release. The existing implementations of the general `PartitionAssignor` interface have been changed to implement `ConsumerGroupPartitionAssignor` instead and all other code changes are just propagating the change throughout the codebase. Note that the code in the group coordinator that actually calculates assignments uses the general `PartitionAssignor` interface so that it can be used with both kinds of group, even though the assignors themselves are specific. Reviewers: Apoorv Mittal <amittal@confluent.io>, David Jacot <djacot@confluent.io> commit 0b75cf7 Author: gongxuanzhang <gongxuanzhangmelt@gmail.com> Date: Wed May 29 22:38:00 2024 +0800 KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started (apache#15946) Reviewers: Chia-Ping Tsai <chia7712@gmail.com> commit 8d11d95 Author: Loïc GREFFIER <loic.greffier@hotmail.fr> Date: Wed May 29 14:09:22 2024 +0200 KAFKA-16448: Add ProcessingExceptionHandler interface and implementations (apache#16090) This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR brings ProcessingExceptionHandler interface and default implementations. Co-authored-by: Dabz <d.gasparina@gmail.com> Co-authored-by: sebastienviale <sebastien.viale@michelin.com> Reviewer: Bruno Cadonna <cadonna@apache.org> commit b73f479 Author: Ramin Gharib <ramingharib@gmail.com> Date: Wed May 29 13:12:54 2024 +0200 KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide (apache#15601) The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input. Reviewers: Greg Harris <greg.harris@aiven.io>, Bruno Cadonna <cadonna@apache.org> commit 897cab2 Author: Luke Chen <showuon@gmail.com> Date: Wed May 29 15:30:18 2024 +0800 KAFKA-16399: Add JBOD support in tiered storage (apache#15690) After JBOD is supported in KRaft, we should also enable JBOD support in tiered storage. Unit tests and Integration tests are also added. Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Igor Soarez <soarez@apple.com>, Mickael Maison <mickael.maison@gmail.com> commit eefd114 Author: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com> Date: Wed May 29 02:21:30 2024 -0400 KAFKA-16832; LeaveGroup API for upgrading ConsumerGroup (apache#16057) This patch implements the LeaveGroup API to the consumer groups that are in the mixed mode. Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io> commit 9562143 Author: A. Sophie Blee-Goldman <ableegoldman@gmail.com> Date: Tue May 28 21:35:02 2024 -0700 HOTFIX: remove unnecessary list creation (apache#16117) Removing a redundant list declaration in the new StickyTaskAssignor implementation Reviewers: Antoine Pourchet <antoine@responsive.dev> commit d64e3fb Author: Antoine Pourchet <antoine@responsive.dev> Date: Tue May 28 20:43:30 2024 -0600 KAFKA-15045: (KIP-924 pt. 13) AssignmentError calculation added (apache#16114) This PR adds the post-processing of the TaskAssignment to figure out if the new assignment is valid, and return an AssignmentError otherwise. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org> commit 8d243df Author: Antoine Pourchet <antoine@responsive.dev> Date: Tue May 28 19:01:18 2024 -0600 KAFKA-15045: (KIP-924 pt. 12) Wiring in new assignment configs and logic (apache#16074) This PR creates the new public config of KIP-924 in StreamsConfig and uses it to instantiate user-created TaskAssignors. If such a TaskAssignor is found and successfully created we then use that assignor to perform the task assignment, otherwise we revert back to the pre KIP-924 world with the internal task assignors. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Almog Gavra <almog@responsive.dev> commit 56ee139 Author: Antoine Pourchet <antoine@responsive.dev> Date: Tue May 28 18:05:51 2024 -0600 KAFKA-15045: (KIP-924 pt. 11) Implemented StickyTaskAssignor (apache#16052) This PR implements the StickyTaskAssignor with the new KIP 924 API. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org> commit 59ba555 Author: Nick Telford <nick.telford@gmail.com> Date: Wed May 29 00:23:23 2024 +0100 KAFKA-15541: Add oldest-iterator-open-since-ms metric (apache#16041) Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw). This new `StateStore` metric tracks the timestamp that the oldest surviving Iterator was created. This timestamp should continue to climb, and closely track the current time, as old iterators are closed and new ones created. If the timestamp remains very low (i.e. old), that suggests an Iterator has leaked, which should enable users to isolate the affected store. It will report no data when there are no currently open Iterators. Reviewers: Matthias J. Sax <matthias@confluent.io> commit 4eb60b5 Author: Frederik Rouleau <frouleau@confluent.io> Date: Tue May 28 23:56:47 2024 +0200 KAFKA-16507 Add KeyDeserializationException and ValueDeserializationException with record content (apache#15691) Implements KIP-1036. Add raw ConsumerRecord data to RecordDeserialisationException to make DLQ implementation easier. Reviewers: Kirk True <ktrue@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Matthias J. Sax <matthias@confluent.io> commit 4d04eb8 Author: PoAn Yang <payang@apache.org> Date: Wed May 29 03:13:33 2024 +0800 KAFKA-16796 Introduce new org.apache.kafka.tools.api.Decoder to replace kafka.serializer.Decoder (apache#16064) Reviewers: Chia-Ping Tsai <chia7712@gmail.com> commit a649bc4 Author: Luke Chen <showuon@gmail.com> Date: Wed May 29 00:05:49 2024 +0800 KAFKA-16711: Make sure to update highestOffsetInRemoteStorage after log dir change (apache#15947) Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org> commit 64f699a Author: Omnia Ibrahim <o.g.h.ibrahim@gmail.com> Date: Tue May 28 15:22:54 2024 +0100 KAFKA-15853: Move general configs out of KafkaConfig (apache#16040) Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com> commit 699438b Author: Sanskar Jhajharia <122860866+sjhajharia@users.noreply.github.com> Date: Tue May 28 16:34:44 2024 +0530 MINOR: Fix the config name in ProducerFailureHandlingTest (apache#16099) When moving from KafkaConfig.ReplicaFetchMaxBytesProp we used ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG instead of ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG. This PR patches the same. Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com> commit a57c05b Author: Ken Huang <100591800+m1a2st@users.noreply.github.com> Date: Tue May 28 17:42:33 2024 +0900 KAFKA-16805 Stop using a ClosureBackedAction to configure Spotbugs reports (apache#16081) Reviewers: Chia-Ping Tsai <chia7712@gmail.com> commit 91284d8 Author: Luke Chen <showuon@gmail.com> Date: Tue May 28 12:23:34 2024 +0800 KAFKA-16709: abortAndPauseCleaning only when future log is not existed (apache#15951) When doing alter replica logDirs, we'll create a future log and pause log cleaning for the partition( here). And this log cleaning pausing will resume after alter replica logDirs completes (here). And when in the resuming log cleaning, we'll decrement 1 for the LogCleaningPaused count. Once the count reached 0, the cleaning pause is really resuming. (here). For more explanation about the logCleaningPaused state can check here. But, there's still one factor that could increase the LogCleaningPaused count: leadership change (here). When there's a leadership change, we'll check if there's a future log in this partition, if so, we'll create future log and pauseCleaning (LogCleaningPaused count + 1). So, if during the alter replica logDirs: 1. alter replica logDirs for tp0 triggered (LogCleaningPaused count = 1) 2. tp0 leadership changed (LogCleaningPaused count = 2) 3. alter replica logDirs completes, resuming logCleaning (LogCleaningPaused count = 1) 4. LogCleaning keeps paused because the count is always > 0 This PR fixes this issue by only abortAndPauseCleaning when future log is not existed. We did the same check in alterReplicaLogDirs. So this change can make sure there's only 1 abortAndPauseCleaning for either abortAndPauseCleaning or maybeAddLogDirFetchers. Tests also added. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Igor Soarez <soarez@apple.com> commit adab48d Author: Greg Harris <greg.harris@aiven.io> Date: Mon May 27 18:33:01 2024 -0700 MINOR: Disable JDK 11 and 17 tests on PRs (apache#16051) Signed-off-by: Greg Harris <greg.harris@aiven.io> Reviewers: Justine Olshan <jolshan@confluent.io>, David Arthur <mumrah@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com> commit bac8df5 Author: Colin P. McCabe <cmccabe@apache.org> Date: Mon May 27 08:53:53 2024 -0700 MINOR: fix typo in KAFKA-16515 commit da3304e Author: David Jacot <djacot@confluent.io> Date: Mon May 27 17:10:37 2024 +0200 KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE (apache#16072) This patch was initially created in apache#15536. When there is a commit for multiple topic partitions and some, but not all, exceed the offset metadata limit, the pending commit is not properly cleaned up leading to UNSTABLE_OFFSET_COMMIT errors when trying to fetch the offsets with read_committed. This change makes it so the invalid commits are not added to the pendingOffsetCommits set. Co-authored-by: Kyle Phelps <kyle.phelps@datadoghq.com> Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io> commit 524ad1e Author: Kamal Chandraprakash <kchandraprakash@uber.com> Date: Mon May 27 15:14:23 2024 +0530 KAFKA-16452: Don't throw OOORE when converting the offset to metadata (apache#15825) Don't throw OFFSET_OUT_OF_RANGE error when converting the offset to metadata, and next time the leader should increment the high watermark by itself after receiving fetch requests from followers. This can happen when checkpoint files are missing and being elected as a leader. Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <junrao@apache.org> commit d9ee9c9 Author: Nick Telford <nick.telford@gmail.com> Date: Sat May 25 20:22:56 2024 +0100 KAFKA-15541: Use LongAdder instead of AtomicInteger (apache#16076) `LongAdder` performs better than `AtomicInteger` when under contention from many threads. Since it's possible that many Interactive Query threads could create a large number of `KeyValueIterator`s, we don't want contention on a metric to be a performance bottleneck. The trade-off is memory, as `LongAdder` uses more memory to space out independent counters across different cache lines. In practice, I don't expect this to cause too many problems, as we're only constructing 1 per-store. Reviewers: Matthias J. Sax <matthias@confluent.io> commit a8d166c Author: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com> Date: Sat May 25 09:06:15 2024 -0700 KAFKA-16625; Reverse lookup map from topic partitions to members (apache#15974) This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not. Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io> commit d585a49 Author: Jeff Kim <kimkb2011@gmail.com> Date: Fri May 24 16:33:57 2024 -0400 KAFKA-16831: CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit (apache#16059) CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit. Otherwise, we default the write limit to the min buffer size of 16384 for the write limit. This causes the coordinator to threw RecordTooLargeException even when it's under the 1MB max batch size limit. Reviewers: David Jacot <djacot@confluent.io> commit 8eea6b8 Author: Edoardo Comar <ecomar@uk.ibm.com> Date: Fri May 24 20:33:00 2024 +0100 MINOR: mention KAFKA-15905 in docs "Notable changes in 3.7.1" (apache#16070) * MINOR: mention KAFKA-15905 in docs "Notable changes in 3.7.1/3.8.0" Co-Authored-By: Adrian Preston <prestona@uk.ibm.com> commit 4f55786 Author: Colin P. McCabe <cmccabe@apache.org> Date: Mon May 20 15:41:52 2024 -0700 KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers ZkMetadataCache could theoretically return KRaft controller information from a call to ZkMetadataCache.getAliveBrokerNode, which doesn't make sense. KRaft controllers are not part of the set of brokers. The only use-case for this functionality was in MetadataCacheControllerNodeProvider during ZK migration, where it allowed ZK brokers in migration mode to forward requests to kcontrollers when appropriate. This PR changes MetadataCacheControllerNodeProvider to simply delegate to quorumControllerNodeProvider in this case. Reviewers: José Armando García Sancio <jsancio@apache.org> commit 90892ae Author: Colin P. McCabe <cmccabe@apache.org> Date: Mon May 20 16:23:27 2024 -0700 KAFKA-16516: Fix the controller node provider for broker to control channel Fix the code in the RaftControllerNodeProvider to query RaftManager to find Node information, rather than consulting a static map. Add a RaftManager.voterNode function to supply this information. In KRaftClusterTest, add testControllerFailover to get more coverage of controller failovers. Reviewers: José Armando García Sancio <jsancio@apache.org> commit 2432a18 Author: KrishVora01 <156789009+KrishVora01@users.noreply.github.com> Date: Fri May 24 22:21:02 2024 +0530 KAFKA-16373: KIP-1028: Adding code to support Apache Kafka Docker Official Images (apache#16027) This PR aims to add JVM based Docker Official Image for Apache Kafka as per the following KIP - https://cwiki.apache.org/confluence/display/KAFKA/KIP-1028%3A+Docker+Official+Image+for+Apache+Kafka This PR adds the following functionalities: Introduces support for Apache Kafka Docker Official Images via: GitHub Workflows: - Workflow to prepare static source files for Docker images - Workflow to build and test Docker official images - Scripts to prepare source files and perform Docker image builds and tests A new directory for Docker official images, named docker/docker_official_images. This is the new directory to house all Docker Official Image assets. Co-authored-by: Vedarth Sharma <vesharma@confluent.io> Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Vedarth Sharma <vesharma@confluent.io> commit 0143c72 Author: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Fri May 24 14:19:43 2024 +0200 KAFKA-16815: Handle FencedInstanceId in HB response (apache#16047) Handle FencedInstanceIdException that a consumer may receive in the heartbeat response. This will be the case when a static consumer is removed from the group by and admin client, and another member joins with the same group.instance.id (allowed in). The first member will receive a FencedInstanceId on its next heartbeat. The expectation is that this should be handled as a fatal error. There are no actual changes in logic with this PR, given that without being handled, the FencedInstanceId was being treated as an "unexpected error", which are all treated as fatal errors, so the outcome remains the same. But we're introducing this small change just for accuracy in the logic and the logs: FencedInstanceId is expected during heartbeat, a log line is shown describing the situation and why it happened (and it's treated as a fatal error, just like it was before this PR). This PR also improves the test to ensure that the error propagated to the app thread matches the one received in the HB. Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io> commit c5cd190 Author: Gantigmaa Selenge <39860586+tinaselenge@users.noreply.github.com> Date: Fri May 24 11:50:47 2024 +0100 MINOR: Refactor SSL/SASL admin integration tests to not use a custom authorizer (apache#15377) Reviewers: Mickael Maison <mickael.maison@gmail.com> commit 520aa86 Author: Jeff Kim <kimkb2011@gmail.com> Date: Fri May 24 03:51:50 2024 -0400 KAFKA-16626; Lazily convert subscribed topic names to topic ids (apache#15970) This patch aims to remove the data structure that stores the conversion from topic names to topic ids which was taking time similar to the actual assignment computation. Instead, we reuse the already existing ConsumerGroupMember.subscribedTopicNames() and do the conversion to topic ids when the iterator is requested. Reviewers: David Jacot <djacot@confluent.io> commit 6941598 Author: Krishna Agarwal <62741600+kagarwal06@users.noreply.github.com> Date: Fri May 24 12:16:01 2024 +0530 KAFKA-16826: Integrate Native Docker Image with github actions (apache#16045) This PR integrates the Native docker image with the existing github action jobs for the jvm docker image of AK. The integration is done to the following actions: docker_build_and_test.yml: Builds the docker image and runs sanity tests and CVE scan docker_rc_release.yml: Builds the RC docker image for both amd and arm platform and pushes it to the dockerhub. docker_promote.yml: Promotes the RC docker image to the released image tag Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Vedarth Sharma <142404391+VedarthConfluent@users.noreply.github.com> commit de32028 Author: Kuan-Po (Cooper) Tseng <brandboat@gmail.com> Date: Fri May 24 05:25:53 2024 +0800 KAFKA-16828 RackAwareTaskAssignorTest failed (apache#16044) Reviewers: Chia-Ping Tsai <chia7712@gmail.com> commit 11ad5e8 Author: Greg Harris <greg.harris@aiven.io> Date: Thu May 23 13:23:18 2024 -0700 MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions (apache#15469) Signed-off-by: Greg Harris <greg.harris@aiven.io> Reviewers: Mickael Maison <mickael.maison@gmail.com>
…Side (apache#15601) The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input. Reviewers: Greg Harris <greg.harris@aiven.io>, Bruno Cadonna <cadonna@apache.org>
…Side (apache#15601) The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input. Reviewers: Greg Harris <greg.harris@aiven.io>, Bruno Cadonna <cadonna@apache.org>
…Side (apache#15601) The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input. Reviewers: Greg Harris <greg.harris@aiven.io>, Bruno Cadonna <cadonna@apache.org>
…Side (apache#15601) The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input. Reviewers: Greg Harris <greg.harris@aiven.io>, Bruno Cadonna <cadonna@apache.org>
Description
The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input.
Tests of @gharris1727 from his PR have been adopted to check the type-sefty of the joins.
Summary of testing strategy
Unit tests have been added for the new JoinSide enum.
Committer Checklist (excluded from commit message)