Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wrapper for Metadata requests #497 #900

Merged
merged 33 commits into from
Nov 27, 2019

Conversation

jewertow
Copy link
Contributor

@jewertow jewertow commented Sep 8, 2019

Purpose

Introduction of a wrapper for Metadata requests.

References

See #497

Changes

  • Add MetadataClient

Background Context

The new feature to request Kafka Metadata from the internal KafkaConsumerActor (see #491) would be even nicer with thin wrappers (javadsl and scaladsl) hiding the ask calls and mapping to the correct response types.

@jewertow jewertow marked this pull request as ready for review September 8, 2019 20:49
@jewertow
Copy link
Contributor Author

jewertow commented Sep 8, 2019

I think this feature will be useful for #886, so I would like to implement it first. At the moment I have prepared a draft of the target implementation. I need an opinion whether this is a good way or not. If this is a good approach, I will continue this way. Otherwise, please give me some hints.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for giving this some attention!

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, less magic is better.

tests/src/test/java/docs/javadsl/MetadataClientTest.java Outdated Show resolved Hide resolved
)(implicit ec: ExecutionContext): Future[Map[TopicPartition, Long]] =
(consumerActor ? GetBeginningOffsets(partitions))(timeout)
.mapTo[BeginningOffsets]
.map(_.response.get)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will throw an exception if there was an error and fail the future that way. Instead, you should transform the Future. To be Scala 2.11 compatible you need to do it like this

    .flatMap {
      case Success(res) => Future.successful(res)
      case Failure(e) => Future.failed(e)
    }(ExecutionContexts.sameThreadExecutionContext)

With this it is OK to use Akka's sameThreadExecutionContext to run the flatMap on.

Copy link
Contributor Author

@jewertow jewertow Sep 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this? I added failing test cases and the code behaves the same for:

.map(_.response.get)

as well as for:

.map(_.response)
.flatMap {
  case Success(res) => Future.successful(res)
  case Failure(exception) => Future.failed(exception)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To run it safely on the sameThreadExecutionContext we may not allow any exceptions happening in the operator.

tests/src/test/java/docs/javadsl/MetadataClientTest.java Outdated Show resolved Hide resolved
@ennru
Copy link
Member

ennru commented Sep 30, 2019

Have you thought about making the MetadataClient an instance which would keep the actor ref and timeout? That would open up to have a convenience factory method which would take ConsumerSettings and create the actor internally (a bit like your first proposal).

@jewertow
Copy link
Contributor Author

jewertow commented Oct 1, 2019

Yes, I think it's a good way.

@jewertow
Copy link
Contributor Author

jewertow commented Oct 7, 2019

I pushed new proposal. Please look again and tell me if I can continue this way for the other requests.

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

class MetadataClient(actorSystem: ActorSystem, timeout: Timeout) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be preferable to have one MetadataClient instance per consumer actor/settings.
Make the primary constructor private and let it take the consumer actor's ActorRef.
Create factory methods in the companion object, one taking the ActorRef and one taking ConsumerSettings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the hint. I pushed new implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ennru could you look at it again and say if you meant it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that looks good.
The actor should only be stopped by the metadata client if it was created by it, you'd possibly need to pass that information to the client. So two constructors on that class might make sense.
close might be better than stop.

@ennru
Copy link
Member

ennru commented Nov 6, 2019

It would be great if you could get back to this to push it over the line.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code looks good!
Please suggest an update to the docs in https://doc.akka.io/docs/alpakka-kafka/snapshot/consumer-metadata.html

@jewertow jewertow changed the title WIP: Add wrapper for Metadata requests #497 Add wrapper for Metadata requests #497 Nov 22, 2019
@jewertow
Copy link
Contributor Author

@ennru I pushed updated docs. Look at this please.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great!
It would be better to show this documentation in consumer-metadata.md as this now becomes the recommened way to access the consumer metadata.

tests/src/test/java/docs/javadsl/MetadataClientTest.java Outdated Show resolved Hide resolved
docs/src/main/paradox/metadata-client.md Outdated Show resolved Hide resolved
docs/src/main/paradox/metadata-client.md Outdated Show resolved Hide resolved
@jewertow
Copy link
Contributor Author

I agree with you that it should be in the consumer-metadata.md documentation, but current implementation of MetadataClient does not support requests of the type GetOffsetsForTimes.
I'm going to add support for the GetOffsetsForTimes in the foreseeable future.
For now I would add to consumer-metadata docs the following statement:

@@@ warning

Since the `KafkaConsumerActor` is the internal mechanism, the recommended method to get metadata is using the [metadata-client](metadata-client.md).

@ennru
Copy link
Member

ennru commented Nov 26, 2019

Please join the new docs into the consumer-metadata.md page.
Show the MetadataClient first and have the raw experience after it.

@jewertow
Copy link
Contributor Author

Done.

Copy link
Member

@ennru ennru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@ennru ennru added this to the 2.0.0 milestone Nov 27, 2019
@ennru ennru merged commit 97c1cff into akka:master Nov 27, 2019
@ennru
Copy link
Member

ennru commented Nov 27, 2019

Thank you for implementing this!

@jewertow
Copy link
Contributor Author

Thank you for the opportunity to contribute to alpakka-kafka. It was the best experience I’ve ever had in open source! Thank you for your help and advices.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants