-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consumer: make partition assignment handler public API #949
Conversation
/** | ||
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(java.util.Collection[TopicPartition])]] | ||
*/ | ||
def seekToEnd(tps: java.util.Collection[TopicPartition]): Unit = consumer.seekToEnd(tps) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest we also add KafkaConsumer.offsetsForTimes
while we're updating the RestrictedConsumer
. It's a blocking call. It's useful for apps running in multi-dc Kafka environments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
/** | ||
* The API is new and may change in further releases. | ||
* | ||
* Allows to execute user code when Kafka rebalances partitions between consumers, or an Alpakka Kafka consumer is stopped. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Allows to execute user code when Kafka rebalances partitions between consumers, or an Alpakka Kafka consumer is stopped. | |
* Allows the user to execute user code when Kafka rebalances partitions between consumers, or an Alpakka Kafka consumer is stopped. |
* A warning will be logged if a callback takes longer than the configured `partition-handler-warning`. | ||
* | ||
* There is no point in calling `CommittableOffset`'s commit methods as their committing won't be executed as long as any of | ||
* the callbacks in this class are called. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could mention that RestrictedConsumer.commitSync
is available though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
* the callbacks in this class are called. | ||
* | ||
* This complements the methods of Kafka's [[org.apache.kafka.clients.consumer.ConsumerRebalanceListener ConsumerRebalanceListener]] with | ||
* an `onStop` callback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To match the scaladsl comments.
* an `onStop` callback. | |
* an `onStop` callback which is called before `Consumer.close`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
*/ | ||
@ApiMayChange | ||
trait PartitionAssignmentHandler { | ||
|
||
/** | ||
* See [[org.apache.kafka.clients.consumer.ConsumerRebalanceListener#onPartitionsRevoked]] | ||
* | ||
* @param revokedTps The list of partitions that were assigned to the consumer on the last rebalance | ||
* @param revokedTps The list of partitions that were revoked from the consumer | ||
* @param consumer A restricted version of the internally used [[org.apache.kafka.clients.consumer.Consumer Consumer]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not really a KafkaConsumer
though in the is a
sense. Should we link to RestrictedConsumer
instead, or in addition, to this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Nice work!
@@ -0,0 +1,8 @@ | |||
# Internal API |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the PR-based path convention. I'll do that in other PRs too (#930)
Purpose
Allow users to pass a
PartitionAssignmentHandler
which allows for hooking into Kafka rebalances.Potential use cases may include
References
Fixes #539, #843, #911
#761
Changes
PartitionAssignmentHandler
javadsl.PartitionAssignmentHandler
seekToBeginning
andseekToEnd
(usful for User-defined seeking for subscriptions #911)TODO