Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer: make partition assignment handler public API #949

Merged
merged 5 commits into from
Oct 30, 2019

Conversation

ennru
Copy link
Member

@ennru ennru commented Oct 23, 2019

Purpose

Allow users to pass a PartitionAssignmentHandler which allows for hooking into Kafka rebalances.

Potential use cases may include

References

Fixes #539, #843, #911
#761

Changes

  • Let subscriptions pass a PartitionAssignmentHandler
  • Add javadsl.PartitionAssignmentHandler
  • Add seekToBeginning and seekToEnd (usful for User-defined seeking for subscriptions #911)
  • Test cases in Scala an Java
  • Upgrade Hamcrest 2.1 to 2.2

TODO

  • Documentation

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(java.util.Collection[TopicPartition])]]
*/
def seekToEnd(tps: java.util.Collection[TopicPartition]): Unit = consumer.seekToEnd(tps)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we also add KafkaConsumer.offsetsForTimes while we're updating the RestrictedConsumer. It's a blocking call. It's useful for apps running in multi-dc Kafka environments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

/**
* The API is new and may change in further releases.
*
* Allows to execute user code when Kafka rebalances partitions between consumers, or an Alpakka Kafka consumer is stopped.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Allows to execute user code when Kafka rebalances partitions between consumers, or an Alpakka Kafka consumer is stopped.
* Allows the user to execute user code when Kafka rebalances partitions between consumers, or an Alpakka Kafka consumer is stopped.

* A warning will be logged if a callback takes longer than the configured `partition-handler-warning`.
*
* There is no point in calling `CommittableOffset`'s commit methods as their committing won't be executed as long as any of
* the callbacks in this class are called.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could mention that RestrictedConsumer.commitSync is available though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

* the callbacks in this class are called.
*
* This complements the methods of Kafka's [[org.apache.kafka.clients.consumer.ConsumerRebalanceListener ConsumerRebalanceListener]] with
* an `onStop` callback.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To match the scaladsl comments.

Suggested change
* an `onStop` callback.
* an `onStop` callback which is called before `Consumer.close`.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

*/
@ApiMayChange
trait PartitionAssignmentHandler {

/**
* See [[org.apache.kafka.clients.consumer.ConsumerRebalanceListener#onPartitionsRevoked]]
*
* @param revokedTps The list of partitions that were assigned to the consumer on the last rebalance
* @param revokedTps The list of partitions that were revoked from the consumer
* @param consumer A restricted version of the internally used [[org.apache.kafka.clients.consumer.Consumer Consumer]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really a KafkaConsumer though in the is a sense. Should we link to RestrictedConsumer instead, or in addition, to this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

Copy link
Contributor

@seglo seglo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Nice work!

@@ -0,0 +1,8 @@
# Internal API
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the PR-based path convention. I'll do that in other PRs too (#930)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Rebalance event listeners don't respect ConsumerRebalanceListener contract
2 participants