-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add wrapper for Metadata requests #497 #900
Add wrapper for Metadata requests #497 #900
Conversation
I think this feature will be useful for #886, so I would like to implement it first. At the moment I have prepared a draft of the target implementation. I need an opinion whether this is a good way or not. If this is a good approach, I will continue this way. Otherwise, please give me some hints. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for giving this some attention!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, less magic is better.
)(implicit ec: ExecutionContext): Future[Map[TopicPartition, Long]] = | ||
(consumerActor ? GetBeginningOffsets(partitions))(timeout) | ||
.mapTo[BeginningOffsets] | ||
.map(_.response.get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will throw an exception if there was an error and fail the future that way. Instead, you should transform
the Future
. To be Scala 2.11 compatible you need to do it like this
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)
With this it is OK to use Akka's sameThreadExecutionContext
to run the flatMap
on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this? I added failing test cases and the code behaves the same for:
.map(_.response.get)
as well as for:
.map(_.response)
.flatMap {
case Success(res) => Future.successful(res)
case Failure(exception) => Future.failed(exception)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To run it safely on the sameThreadExecutionContext
we may not allow any exceptions happening in the operator.
tests/src/test/scala/akka/kafka/scaladsl/MetadataClientSpec.scala
Outdated
Show resolved
Hide resolved
…mbeddedKafkaJunit4Test.
Have you thought about making the |
Yes, I think it's a good way. |
I pushed new proposal. Please look again and tell me if I can continue this way for the other requests. |
import scala.concurrent.{ExecutionContextExecutor, Future} | ||
import scala.util.{Failure, Success} | ||
|
||
class MetadataClient(actorSystem: ActorSystem, timeout: Timeout) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be preferable to have one MetadataClient
instance per consumer actor/settings.
Make the primary constructor private
and let it take the consumer actor's ActorRef
.
Create factory methods in the companion object, one taking the ActorRef
and one taking ConsumerSettings
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the hint. I pushed new implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ennru could you look at it again and say if you meant it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that looks good.
The actor should only be stopped by the metadata client if it was created by it, you'd possibly need to pass that information to the client. So two constructors on that class might make sense.
close
might be better than stop
.
It would be great if you could get back to this to push it over the line. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code looks good!
Please suggest an update to the docs in https://doc.akka.io/docs/alpakka-kafka/snapshot/consumer-metadata.html
@ennru I pushed updated docs. Look at this please. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
It would be better to show this documentation in consumer-metadata.md
as this now becomes the recommened way to access the consumer metadata.
I agree with you that it should be in the
|
Please join the new docs into the |
Done. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Thank you for implementing this! |
Thank you for the opportunity to contribute to alpakka-kafka. It was the best experience I’ve ever had in open source! Thank you for your help and advices. |
Purpose
Introduction of a wrapper for
Metadata
requests.References
See #497
Changes
MetadataClient
Background Context
The new feature to request Kafka Metadata from the internal KafkaConsumerActor (see #491) would be even nicer with thin wrappers (javadsl and scaladsl) hiding the ask calls and mapping to the correct response types.