Skip to content

Commit

Permalink
Cluster-sharding: single listener per entity type key + Java DSL (#1080)
Browse files Browse the repository at this point in the history
Also remove the method that takes in an actor system. We shouldn't allow
an extension from one system to use another system
  • Loading branch information
chbatey authored Mar 18, 2020
1 parent 789b527 commit feabac8
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

package akka.kafka.cluster.sharding

import java.util.concurrent.{CompletionStage, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.{ExtendedActorSystem, Extension, ExtensionId}
import akka.actor.{ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId}
import akka.annotation.{ApiMayChange, InternalApi}
import akka.cluster.sharding.external.ExternalShardAllocation
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
Expand All @@ -24,6 +25,8 @@ import org.apache.kafka.common.utils.Utils
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}
import akka.util.JavaDurationConverters._
import scala.compat.java8.FutureConverters._

/**
* API MAY CHANGE
Expand Down Expand Up @@ -54,6 +57,32 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
settings: ConsumerSettings[_, _]): Future[KafkaShardingMessageExtractor[M]] =
getPartitionCount(topic, timeout, settings).map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)

/**
*
* Java API
*
* API MAY CHANGE
*
* Asynchronously return a [[akka.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
* based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
*
* The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
* cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
* the Kafka Consumer connection required to retrieve the number of partitions. Each call to this method will result
* in a round trip to Kafka. This method should only be called once per entity type [[M]], per local actor system.
*
* All topics used in a Consumer [[akka.kafka.Subscription]] must contain the same number of partitions to ensure
* that entities are routed to the same Entity type.
*
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractor[M](topic: String,
timeout: java.time.Duration,
settings: ConsumerSettings[_, _]): CompletionStage[KafkaShardingMessageExtractor[M]] =
getPartitionCount(topic, timeout.asScala, settings)
.map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)
.toJava

/**
* API MAY CHANGE
*
Expand Down Expand Up @@ -92,6 +121,36 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
getPartitionCount(topic, timeout, settings)
.map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, entityIdExtractor))(system.dispatcher)

/**
* Java API
*
* API MAY CHANGE
*
* Asynchronously return a [[akka.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
* based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
*
* The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka
* cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure
* the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick
* a field from the Entity to use as the entity id for the hashing strategy. Each call to this method will result
* in a round trip to Kafka. This method should only be called once per entity type [[M]], per local actor system.
*
* All topics used in a Consumer [[akka.kafka.Subscription]] must contain the same number of partitions to ensure
* that entities are routed to the same Entity type.
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractorNoEnvelope[M](
topic: String,
timeout: java.time.Duration,
entityIdExtractor: java.util.function.Function[M, String],
settings: ConsumerSettings[_, _]
): CompletionStage[KafkaShardingNoEnvelopeExtractor[M]] =
getPartitionCount(topic, timeout.asScala, settings)
.map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, e => entityIdExtractor.apply(e)))(
system.dispatcher
)
.toJava

/**
* API MAY CHANGE
*
Expand All @@ -108,6 +167,24 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
entityIdExtractor: M => String): KafkaShardingNoEnvelopeExtractor[M] =
new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)

/**
* API MAY CHANGE
*
* Asynchronously return a [[akka.cluster.sharding.typed.ShardingMessageExtractor]] with a default hashing strategy
* based on Apache Kafka's [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]].
*
* The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]].
*
* All topics used in a Consumer [[akka.kafka.Subscription]] must contain the same number of partitions to ensure
* that entities are routed to the same Entity type.
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def messageExtractorNoEnvelope[M](
kafkaPartitions: Int,
entityIdExtractor: java.util.function.Function[M, String]
): KafkaShardingNoEnvelopeExtractor[M] =
new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, e => entityIdExtractor.apply(e))

private val metadataConsumerActorNum = new AtomicInteger
private def getPartitionCount[M](topic: String,
timeout: FiniteDuration,
Expand All @@ -125,7 +202,8 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
}
}

private val shardingRebalanceListenerActorNum = new AtomicInteger
private val rebalanceListeners =
new ConcurrentHashMap[EntityTypeKey[_], akka.actor.typed.ActorRef[ConsumerRebalanceEvent]]()

/**
* API MAY CHANGE
Expand All @@ -144,10 +222,16 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
* }}}
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def rebalanceListener(typeKey: EntityTypeKey[_]): akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
rebalanceListener(system.toTyped, typeKey)
def rebalanceListener(typeKey: EntityTypeKey[_]): akka.actor.typed.ActorRef[ConsumerRebalanceEvent] = {
rebalanceListeners.computeIfAbsent(typeKey, _ => {
system.toTyped
.systemActorOf(RebalanceListener(typeKey), s"kafka-cluster-sharding-rebalance-listener-${typeKey.name}")
})
}

/**
* Java API
*
* API MAY CHANGE
*
* Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
Expand All @@ -164,10 +248,10 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
* }}}
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
def rebalanceListener(otherSystem: ActorSystem[_], typeKey: EntityTypeKey[_]): ActorRef[ConsumerRebalanceEvent] = {
val num = shardingRebalanceListenerActorNum.getAndIncrement()
otherSystem
.systemActorOf(RebalanceListener(typeKey), s"kafka-cluster-sharding-rebalance-listener-$num")
def rebalanceListener(
typeKey: akka.cluster.sharding.typed.javadsl.EntityTypeKey[_]
): akka.actor.typed.ActorRef[ConsumerRebalanceEvent] = {
rebalanceListener(typeKey.asScala)
}
}

Expand Down Expand Up @@ -250,4 +334,14 @@ object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] {

override def createExtension(system: ExtendedActorSystem): KafkaClusterSharding =
new KafkaClusterSharding(system)

/**
* Java API
*/
override def get(system: ClassicActorSystemProvider): KafkaClusterSharding = super.get(system)

/**
* Java API
*/
override def get(system: ActorSystem): KafkaClusterSharding = super.get(system)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package docs.javadsl;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.external.ExternalShardAllocationStrategy;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.kafka.AutoSubscription;
import akka.kafka.ConsumerRebalanceEvent;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.cluster.sharding.KafkaClusterSharding;
import akka.kafka.javadsl.Consumer;
import akka.util.Timeout;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.concurrent.CompletionStage;

public class ClusterShardingExample {

// #user-entity
static final class User {
public final String id;
public final String mame;

User(String id, String mame) {
this.id = id;
this.mame = mame;
}
}
// #user-entity

public static Behavior<User> userBehaviour() {
return Behaviors.empty();
}

public static void example() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ClusterShardingExample");
String kafkaBootstrapServers = "localhost:9092";

// #message-extractor
// automatically retrieving the number of partitions requires a round trip to a Kafka broker
CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<User>> messageExtractor =
KafkaClusterSharding.get(system)
.messageExtractorNoEnvelope(
"user-topic",
Duration.ofSeconds(10),
(User msg) -> msg.id,
ConsumerSettings.create(
Adapter.toClassic(system), new StringDeserializer(), new StringDeserializer()));
// #message-extractor

// #setup-cluster-sharding

String groupId = "user-topic-group-id";
EntityTypeKey<User> typeKey = EntityTypeKey.create(User.class, groupId);

messageExtractor.thenAccept(
extractor ->
ClusterSharding.get(system)
.init(
Entity.of(typeKey, ctx -> userBehaviour())
.withAllocationStrategy(
new ExternalShardAllocationStrategy(
system, typeKey.name(), Timeout.create(Duration.ofSeconds(5))))
.withMessageExtractor(extractor)));
// #setup-cluster-sharding

// #rebalance-listener
akka.actor.typed.ActorRef<ConsumerRebalanceEvent> rebalanceListener =
KafkaClusterSharding.get(system).rebalanceListener(typeKey);

ConsumerSettings<String, byte[]> consumerSettings =
ConsumerSettings.create(
Adapter.toClassic(system), new StringDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers(kafkaBootstrapServers)
.withGroupId(
typeKey
.name()); // use the same group id as we used in the `EntityTypeKey` for `User`

// pass the rebalance listener to the topic subscription
AutoSubscription subscription =
Subscriptions.topics("user-topic")
.withRebalanceListener(Adapter.toClassic(rebalanceListener));

Consumer.plainSource(consumerSettings, subscription);
// #rebalance-listener

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ import scala.util.{Failure, Success}
* https://github.com/akka/akka-samples/tree/2.6/akka-sample-kafka-to-sharding-scala
*/
object ClusterShardingExample {
val typedSystem = ActorSystem(Behaviors.empty, "ClusterShardingExample")
val classicSystem = typedSystem.toClassic
val system = ActorSystem(Behaviors.empty, "ClusterShardingExample")
val kafkaBootstrapServers = "localhost:9092"

implicit val ec = typedSystem.executionContext
implicit val ec = system.executionContext

def userBehaviour(): Behavior[User] = Behaviors.empty[User]

Expand All @@ -43,11 +42,11 @@ object ClusterShardingExample {
// #message-extractor
// automatically retrieving the number of partitions requires a round trip to a Kafka broker
val messageExtractor: Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[User]] =
KafkaClusterSharding(typedSystem.toClassic).messageExtractorNoEnvelope(
KafkaClusterSharding(system.toClassic).messageExtractorNoEnvelope(
timeout = 10.seconds,
topic = "user-topic",
entityIdExtractor = (msg: User) => msg.id,
settings = ConsumerSettings(classicSystem, new StringDeserializer, new StringDeserializer)
settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaBootstrapServers)
)
// #message-extractor
Expand All @@ -59,27 +58,27 @@ object ClusterShardingExample {

messageExtractor.onComplete {
case Success(extractor) =>
ClusterSharding(typedSystem).init(
ClusterSharding(system).init(
Entity(typeKey)(createBehavior = _ => userBehaviour())
.withAllocationStrategy(new ExternalShardAllocationStrategy(typedSystem, typeKey.name))
.withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
.withMessageExtractor(extractor)
.withSettings(ClusterShardingSettings(typedSystem))
.withSettings(ClusterShardingSettings(system))
)
case Failure(ex) => typedSystem.log.error("An error occurred while obtaining the message extractor", ex)
case Failure(ex) => system.log.error("An error occurred while obtaining the message extractor", ex)
}
// #setup-cluster-sharding

// #rebalance-listener
// obtain an Akka classic ActorRef that will handle consumer group rebalance events
val rebalanceListener: akka.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(classicSystem).rebalanceListener(typeKey)
KafkaClusterSharding(system.toClassic).rebalanceListener(typeKey)

// convert the rebalance listener to a classic ActorRef
// convert the rebalance listener to a classic ActorRef until Alpakka Kafka supports Akka Typed
import akka.actor.typed.scaladsl.adapter._
val rebalanceListenerClassic: akka.actor.ActorRef = rebalanceListener.toClassic

val consumerSettings =
ConsumerSettings(classicSystem, new StringDeserializer, new ByteArrayDeserializer)
ConsumerSettings(system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(kafkaBootstrapServers)
.withGroupId(typeKey.name) // use the same group id as we used in the `EntityTypeKey` for `User`

Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/akka/kafka/Subscriptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ sealed trait AutoSubscription extends Subscription {
}
}

@ApiMayChange
sealed trait ConsumerRebalanceEvent
@ApiMayChange
final case class TopicPartitionsAssigned(sub: Subscription, topicPartitions: Set[TopicPartition])
extends ConsumerRebalanceEvent
@ApiMayChange
final case class TopicPartitionsRevoked(sub: Subscription, topicPartitions: Set[TopicPartition])
extends ConsumerRebalanceEvent

Expand Down
12 changes: 12 additions & 0 deletions docs/src/main/paradox/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,25 @@ Given a user entity.
Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #user-entity }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #user-entity }

Create a `MessageExtractor`.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #message-extractor }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #message-extractor }

Setup Akka Typed Cluster Sharding.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #setup-cluster-sharding }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #setup-cluster-sharding }

## Rebalance Listener

The Rebalance Listener is a pre-defined Actor that will handle @scaladoc[ConsumerRebalanceEvents](akka.kafka.ConsumerRebalanceEvent) that will update the Akka Cluster External Sharding strategy when subscribed partitions are re-assigned to consumers running on different cluster nodes.
Expand All @@ -81,3 +90,6 @@ Create the rebalance listener using the extension and pass it into an Alpakka Ka

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #rebalance-listener }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #rebalance-listener }

0 comments on commit feabac8

Please sign in to comment.