Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster-sharding: single listener per entity type key + Java DSL #1080

Merged
merged 3 commits into from
Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

package akka.kafka.cluster.sharding

import java.util.concurrent.{CompletionStage, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.{ExtendedActorSystem, Extension, ExtensionId}
import akka.actor.{ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the FCQN for ActorSystem instead of importing it to make it more clear.

import akka.annotation.{ApiMayChange, InternalApi}
import akka.cluster.sharding.external.ExternalShardAllocation
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
Expand All @@ -24,6 +25,8 @@ import org.apache.kafka.common.utils.Utils
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}
import akka.util.JavaDurationConverters._
import scala.compat.java8.FutureConverters._

/**
* API MAY CHANGE
Expand Down Expand Up @@ -54,6 +57,32 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
settings: ConsumerSettings[_, _]): Future[KafkaShardingMessageExtractor[M]] =
getPartitionCount(topic, timeout, settings).map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)

/**
*
* Java API
*
* API MAY CHANGE
*
* Asynchronously return a [[akka.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
* based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
*
* The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
* cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
* the Kafka Consumer connection required to retrieve the number of partitions. Each call to this method will result
* in a round trip to Kafka. This method should only be called once per entity type [[M]], per local actor system.
*
* All topics used in a Consumer [[akka.kafka.Subscription]] must contain the same number of partitions to ensure
* that entities are routed to the same Entity type.
*
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractor[M](topic: String,
timeout: java.time.Duration,
settings: ConsumerSettings[_, _]): CompletionStage[KafkaShardingMessageExtractor[M]] =
getPartitionCount(topic, timeout.asScala, settings)
.map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)
.toJava

/**
* API MAY CHANGE
*
Expand Down Expand Up @@ -92,6 +121,36 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
getPartitionCount(topic, timeout, settings)
.map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, entityIdExtractor))(system.dispatcher)

/**
* Java API
*
* API MAY CHANGE
*
* Asynchronously return a [[akka.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
* based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
*
* The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
* cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
* the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
* a field from the Entity to use as the entity id for the hashing strategy. Each call to this method will result
* in a round trip to Kafka. This method should only be called once per entity type [[M]], per local actor system.
*
* All topics used in a Consumer [[akka.kafka.Subscription]] must contain the same number of partitions to ensure
* that entities are routed to the same Entity type.
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractorNoEnvelope[M](
topic: String,
timeout: java.time.Duration,
entityIdExtractor: java.util.function.Function[M, String],
settings: ConsumerSettings[_, _]
): CompletionStage[KafkaShardingNoEnvelopeExtractor[M]] =
getPartitionCount(topic, timeout.asScala, settings)
.map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, e => entityIdExtractor.apply(e)))(
system.dispatcher
)
.toJava

/**
* API MAY CHANGE
*
Expand All @@ -108,6 +167,24 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
entityIdExtractor: M => String): KafkaShardingNoEnvelopeExtractor[M] =
new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)

/**
* API MAY CHANGE
*
* Asynchronously return a [[akka.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
* based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
*
* The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]].
*
* All topics used in a Consumer [[akka.kafka.Subscription]] must contain the same number of partitions to ensure
* that entities are routed to the same Entity type.
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractorNoEnvelope[M](
kafkaPartitions: Int,
entityIdExtractor: java.util.function.Function[M, String]
): KafkaShardingNoEnvelopeExtractor[M] =
new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, e => entityIdExtractor.apply(e))

private val metadataConsumerActorNum = new AtomicInteger
private def getPartitionCount[M](topic: String,
timeout: FiniteDuration,
Expand All @@ -125,7 +202,8 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
}
}

private val shardingRebalanceListenerActorNum = new AtomicInteger
private val rebalanceListeners =
new ConcurrentHashMap[EntityTypeKey[_], akka.actor.typed.ActorRef[ConsumerRebalanceEvent]]()

/**
* API MAY CHANGE
Expand All @@ -143,11 +221,16 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
* val listenerClassicActorRef: akka.actor.ActorRef = listenerTypedActorRef.toClassic
* }}}
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def rebalanceListener(typeKey: EntityTypeKey[_]): akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
rebalanceListener(system.toTyped, typeKey)
def rebalanceListener(typeKey: EntityTypeKey[_]): akka.actor.typed.ActorRef[ConsumerRebalanceEvent] = {
ennru marked this conversation as resolved.
Show resolved Hide resolved
rebalanceListeners.computeIfAbsent(typeKey, _ => {
system.toTyped
.systemActorOf(RebalanceListener(typeKey), s"kafka-cluster-sharding-rebalance-listener-${typeKey.name}")
})
}

/**
* Java API
*
* API MAY CHANGE
*
* Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
Expand All @@ -163,11 +246,10 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
* val listenerClassicActorRef: akka.actor.ActorRef = listenerTypedActorRef.toClassic
* }}}
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def rebalanceListener(otherSystem: ActorSystem[_], typeKey: EntityTypeKey[_]): ActorRef[ConsumerRebalanceEvent] = {
val num = shardingRebalanceListenerActorNum.getAndIncrement()
otherSystem
.systemActorOf(RebalanceListener(typeKey), s"kafka-cluster-sharding-rebalance-listener-$num")
def rebalanceListener(
typeKey: akka.cluster.sharding.typed.javadsl.EntityTypeKey[_]
): akka.actor.typed.ActorRef[ConsumerRebalanceEvent] = {
rebalanceListener(typeKey.asScala)
}
}

Expand Down Expand Up @@ -250,4 +332,14 @@ object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] {

override def createExtension(system: ExtendedActorSystem): KafkaClusterSharding =
new KafkaClusterSharding(system)

/**
* Java API
*/
override def get(system: ClassicActorSystemProvider): KafkaClusterSharding = super.get(system)

/**
* Java API
*/
override def get(system: ActorSystem): KafkaClusterSharding = super.get(system)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package docs.javadsl;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.external.ExternalShardAllocationStrategy;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.kafka.AutoSubscription;
import akka.kafka.ConsumerRebalanceEvent;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.cluster.sharding.KafkaClusterSharding;
import akka.kafka.javadsl.Consumer;
import akka.util.Timeout;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.concurrent.CompletionStage;

public class ClusterShardingExample {

// #user-entity
final static class User {
public final String id;
public final String mame;

User(String id, String mame) {
this.id = id;
this.mame = mame;
}
}
// #user-entity

public static Behavior<User> userBehaviour() {
return Behaviors.empty();
}

public static void example() {
ActorSystem<Void> typedSystem = ActorSystem.create(Behaviors.empty(), "ClusterShardingExample");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should just be system as in other examples.

String kafkaBootstrapServers = "localhost:9092";


// #message-extractor
// automatically retrieving the number of partitions requires a round trip to a Kafka broker
CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<User>> messageExtractor = KafkaClusterSharding.get(typedSystem).messageExtractorNoEnvelope(
"user-topic",
Duration.ofSeconds(10),
(User msg) -> msg.id,
ConsumerSettings.create(Adapter.toClassic(typedSystem), new StringDeserializer(), new StringDeserializer())
);
// #message-extractor

// #setup-cluster-sharding

String groupId = "user-topic-group-id";
EntityTypeKey<User> typeKey = EntityTypeKey.create(User.class, groupId);

messageExtractor.thenAccept(extractor -> ClusterSharding.get(typedSystem).init(
Entity.of(typeKey, ctx -> userBehaviour())
.withAllocationStrategy(new ExternalShardAllocationStrategy(typedSystem, typeKey.name(), Timeout.create(Duration.ofSeconds(5))))
.withMessageExtractor(extractor)));
// #setup-cluster-sharding

// #rebalance-listener
akka.actor.typed.ActorRef<ConsumerRebalanceEvent> rebalanceListener = KafkaClusterSharding.get(typedSystem).rebalanceListener(typeKey);

ConsumerSettings<String, byte[]> consumerSettings = ConsumerSettings.create(Adapter.toClassic(typedSystem), new StringDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers(kafkaBootstrapServers)
.withGroupId(typeKey.name());// use the same group id as we used in the `EntityTypeKey` for `User`

// pass the rebalance listener to the topic subscription
AutoSubscription subscription = Subscriptions
.topics("user-topic")
.withRebalanceListener(Adapter.toClassic(rebalanceListener));

Consumer.plainSource(consumerSettings, subscription);
// #rebalance-listener


}
}
12 changes: 12 additions & 0 deletions docs/src/main/paradox/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,25 @@ Given a user entity.
Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #user-entity }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #user-entity }

Create a `MessageExtractor`.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #message-extractor }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #message-extractor }

Setup Akka Typed Cluster Sharding.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #setup-cluster-sharding }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #setup-cluster-sharding }

## Rebalance Listener

The Rebalance Listener is a pre-defined Actor that will handle @scaladoc[ConsumerRebalanceEvents](akka.kafka.ConsumerRebalanceEvent) that will update the Akka Cluster External Sharding strategy when subscribed partitions are re-assigned to consumers running on different cluster nodes.
Expand All @@ -81,3 +90,6 @@ Create the rebalance listener using the extension and pass it into an Alpakka Ka

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #rebalance-listener }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #rebalance-listener }