diff --git a/docs/src/main/paradox/atleastonce.md b/docs/src/main/paradox/atleastonce.md
index ef4361934..904d80938 100644
--- a/docs/src/main/paradox/atleastonce.md
+++ b/docs/src/main/paradox/atleastonce.md
@@ -16,7 +16,7 @@ the flow in a particular state, and that state could be unlikely to occur.

 When connecting a committable source to a producer flow, some applications may require each consumed message to produce more than one message. In that case, in order to preserve at-least-once semantics, the message offset should only be committed after all associated messages have been produced.

-To achieve this, use the @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage$$MultiMessage) implementation of @scaladoc[ProducerMessage.Envelope](akka.kafka.ProducerMessage$$Envelope):
+To achieve this, use the @apidoc[ProducerMessage.MultiMessage] implementation of @apidoc[ProducerMessage.Envelope]:

 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/AtLeastOnce.scala) { #oneToMany }
@@ -27,9 +27,9 @@ Java

 ### Batches

-If committable messages are processed in batches (using `batch` or `grouped`), it is also important to commit the resulting @scaladoc[CommittableOffsetBatch](akka.kafka.ConsumerMessage$$CommittableOffsetBatch) only after all messages in the batch are fully processed.
+If committable messages are processed in batches (using `batch` or `grouped`), it is also important to commit the resulting @apidoc[ConsumerMessage.CommittableOffsetBatch] only after all messages in the batch are fully processed.

-Should the batch need to be split up again, using mapConcat, care should be taken to associate the @scaladoc[CommittableOffsetBatch](akka.kafka.ConsumerMessage$$CommittableOffsetBatch) only with the last message. This scenario could occur if we created batches to more efficiently update a database and then needed to split up the batches to send individual messages to a Kafka producer flow.
+Should the batch need to be split up again, using mapConcat, care should be taken to associate the @apidoc[ConsumerMessage.CommittableOffsetBatch] only with the last message. This scenario could occur if we created batches to more efficiently update a database and then needed to split up the batches to send individual messages to a Kafka producer flow.

 ### Multiple Destinations
@@ -67,7 +67,7 @@ This is a significant challenge. Below we suggest a few strategies to deal with
 Since @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord) contains the destination topic, it is possible to use a single producer flow to write to any number of topics. This preserves the ordering of messages coming from the committable source. Since the destination topics likely admit different types of messages, it will be necessary to serialize the messages to the appropriate input type for the common producer flow, which could be a byte array or a string.

-In case a committable message should lead to the production of multiple messages, the @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage$$MultiMessage) is available. If no messages should be produced, the @scaladoc[ProducerMessage.PassThroughMessage](akka.kafka.ProducerMessage$$PassThroughMessage) can be used.
+In case a committable message should lead to the production of multiple messages, the @apidoc[ProducerMessage.MultiMessage] is available. If no messages should be produced, the @apidoc[ProducerMessage.PassThroughMessage] can be used.
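As an aside on the one-to-many pattern documented in this file's hunks (the `#oneToMany` snippet and the `ProducerMessage.MultiMessage` references above), here is a minimal, illustrative Scala sketch of fanning a single consumed record out into several producer records and committing its offset only afterwards. It is not one of the referenced snippets: the topic names and the `consumerSettings`, `producerSettings` and `committerSettings` values are assumptions expected to be defined elsewhere, and an implicit `ActorSystem` is assumed to be in scope for `run()`.

```scala
import akka.Done
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.{Committer, Consumer, Producer}
import akka.kafka.{ProducerMessage, Subscriptions}
import akka.stream.scaladsl.Keep
import org.apache.kafka.clients.producer.ProducerRecord

// One consumed record fans out into two producer records; the committable offset
// rides along as the passThrough and is only committed after both are produced.
val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("source-topic"))
    .map { msg =>
      ProducerMessage.multi(
        List(
          new ProducerRecord("destination-a", msg.record.key, msg.record.value),
          new ProducerRecord("destination-b", msg.record.key, msg.record.value)
        ),
        msg.committableOffset
      )
    }
    .via(Producer.flexiFlow(producerSettings))
    .map(_.passThrough)
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()
```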
 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/AtLeastOnce.scala) { #oneToConditional }
@@ -82,4 +82,4 @@ Failure to deserialize a message is a particular case of conditional message pro
 Why can't we commit the offsets of bad messages as soon as we encounter them, instead of passing them downstream? Because the previous offsets, for messages that have deserialized successfully, may not have been committed yet. That's possible if the downstream flow includes a buffer, an asynchronous boundary or performs batching. It is then likely that some previous messages would concurrently be making their way downstream to a final committing stage.

-Note that here we assume that we take the full control over the handling of messages that fail to deserialize. To do this, we should not ask for the deserialization to be performed by the committable source. We can instead create a @apidoc[ConsumerSettings$] parametrized by byte arrays. A subsequent `map` can deserialize and use @scaladoc[ProducerMessage.PassThroughMessage](akka.kafka.ProducerMessage$$PassThroughMessage) to skip bad messages.
+Note that here we assume that we take full control over the handling of messages that fail to deserialize. To do this, we should not ask for the deserialization to be performed by the committable source. We can instead create a @apidoc[ConsumerSettings$] parametrized by byte arrays. A subsequent `map` can deserialize and use @apidoc[ProducerMessage.PassThroughMessage] to skip bad messages.
diff --git a/docs/src/main/paradox/consumer-metadata.md b/docs/src/main/paradox/consumer-metadata.md
index 71719b03d..bcbf73363 100644
--- a/docs/src/main/paradox/consumer-metadata.md
+++ b/docs/src/main/paradox/consumer-metadata.md
@@ -46,7 +46,7 @@ Java

 ## Accessing metadata using KafkaConsumerActor

-To access the Kafka consumer metadata you need to create the @apidoc[akka.kafka.KafkaConsumerActor$] as described in the @ref[Consumer documentation](consumer.md#sharing-the-kafkaconsumer-instance) and send messages from @scaladoc[Metadata](akka.kafka.Metadata$) to it.
+To access the Kafka consumer metadata you need to create the @apidoc[akka.kafka.KafkaConsumerActor$] as described in the @ref[Consumer documentation](consumer.md#sharing-the-kafkaconsumer-instance) and send messages from @apidoc[Metadata$] to it.

 ## Supported metadata by KafkaConsumerActor
diff --git a/docs/src/main/paradox/consumer.md b/docs/src/main/paradox/consumer.md
index 914b12bb8..9bc179de8 100644
--- a/docs/src/main/paradox/consumer.md
+++ b/docs/src/main/paradox/consumer.md
@@ -14,7 +14,7 @@ Alpakka Kafka offers a large variety of consumers that connect to Kafka and stre

 ### Consumers

-These factory methods are part of the @scala[@scaladoc[Consumer API](akka.kafka.scaladsl.Consumer$)]@java[@scaladoc[Consumer API](akka.kafka.javadsl.Consumer$)].
+These factory methods are part of the @apidoc[Consumer$] API.

 | Offsets handling | Partition aware | Subscription | Shared consumer | Factory method | Stream element type |
 |-----------------------------------------|-----------------|---------------------|-----------------|----------------|---------------------|
@@ -33,7 +33,7 @@ These factory methods are part of the @scala[@scaladoc[Consumer API](akka.kafka.

 ### Transactional consumers

-These factory methods are part of the @scala[@scaladoc[Transactional API](akka.kafka.scaladsl.Transactional$)]@java[@scaladoc[Transactional API](akka.kafka.javadsl.Transactional$)]. For details see @ref[Transactions](transactions.md).
+These factory methods are part of the @apidoc[Transactional$] API. For details see @ref[Transactions](transactions.md).

 | Offsets handling | Partition aware | Shared consumer | Factory method | Stream element type |
 |-----------------------------------|-----------------|-----------------|----------------|---------------------|
@@ -55,7 +55,7 @@ Alpakka Kafka's defaults for all settings are defined in `reference.conf` which
 Important consumer settings
 : | Setting | Description |
 |-------------|----------------------------------------------|
-| stop-timeout | The stage will delay stopping the internal actor to allow processing of messages already in the stream (required for successful committing). This can be set to 0 for streams using @scala[@scaladoc[DrainingControl](akka.kafka.scaladsl.Consumer$$DrainingControl)]@java[@scaladoc[DrainingControl](akka.kafka.javadsl.Consumer$$DrainingControl)] |
+| stop-timeout | The stage will delay stopping the internal actor to allow processing of messages already in the stream (required for successful committing). This can be set to 0 for streams using @apidoc[Consumer.DrainingControl] |
 | kafka-clients | Section for properties passed unchanged to the Kafka client (see @extref:[Kafka's Consumer Configs](kafka:/documentation.html#consumerconfigs)) |
 | connection-checker | Configuration to let the stream fail if the connection to the Kafka broker fails. |
@@ -205,7 +205,7 @@ Java
 In some cases you may wish to use external offset storage as your primary means to manage offsets, but also commit offsets to Kafka. This gives you all the benefits of controlling offsets described in @ref:[Offset Storage external to Kafka](#offset-storage-external-to-kafka) and allows you to use tooling in the Kafka ecosystem to track consumer group lag.

-You can use the @apidoc[Consumer.committablePartitionedManualOffsetSource](Consumer$) source, which emits a @scaladoc[CommittableMessage](akka.kafka.ConsumerMessage$$CommittableMessage), to seek to appropriate offsets on startup, do your processing, commit to external storage, and then commit offsets back to Kafka.
+You can use the @apidoc[Consumer.committablePartitionedManualOffsetSource](Consumer$) source, which emits a @apidoc[ConsumerMessage.CommittableMessage], to seek to appropriate offsets on startup, do your processing, commit to external storage, and then commit offsets back to Kafka.

 This will only provide at-least-once guarantees for your consumer group lag monitoring because it's possible for a failure between storing your offsets externally and committing to Kafka, but it will give you a more accurate representation of consumer group lag then when turning on auto commits with the `enable.auto.commit` consumer property.

 ## Consume "at-most-once"
@@ -228,7 +228,7 @@ How to achieve at-least-once delivery semantics is covered in @ref:[At-Least-Onc
 For cases when you need to read messages from one topic, transform or enrich them, and then write to another topic you can use @apidoc[Consumer.committableSource](Consumer$) and connect it to a @apidoc[Producer.committableSink](Producer$). The `committableSink` will commit the offset back to the consumer regularly.

-The `committableSink` accepts implementations @scaladoc[ProducerMessage.Envelope](akka.kafka.ProducerMessage$$Envelope) that contain the offset to commit the consumption of the originating message (of type (@scaladoc[ConsumerMessage.Committable](akka.kafka.ConsumerMessage$$Committable)). See @ref[Producing messages](producer.md#producing-messages) about different implementations of @scaladoc[Envelope](akka.kafka.ProducerMessage$$Envelope).
+The `committableSink` accepts implementations of @apidoc[ProducerMessage.Envelope] that contain the offset to commit the consumption of the originating message (of type @apidoc[akka.kafka.ConsumerMessage.Committable]). See @ref[Producing messages](producer.md#producing-messages) about different implementations of @apidoc[ProducerMessage.Envelope].

 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerSink }
@@ -294,8 +294,7 @@ Accessing of Kafka consumer metadata is possible as described in @ref[Consumer M

 ## Controlled shutdown

-The @apidoc[Source] created with @apidoc[Consumer.plainSource](Consumer$) and similar methods materializes to a @scala[@scaladoc[Consumer.Control](akka.kafka.scaladsl.Consumer$$Control)]@java[@scaladoc[Consumer.Control](akka.kafka.javadsl.Consumer$$Control)]
-instance. This can be used to stop the stream in a controlled manner.
+The @apidoc[Source] created with @apidoc[Consumer.plainSource](Consumer$) and similar methods materializes to a @apidoc[akka.kafka.(javadsl|scaladsl).Consumer.Control] instance. This can be used to stop the stream in a controlled manner.

 When using external offset storage, a call to `Consumer.Control.shutdown()` suffices to complete the `Source`, which starts the completion of the stream.
@@ -313,8 +312,7 @@ When you are using offset storage in Kafka, the shutdown process involves severa

 ### Draining control

-To manage this shutdown process, use the
-@scala[@scaladoc[Consumer.DrainingControl](akka.kafka.scaladsl.Consumer$$DrainingControl)]@java[@scaladoc[Consumer.DrainingControl](akka.kafka.javadsl.Consumer$$DrainingControl)]
+To manage this shutdown process, use the @apidoc[Consumer.DrainingControl]
 by combining the `Consumer.Control` with the sink's materialized completion future in `mapMaterializedValue`. That control offers the method `drainAndShutdown` which implements the process descibed above.

 Note: The @apidoc[ConsumerSettings] `stop-timeout` delays stopping the Kafka Consumer and the stream, but when using `drainAndShutdown` that delay is not required and can be set to zero (as below).
diff --git a/docs/src/main/paradox/producer.md b/docs/src/main/paradox/producer.md
index 44969f5c9..17145706c 100644
--- a/docs/src/main/paradox/producer.md
+++ b/docs/src/main/paradox/producer.md
@@ -91,12 +91,12 @@ Java

 ## Producing messages

-Sinks and flows accept implementations of @scaladoc[ProducerMessage.Envelope](akka.kafka.ProducerMessage$$Envelope) as input. They contain an extra field to pass through data, the so called `passThrough`. Its value is passed through the flow and becomes available in the @scaladoc[ProducerMessage.Results](akka.kafka.ProducerMessage$$Results)' `passThrough()`. It can for example hold a @scaldoc[ConsumerMessage.CommittableOffset](akka.kafka.ConsumerMessage$$CommittableOffset) or @scaldoc[ConsumerMessage.CommittableOffsetBatch](akka.kafka.ConsumerMessage$$CommittableOffsetBatch) (from a @apidoc[Consumer.committableSource](Consumer$)) that can be committed after publishing to Kafka.
+Sinks and flows accept implementations of @apidoc[ProducerMessage.Envelope] as input. They contain an extra field to pass through data, the so-called `passThrough`. Its value is passed through the flow and becomes available in the @apidoc[ProducerMessage.Results]' `passThrough()`. It can for example hold a @apidoc[ConsumerMessage.CommittableOffset] or @apidoc[ConsumerMessage.CommittableOffsetBatch] from a @apidoc[Consumer.committableSource](Consumer$) that can be committed after publishing to Kafka.

 ### Produce a single message to Kafka

-To create one message to a Kafka topic, use the @scaladoc[ProducerMessage.Message](akka.kafka.ProducerMessage$$Message) type as in
+To create one message to a Kafka topic, use the @apidoc[ProducerMessage.Message] type as in

 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #singleMessage }
@@ -105,7 +105,7 @@ Java
 : @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #singleMessage }

-For flows the @scaladoc[ProducerMessage.Message](akka.kafka.ProducerMessage$$Message)s continue as @scaladoc[ProducerMessage.Result](akka.kafka.ProducerMessage$$Result) elements containing:
+For flows the @apidoc[ProducerMessage.Message]s continue as @apidoc[ProducerMessage.Result] elements containing:

 1. the original input message,
 1. the record metadata (Kafka @javadoc[RecordMetadata](org.apache.kafka.clients.producer.RecordMetadata) API), and
@@ -114,7 +114,7 @@ For flows the @scaladoc[ProducerMessage.Message](akka.kafka.ProducerMessage$$Mes

 ### Let one stream element produce multiple messages to Kafka

-The @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage$$MultiMessage) contains a list of @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord)s to produce multiple messages to Kafka topics.
+The @apidoc[ProducerMessage.MultiMessage] contains a list of @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord)s to produce multiple messages to Kafka topics.

 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #multiMessage }
@@ -122,9 +122,9 @@ Scala
 Java
 : @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #multiMessage }

-For flows the @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage$$MultiMessage)s continue as @scaladoc[ProducerMessage.MultiResult](akka.kafka.ProducerMessage$$MultiResult) elements containing:
+For flows the @apidoc[ProducerMessage.MultiMessage]s continue as @apidoc[ProducerMessage.MultiResult] elements containing:

- 1. a list of @scaladoc[ProducerMessage.MultiResultPart](akka.kafka.ProducerMessage$$MultiResultPart) with
+ 1. a list of @apidoc[ProducerMessage.MultiResultPart] with
     1. the original input message,
     1. the record metadata (Kafka @javadoc[RecordMetadata](org.apache.kafka.clients.producer.RecordMetadata) API), and
     1. the `passThrough` data.
@@ -133,7 +133,7 @@ For flows the @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage

 ### Let a stream element pass through, without producing a message to Kafka

-The @scaladoc[ProducerMessage.PassThroughMessage](akka.kafka.ProducerMessage$$PassThroughMessage) allows to let an element pass through a Kafka flow without producing a new message to a Kafka topic. This is primarily useful with Kafka commit offsets and transactions, so that these can be committed without producing new messages.
+The @apidoc[ProducerMessage.PassThroughMessage] allows an element to pass through a Kafka flow without producing a new message to a Kafka topic. This is primarily useful with Kafka commit offsets and transactions, so that these can be committed without producing new messages.
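To make the pass-through case just described concrete, here is a small hedged sketch of the mapping step only (not the referenced `#passThroughMessage` snippet): it produces a record when a hypothetical `shouldPublish` predicate holds and otherwise just carries the offset, so a downstream committer can still commit it. The topic name and `shouldPublish` are assumptions, and the key and value types are assumed to be `String`.

```scala
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
import akka.kafka.ProducerMessage
import akka.kafka.ProducerMessage.Envelope
import org.apache.kafka.clients.producer.ProducerRecord

// Map step for a committable consume-produce flow: publish when the (hypothetical)
// predicate holds, otherwise only carry the offset onwards for committing.
def toEnvelope(msg: CommittableMessage[String, String]): Envelope[String, String, CommittableOffset] =
  if (shouldPublish(msg.record.value))
    ProducerMessage.single(
      new ProducerRecord("destination-topic", msg.record.key, msg.record.value),
      msg.committableOffset
    )
  else
    ProducerMessage.passThrough(msg.committableOffset)
```

Fed through `Producer.flexiFlow`, the pass-through envelopes come out as `ProducerMessage.PassThroughResult` elements, and their `passThrough` can be sent straight to a `Committer.sink`.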
 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #passThroughMessage }
@@ -142,13 +142,13 @@ Java
 : @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #passThroughMessage }

-For flows the @scaladoc[ProducerMessage.PassThroughMessage](akka.kafka.ProducerMessage$$PassThroughMessage)s continue as @scaladoc[ProducerMessage.PassThroughResult](akka.kafka.ProducerMessage$$PassThroughResult) elements containing the `passThrough` data.
+For flows the @apidoc[ProducerMessage.PassThroughMessage]s continue as @apidoc[ProducerMessage.PassThroughResult] elements containing the `passThrough` data.

 ## Producer as a Flow

 @apidoc[Producer.flexiFlow](Producer$) { java="#flexiFlow[K,V,PassThrough](settings:akka.kafka.ProducerSettings[K,V]):akka.stream.javadsl.Flow[akka.kafka.ProducerMessage.Envelope[K,V,PassThrough],akka.kafka.ProducerMessage.Results[K,V,PassThrough],akka.NotUsed]" scala="#flexiFlow[K,V,PassThrough](settings:akka.kafka.ProducerSettings[K,V]):akka.stream.scaladsl.Flow[akka.kafka.ProducerMessage.Envelope[K,V,PassThrough],akka.kafka.ProducerMessage.Results[K,V,PassThrough],akka.NotUsed]" }
-allows the stream to continue after publishing messages to Kafka. It accepts implementations of @scaladoc[ProducerMessage.Envelope](akka.kafka.ProducerMessage$$Envelope) as input, which continue in the flow as implementations of @scaladoc[ProducerMessage.Results](akka.kafka.ProducerMessage$$Results).
+allows the stream to continue after publishing messages to Kafka. It accepts implementations of @apidoc[ProducerMessage.Envelope] as input, which continue in the flow as implementations of @apidoc[ProducerMessage.Results].

 Scala
@@ -160,7 +160,7 @@ Java

 ## Connecting a Producer to a Consumer

-The `passThrough` can for example hold a @scaladoc[ConsumerMessage.Committable](akka.kafka.ConsumerMessage$$Committable) that can be committed after publishing to Kafka.
+The `passThrough` can for example hold a @apidoc[akka.kafka.ConsumerMessage.Committable] that can be committed after publishing to Kafka.

 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerSink }
diff --git a/docs/src/main/paradox/subscription.md b/docs/src/main/paradox/subscription.md
index 2effb786d..56c48f3f8 100644
--- a/docs/src/main/paradox/subscription.md
+++ b/docs/src/main/paradox/subscription.md
@@ -7,7 +7,7 @@ Consumer Sources are created with different types of subscriptions, which contro

 Subscriptions are grouped into two categories: with automatic partition assignment and with manual control of partition assignment.

-Factory methods for all subscriptions can be found in the @scaladoc[Subscriptions](akka.kafka.Subscriptions$) factory.
+Factory methods for all subscriptions can be found in the @apidoc[Subscriptions$] factory.

 ## Automatic Partition Assignment
diff --git a/docs/src/main/paradox/testing-testcontainers.md b/docs/src/main/paradox/testing-testcontainers.md
index 49247767f..dfaf585c8 100644
--- a/docs/src/main/paradox/testing-testcontainers.md
+++ b/docs/src/main/paradox/testing-testcontainers.md
@@ -9,8 +9,8 @@ Testcontainers also allow you to create a complete Kafka cluster (using Docker c

 ## Settings

-You can override testcontainers settings to create multi-broker Kafka clusters, or to finetune Kafka Broker and ZooKeeper configuration, by updating @scaladoc[KafkaTestkitTestcontainersSettings](akka.kafka.testkit.KafkaTestkitTestcontainersSettings) in code or configuration.
-The @scaladoc[KafkaTestkitTestcontainersSettings](akka.kafka.testkit.KafkaTestkitTestcontainersSettings) type can be used to perform actions such as:
+You can override testcontainers settings to create multi-broker Kafka clusters, or to fine-tune Kafka Broker and ZooKeeper configuration, by updating @apidoc[KafkaTestkitTestcontainersSettings] in code or configuration.
+The @apidoc[KafkaTestkitTestcontainersSettings] type can be used to perform actions such as:

 * Set the version of Confluent Platform docker images to use
 * Define number of Kafka brokers
@@ -33,7 +33,7 @@ Java
 : @@snip [snip](/tests/src/test/java/docs/javadsl/TestkitTestcontainersTest.java) { #testcontainers-settings }

-To see what options are available for configuring testcontainers using `configureKafka` and `configureZooKeeper` in @scaladoc[KafkaTestkitTestcontainersSettings](akka.kafka.testkit.KafkaTestkitTestcontainersSettings) see the API docs for [`KafkaContainer`](https://static.javadoc.io/org.testcontainers/kafka/$testcontainers.version$/org/testcontainers/containers/KafkaContainer.html) and @javadoc[GenericContainer](org.testcontainers.containers.GenericContainer).
+To see what options are available for configuring testcontainers using `configureKafka` and `configureZooKeeper` in @apidoc[KafkaTestkitTestcontainersSettings], see the API docs for [`KafkaContainer`](https://static.javadoc.io/org.testcontainers/kafka/$testcontainers.version$/org/testcontainers/containers/KafkaContainer.html) and @javadoc[GenericContainer](org.testcontainers.containers.GenericContainer).

 ## Testing with a Docker Kafka cluster from Java code
diff --git a/docs/src/main/paradox/transactions.md b/docs/src/main/paradox/transactions.md
index 1fb54f141..1ba896682 100644
--- a/docs/src/main/paradox/transactions.md
+++ b/docs/src/main/paradox/transactions.md
@@ -9,7 +9,7 @@ For full details on how transactions are achieved in Kafka you may wish to revie

 ## Transactional Source

-The @apidoc[Transactional.source](Transactional$) emits a @scaladoc[ConsumerMessage.TransactionalMessage](akka.kafka.ConsumerMessage$$TransactionalMessage) which contains topic, partition, and offset information required by the producer during the commit process. Unlike with @scaladoc[ConsumerMessage.CommittableMessage](akka.kafka.ConsumerMessage$$CommittableMessage), the user is not responsible for committing transactions, this is handled by a @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$).
+The @apidoc[Transactional.source](Transactional$) emits a @apidoc[ConsumerMessage.TransactionalMessage] which contains topic, partition, and offset information required by the producer during the commit process. Unlike with @apidoc[ConsumerMessage.CommittableMessage], the user is not responsible for committing transactions; this is handled by a @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$).

 This source overrides the Kafka consumer property `isolation.level` to `read_committed`, so that only committed messages can be consumed.
@@ -20,7 +20,7 @@ Only use this source if you have the intention to connect it to a @apidoc[Transa
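Closing with an illustrative aside on the transactional hunk above: a minimal consume-transform-produce sketch, assuming `consumerSettings` and `producerSettings` defined elsewhere, hypothetical topic names and transactional id, an implicit `ActorSystem` in scope for `run()`, and the `Transactional.sink` overload that takes the transactional id as its second argument.

```scala
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.Transactional
import akka.kafka.{ProducerMessage, Subscriptions}
import akka.stream.scaladsl.Keep
import org.apache.kafka.clients.producer.ProducerRecord

// The transactional producer commits the consumed offsets itself, so the message's
// partitionOffset (not a committableOffset) is handed over as the passThrough.
val control =
  Transactional
    .source(consumerSettings, Subscriptions.topics("source-topic"))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord("sink-topic", msg.record.key, msg.record.value),
        msg.partitionOffset
      )
    }
    .toMat(Transactional.sink(producerSettings, "transactional-id"))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()
```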