Docs: use apidoc for most classes #1002

Merged: 2 commits, Dec 12, 2019
Changes from 1 commit
10 changes: 5 additions & 5 deletions docs/src/main/paradox/atleastonce.md
@@ -16,7 +16,7 @@ the flow in a particular state, and that state could be unlikely to occur.

When connecting a committable source to a producer flow, some applications may require each consumed message to produce more than one message. In that case, in order to preserve at-least-once semantics, the message offset should only be committed after all associated messages have been produced.

To achieve this, use the @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage$$MultiMessage) implementation of @scaladoc[ProducerMessage.Envelope](akka.kafka.ProducerMessage$$Envelope):
To achieve this, use the @apidoc[ProducerMessage.MultiMessage] implementation of @apidoc[ProducerMessage.Envelope]:

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/AtLeastOnce.scala) { #oneToMany }
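For illustration, a minimal sketch of this pattern (topic names and the String key/value types are assumptions, not taken from the snippet files): one consumed record fans out to two producer records, and the committable offset travels as the `passThrough` so it is only committed after both records have been produced.

```scala
import akka.kafka.{ConsumerMessage, ProducerMessage}
import org.apache.kafka.clients.producer.ProducerRecord

// One consumed message becomes two producer records; the offset rides along as passThrough.
def fanOut(msg: ConsumerMessage.CommittableMessage[String, String])
    : ProducerMessage.Envelope[String, String, ConsumerMessage.CommittableOffset] =
  ProducerMessage.multi(
    List(
      new ProducerRecord("hypothetical-topic-a", msg.record.key, msg.record.value),
      new ProducerRecord("hypothetical-topic-b", msg.record.key, msg.record.value)
    ),
    msg.committableOffset
  )
```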
@@ -27,9 +27,9 @@ Java

### Batches

If committable messages are processed in batches (using `batch` or `grouped`), it is also important to commit the resulting @scaladoc[CommittableOffsetBatch](akka.kafka.ConsumerMessage$$CommittableOffsetBatch) only after all messages in the batch are fully processed.
If committable messages are processed in batches (using `batch` or `grouped`), it is also important to commit the resulting @apidoc[ConsumerMessage.CommittableOffsetBatch] only after all messages in the batch are fully processed.

Should the batch need to be split up again, using mapConcat, care should be taken to associate the @scaladoc[CommittableOffsetBatch](akka.kafka.ConsumerMessage$$CommittableOffsetBatch) only with the last message. This scenario could occur if we created batches to more efficiently update a database and then needed to split up the batches to send individual messages to a Kafka producer flow.
Should the batch need to be split up again, using mapConcat, care should be taken to associate the @apidoc[ConsumerMessage.CommittableOffsetBatch] only with the last message. This scenario could occur if we created batches to more efficiently update a database and then needed to split up the batches to send individual messages to a Kafka producer flow.
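As a hedged sketch (the topic name and non-empty batches are assumptions), splitting a batch produced by `grouped` back into single messages with `mapConcat` could look like this, with only the last message carrying the offset batch:

```scala
import akka.kafka.{ConsumerMessage, ProducerMessage}
import org.apache.kafka.clients.producer.ProducerRecord

// Only the last producer message carries the CommittableOffsetBatch, so nothing is
// committed before the whole batch has been produced. Assumes a non-empty batch.
def splitBatch(batch: List[ConsumerMessage.CommittableMessage[String, String]])
    : List[ProducerMessage.Envelope[String, String, ConsumerMessage.CommittableOffsetBatch]] = {
  val offsets = batch
    .map(_.committableOffset)
    .foldLeft(ConsumerMessage.CommittableOffsetBatch.empty)(_.updated(_))
  val records =
    batch.map(m => new ProducerRecord("hypothetical-topic", m.record.key, m.record.value))
  records.init.map(ProducerMessage.single(_, ConsumerMessage.CommittableOffsetBatch.empty)) :+
    ProducerMessage.single(records.last, offsets)
}

// e.g. committableSource.grouped(20).mapConcat(batch => splitBatch(batch.toList))
```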

### Multiple Destinations

@@ -67,7 +67,7 @@ This is a significant challenge. Below we suggest a few strategies to deal with

Since @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord) contains the destination topic, it is possible to use a single producer flow to write to any number of topics. This preserves the ordering of messages coming from the committable source. Since the destination topics likely admit different types of messages, it will be necessary to serialize the messages to the appropriate input type for the common producer flow, which could be a byte array or a string.

In case a committable message should lead to the production of multiple messages, the @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage$$MultiMessage) is available. If no messages should be produced, the @scaladoc[ProducerMessage.PassThroughMessage](akka.kafka.ProducerMessage$$PassThroughMessage) can be used.
In case a committable message should lead to the production of multiple messages, the @apidoc[ProducerMessage.MultiMessage] is available. If no messages should be produced, the @apidoc[ProducerMessage.PassThroughMessage] can be used.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/AtLeastOnce.scala) { #oneToConditional }
@@ -82,4 +82,4 @@ Failure to deserialize a message is a particular case of conditional message pro

Why can't we commit the offsets of bad messages as soon as we encounter them, instead of passing them downstream? Because the previous offsets, for messages that have deserialized successfully, may not have been committed yet. That's possible if the downstream flow includes a buffer, an asynchronous boundary or performs batching. It is then likely that some previous messages would concurrently be making their way downstream to a final committing stage.

Note that here we assume that we take the full control over the handling of messages that fail to deserialize. To do this, we should not ask for the deserialization to be performed by the committable source. We can instead create a @apidoc[ConsumerSettings$] parametrized by byte arrays. A subsequent `map` can deserialize and use @scaladoc[ProducerMessage.PassThroughMessage](akka.kafka.ProducerMessage$$PassThroughMessage) to skip bad messages.
Note that here we assume that we take full control over the handling of messages that fail to deserialize. To do this, we should not ask for the deserialization to be performed by the committable source. We can instead create a @apidoc[ConsumerSettings$] parametrized by byte arrays. A subsequent `map` can deserialize and use @apidoc[ProducerMessage.PassThroughMessage] to skip bad messages.
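A hedged sketch of that approach (broker address, group id, topics, and the trivial `decode` step are assumptions; a real application would substitute its own deserialization):

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerMessage, ConsumerSettings, ProducerMessage, Subscriptions}
import akka.kafka.scaladsl.Consumer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import scala.util.Try

implicit val system: ActorSystem = ActorSystem("deserialization-sketch")

// Consume raw bytes so that deserialization stays under application control.
val byteArraySettings =
  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("hypothetical-group")

// Stand-in for the application's real deserialization.
def decode(bytes: Array[Byte]): Try[String] = Try(new String(bytes, "UTF-8"))

val envelopes =
  Consumer
    .committableSource(byteArraySettings, Subscriptions.topics("hypothetical-source-topic"))
    .map { msg =>
      decode(msg.record.value).fold(
        _ => // bad message: pass only the offset downstream, produce nothing
          ProducerMessage.passThrough[Array[Byte], String, ConsumerMessage.CommittableOffset](msg.committableOffset),
        value =>
          ProducerMessage.single(
            new ProducerRecord[Array[Byte], String]("hypothetical-target-topic", msg.record.key, value),
            msg.committableOffset
          )
      )
    }
```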
2 changes: 1 addition & 1 deletion docs/src/main/paradox/consumer-metadata.md
@@ -46,7 +46,7 @@ Java

## Accessing metadata using KafkaConsumerActor

To access the Kafka consumer metadata you need to create the @apidoc[akka.kafka.KafkaConsumerActor$] as described in the @ref[Consumer documentation](consumer.md#sharing-the-kafkaconsumer-instance) and send messages from @scaladoc[Metadata](akka.kafka.Metadata$) to it.
To access the Kafka consumer metadata you need to create the @apidoc[akka.kafka.KafkaConsumerActor$] as described in the @ref[Consumer documentation](consumer.md#sharing-the-kafkaconsumer-instance) and send messages from @apidoc[Metadata$] to it.
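A minimal sketch (the ask timeout is arbitrary, and `system` and `consumerSettings` are assumed to exist as in the linked Consumer documentation):

```scala
import akka.actor.ActorRef
import akka.kafka.{KafkaConsumerActor, Metadata}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

implicit val askTimeout: Timeout = Timeout(5.seconds)

// The stand-alone consumer actor, shareable between streams and usable for metadata requests.
val metadataConsumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))

// Ask for the cluster's topics; the reply wraps a Try with the partition info per topic.
val topics: Future[Metadata.Topics] =
  (metadataConsumer ? Metadata.ListTopics).mapTo[Metadata.Topics]
```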

## Supported metadata by KafkaConsumerActor

16 changes: 7 additions & 9 deletions docs/src/main/paradox/consumer.md
@@ -14,7 +14,7 @@ Alpakka Kafka offers a large variety of consumers that connect to Kafka and stre

### Consumers

These factory methods are part of the @scala[@scaladoc[Consumer API](akka.kafka.scaladsl.Consumer$)]@java[@scaladoc[Consumer API](akka.kafka.javadsl.Consumer$)].
These factory methods are part of the @apidoc[Consumer$] API.

| Offsets handling | Partition aware | Subscription | Shared consumer | Factory method | Stream element type |
|-----------------------------------------|-----------------|---------------------|-----------------|----------------|---------------------|
@@ -33,7 +33,7 @@ These factory methods are part of the @scala[@scaladoc[Consumer API](akka.kafka.

### Transactional consumers

These factory methods are part of the @scala[@scaladoc[Transactional API](akka.kafka.scaladsl.Transactional$)]@java[@scaladoc[Transactional API](akka.kafka.javadsl.Transactional$)]. For details see @ref[Transactions](transactions.md).
These factory methods are part of the @apidoc[Transactional$]. For details see @ref[Transactions](transactions.md).

| Offsets handling | Partition aware | Shared consumer | Factory method | Stream element type |
|-----------------------------------|-----------------|-----------------|----------------|---------------------|
@@ -55,7 +55,7 @@ Alpakka Kafka's defaults for all settings are defined in `reference.conf` which
Important consumer settings
: | Setting | Description |
|-------------|----------------------------------------------|
| stop-timeout | The stage will delay stopping the internal actor to allow processing of messages already in the stream (required for successful committing). This can be set to 0 for streams using @scala[@scaladoc[DrainingControl](akka.kafka.scaladsl.Consumer$$DrainingControl)]@java[@scaladoc[DrainingControl](akka.kafka.javadsl.Consumer$$DrainingControl)] |
| stop-timeout | The stage will delay stopping the internal actor to allow processing of messages already in the stream (required for successful committing). This can be set to 0 for streams using @apidoc[Consumer.DrainingControl] |
| kafka-clients | Section for properties passed unchanged to the Kafka client (see @extref:[Kafka's Consumer Configs](kafka:/documentation.html#consumerconfigs)) |
| connection-checker | Configuration to let the stream fail if the connection to the Kafka broker fails. |

@@ -205,7 +205,7 @@ Java

In some cases you may wish to use external offset storage as your primary means to manage offsets, but also commit offsets to Kafka.
This gives you all the benefits of controlling offsets described in @ref:[Offset Storage external to Kafka](#offset-storage-external-to-kafka) and allows you to use tooling in the Kafka ecosystem to track consumer group lag.
You can use the @apidoc[Consumer.committablePartitionedManualOffsetSource](Consumer$) source, which emits a @scaladoc[CommittableMessage](akka.kafka.ConsumerMessage$$CommittableMessage), to seek to appropriate offsets on startup, do your processing, commit to external storage, and then commit offsets back to Kafka.
You can use the @apidoc[Consumer.committablePartitionedManualOffsetSource](Consumer$) source, which emits a @apidoc[ConsumerMessage.CommittableMessage], to seek to appropriate offsets on startup, do your processing, commit to external storage, and then commit offsets back to Kafka.
This will only provide at-least-once guarantees for your consumer group lag monitoring because a failure may occur between storing your offsets externally and committing to Kafka, but it will give you a more accurate representation of consumer group lag than turning on auto commits with the `enable.auto.commit` consumer property.
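A hedged sketch of that pattern (the `offsetStore` is a hypothetical stand-in for external storage; `consumerSettings` and an implicit Akka 2.6 ActorSystem `system` are assumed for materialization):

```scala
import akka.Done
import akka.kafka.{CommitterSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import scala.concurrent.Future

// Hypothetical external offset store.
trait OffsetStore {
  def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]]
  def save(record: ConsumerRecord[String, String]): Future[Done]
}
val offsetStore: OffsetStore = ???

import system.dispatcher

val streamCompletion: Future[Done] =
  Consumer
    .committablePartitionedManualOffsetSource(
      consumerSettings,
      Subscriptions.topics("hypothetical-topic"),
      offsetStore.loadOffsets // seek to externally stored offsets on partition assignment
    )
    .flatMapMerge(4, _._2)
    .mapAsync(1)(msg => offsetStore.save(msg.record).map(_ => msg.committableOffset))
    .runWith(Committer.sink(CommitterSettings(system))) // also commit back to Kafka for lag monitoring
```

A real application would also retain the stream's `Consumer.Control`, for example via the `DrainingControl` described under @ref[Controlled shutdown](consumer.md#controlled-shutdown).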

## Consume "at-most-once"
@@ -228,7 +228,7 @@ How to achieve at-least-once delivery semantics is covered in @ref:[At-Least-Onc

For cases when you need to read messages from one topic, transform or enrich them, and then write to another topic you can use @apidoc[Consumer.committableSource](Consumer$) and connect it to a @apidoc[Producer.committableSink](Producer$). The `committableSink` will commit the offset back to the consumer regularly.

The `committableSink` accepts implementations @scaladoc[ProducerMessage.Envelope](akka.kafka.ProducerMessage$$Envelope) that contain the offset to commit the consumption of the originating message (of type (@scaladoc[ConsumerMessage.Committable](akka.kafka.ConsumerMessage$$Committable)). See @ref[Producing messages](producer.md#producing-messages) about different implementations of @scaladoc[Envelope](akka.kafka.ProducerMessage$$Envelope).
The `committableSink` accepts implementations of @apidoc[ProducerMessage.Envelope] that contain the offset to commit the consumption of the originating message (of type @apidoc[akka.kafka.ConsumerMessage.Committable]). See @ref[Producing messages](producer.md#producing-messages) about different implementations of @apidoc[ProducerMessage.Envelope].

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerSink }
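As a hedged sketch of the same shape as the snippet (topics and the two-argument `committableSink` overload are assumptions; `consumerSettings`, `producerSettings`, and an implicit ActorSystem `system` are presumed to be in scope):

```scala
import akka.kafka.{CommitterSettings, ProducerMessage, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import org.apache.kafka.clients.producer.ProducerRecord

val streamDone =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("hypothetical-source-topic"))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord("hypothetical-target-topic", msg.record.key, msg.record.value),
        msg.committableOffset // committed by committableSink after the record is produced
      )
    }
    .runWith(Producer.committableSink(producerSettings, CommitterSettings(system)))
```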
@@ -294,8 +294,7 @@ Accessing of Kafka consumer metadata is possible as described in @ref[Consumer M


## Controlled shutdown
The @apidoc[Source] created with @apidoc[Consumer.plainSource](Consumer$) and similar methods materializes to a @scala[@scaladoc[Consumer.Control](akka.kafka.scaladsl.Consumer$$Control)]@java[@scaladoc[Consumer.Control](akka.kafka.javadsl.Consumer$$Control)]
instance. This can be used to stop the stream in a controlled manner.
The @apidoc[Source] created with @apidoc[Consumer.plainSource](Consumer$) and similar methods materializes to a @apidoc[akka.kafka.(javadsl|scaladsl).Consumer.Control] instance. This can be used to stop the stream in a controlled manner.

When using external offset storage, a call to `Consumer.Control.shutdown()` suffices to complete the `Source`, which starts the completion of the stream.

@@ -313,8 +312,7 @@ When you are using offset storage in Kafka, the shutdown process involves severa

### Draining control

To manage this shutdown process, use the
@scala[@scaladoc[Consumer.DrainingControl](akka.kafka.scaladsl.Consumer$$DrainingControl)]@java[@scaladoc[Consumer.DrainingControl](akka.kafka.javadsl.Consumer$$DrainingControl)]
To manage this shutdown process, use the @apidoc[Consumer.DrainingControl]
by combining the `Consumer.Control` with the sink's materialized completion future in `mapMaterializedValue`. That control offers the method `drainAndShutdown` which implements the process described above.

Note: The @apidoc[ConsumerSettings] `stop-timeout` delays stopping the Kafka Consumer and the stream, but when using `drainAndShutdown` that delay is not required and can be set to zero (as below).
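A minimal sketch of that combination (the `handle` function is a hypothetical processing step; `consumerSettings` and an implicit ActorSystem `system` are assumed to be in scope):

```scala
import akka.Done
import akka.kafka.{CommitterSettings, ConsumerMessage, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.scaladsl.Keep
import scala.concurrent.Future
import scala.concurrent.duration.Duration

// Hypothetical business logic that ends up with the offset to commit.
def handle(msg: ConsumerMessage.CommittableMessage[String, String]): Future[ConsumerMessage.CommittableOffset] =
  Future.successful(msg.committableOffset)

val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics("hypothetical-topic"))
    .mapAsync(1)(handle)
    .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

// At shutdown: let in-flight messages and their commits finish, then stop the consumer.
val terminated: Future[Done] = control.drainAndShutdown()(system.dispatcher)
```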
20 changes: 10 additions & 10 deletions docs/src/main/paradox/producer.md
@@ -91,12 +91,12 @@ Java

## Producing messages

Sinks and flows accept implementations of @scaladoc[ProducerMessage.Envelope](akka.kafka.ProducerMessage$$Envelope) as input. They contain an extra field to pass through data, the so called `passThrough`. Its value is passed through the flow and becomes available in the @scaladoc[ProducerMessage.Results](akka.kafka.ProducerMessage$$Results)' `passThrough()`. It can for example hold a @scaldoc[ConsumerMessage.CommittableOffset](akka.kafka.ConsumerMessage$$CommittableOffset) or @scaldoc[ConsumerMessage.CommittableOffsetBatch](akka.kafka.ConsumerMessage$$CommittableOffsetBatch) (from a @apidoc[Consumer.committableSource](Consumer$)) that can be committed after publishing to Kafka.
Sinks and flows accept implementations of @apidoc[ProducerMessage.Envelope] as input. They contain an extra field to pass through data, the so called `passThrough`. Its value is passed through the flow and becomes available in the @apidoc[ProducerMessage.Results]' `passThrough()`. It can for example hold a @apidoc[akka.kafka.ConsumerMessage.CommittableOffset] or @apidoc[ConsumerMessage.CommittableOffsetBatch] from a @apidoc[Consumer.committableSource](Consumer$) that can be committed after publishing to Kafka.


### Produce a single message to Kafka

To create one message to a Kafka topic, use the @scaladoc[ProducerMessage.Message](akka.kafka.ProducerMessage$$Message) type as in
To create one message to a Kafka topic, use the @apidoc[ProducerMessage.Message] type as in

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #singleMessage }
@@ -105,7 +105,7 @@ Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #singleMessage }
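A tiny sketch (topic and values are placeholders); with a committable source the offset is usually supplied as the `passThrough` instead of the default `NotUsed`:

```scala
import akka.kafka.ProducerMessage
import org.apache.kafka.clients.producer.ProducerRecord

// Without an explicit passThrough the envelope carries NotUsed.
val single = ProducerMessage.single(new ProducerRecord("hypothetical-topic", "key", "value"))

// With a committable source: ProducerMessage.single(record, msg.committableOffset)
```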


For flows the @scaladoc[ProducerMessage.Message](akka.kafka.ProducerMessage$$Message)s continue as @scaladoc[ProducerMessage.Result](akka.kafka.ProducerMessage$$Result) elements containing:
For flows the @apidoc[ProducerMessage.Message]s continue as @apidoc[akka.kafka.ProducerMessage.Result] elements containing:

1. the original input message,
1. the record metadata (Kafka @javadoc[RecordMetadata](org.apache.kafka.clients.producer.RecordMetadata) API), and
@@ -114,17 +114,17 @@ For flows the @scaladoc[ProducerMessage.Message](akka.kafka.ProducerMessage$$Mes

### Let one stream element produce multiple messages to Kafka

The @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage$$MultiMessage) contains a list of @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord)s to produce multiple messages to Kafka topics.
The @apidoc[ProducerMessage.MultiMessage] contains a list of @javadoc[ProducerRecord](org.apache.kafka.clients.producer.ProducerRecord)s to produce multiple messages to Kafka topics.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #multiMessage }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #multiMessage }

For flows the @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage$$MultiMessage)s continue as @scaladoc[ProducerMessage.MultiResult](akka.kafka.ProducerMessage$$MultiResult) elements containing:
For flows the @apidoc[ProducerMessage.MultiMessage]s continue as @apidoc[akka.kafka.ProducerMessage.MultiResult] elements containing:

1. a list of @scaladoc[ProducerMessage.MultiResultPart](akka.kafka.ProducerMessage$$MultiResultPart) with
1. a list of @apidoc[ProducerMessage.MultiResultPart] with
1. the original input message,
1. the record metadata (Kafka @javadoc[RecordMetadata](org.apache.kafka.clients.producer.RecordMetadata) API), and
1. the `passThrough` data.
@@ -133,7 +133,7 @@ For flows the @scaladoc[ProducerMessage.MultiMessage](akka.kafka.ProducerMessage

### Let a stream element pass through, without producing a message to Kafka

The @scaladoc[ProducerMessage.PassThroughMessage](akka.kafka.ProducerMessage$$PassThroughMessage) allows to let an element pass through a Kafka flow without producing a new message to a Kafka topic. This is primarily useful with Kafka commit offsets and transactions, so that these can be committed without producing new messages.
The @apidoc[ProducerMessage.PassThroughMessage] lets an element pass through a Kafka flow without producing a new message to a Kafka topic. This is primarily useful with Kafka commit offsets and transactions, so that these can be committed without producing new messages.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ProducerExample.scala) { #passThroughMessage }
@@ -142,13 +142,13 @@ Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/ProducerWithTestcontainersTest.java) { #passThroughMessage }


For flows the @scaladoc[ProducerMessage.PassThroughMessage](akka.kafka.ProducerMessage$$PassThroughMessage)s continue as @scaladoc[ProducerMessage.PassThroughResult](akka.kafka.ProducerMessage$$PassThroughResult) elements containing the `passThrough` data.
For flows the @apidoc[ProducerMessage.PassThroughMessage]s continue as @apidoc[ProducerMessage.PassThroughResult] elements containing the `passThrough` data.


## Producer as a Flow

@apidoc[Producer.flexiFlow](Producer$) { java="#flexiFlow[K,V,PassThrough](settings:akka.kafka.ProducerSettings[K,V]):akka.stream.javadsl.Flow[akka.kafka.ProducerMessage.Envelope[K,V,PassThrough],akka.kafka.ProducerMessage.Results[K,V,PassThrough],akka.NotUsed]" scala="#flexiFlow[K,V,PassThrough](settings:akka.kafka.ProducerSettings[K,V]):akka.stream.scaladsl.Flow[akka.kafka.ProducerMessage.Envelope[K,V,PassThrough],akka.kafka.ProducerMessage.Results[K,V,PassThrough],akka.NotUsed]" }
allows the stream to continue after publishing messages to Kafka. It accepts implementations of @scaladoc[ProducerMessage.Envelope](akka.kafka.ProducerMessage$$Envelope) as input, which continue in the flow as implementations of @scaladoc[ProducerMessage.Results](akka.kafka.ProducerMessage$$Results).
allows the stream to continue after publishing messages to Kafka. It accepts implementations of @apidoc[ProducerMessage.Envelope] as input, which continue in the flow as implementations of @apidoc[ProducerMessage.Results].
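A hedged sketch (the envelope source, `producerSettings`, and the pass-through strings are assumptions, and an implicit ActorSystem is assumed for running the stream) of feeding envelopes through `flexiFlow` and inspecting the `Results` implementations coming out:

```scala
import akka.kafka.ProducerMessage
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord

// Two envelope flavours with a String passThrough.
val envelopes: List[ProducerMessage.Envelope[String, String, String]] = List(
  ProducerMessage.single(new ProducerRecord("hypothetical-topic", "k", "v"), "ctx-1"),
  ProducerMessage.passThrough[String, String, String]("ctx-2")
)

val done =
  Source(envelopes)
    .via(Producer.flexiFlow(producerSettings))
    .map {
      case r: ProducerMessage.Result[_, _, _] =>
        s"produced to ${r.metadata.topic} at offset ${r.metadata.offset}, passThrough=${r.message.passThrough}"
      case r: ProducerMessage.MultiResult[_, _, _] =>
        s"produced ${r.parts.size} records, passThrough=${r.passThrough}"
      case r: ProducerMessage.PassThroughResult[_, _, _] =>
        s"nothing produced, passThrough=${r.passThrough}"
    }
    .runWith(Sink.foreach(println))
```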


Scala
@@ -160,7 +160,7 @@ Java

## Connecting a Producer to a Consumer

The `passThrough` can for example hold a @scaladoc[ConsumerMessage.Committable](akka.kafka.ConsumerMessage$$Committable) that can be committed after publishing to Kafka.
The `passThrough` can for example hold a @apidoc[akka.kafka.ConsumerMessage.Committable] that can be committed after publishing to Kafka.

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala) { #consumerToProducerSink }
2 changes: 1 addition & 1 deletion docs/src/main/paradox/subscription.md
@@ -7,7 +7,7 @@ Consumer Sources are created with different types of subscriptions, which contro

Subscriptions are grouped into two categories: with automatic partition assignment and with manual control of partition assignment.

Factory methods for all subscriptions can be found in the @scaladoc[Subscriptions](akka.kafka.Subscriptions$) factory.
Factory methods for all subscriptions can be found in the @apidoc[Subscriptions$] factory.
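A minimal sketch of the common flavours (topic names, partition, and offset are placeholders):

```scala
import akka.kafka.Subscriptions
import org.apache.kafka.common.TopicPartition

// Automatic partition assignment through the consumer group:
val byTopic = Subscriptions.topics("hypothetical-topic-a", "hypothetical-topic-b")

// Manual control: consume a fixed partition...
val fixedPartition = Subscriptions.assignment(new TopicPartition("hypothetical-topic-a", 0))

// ...or a fixed partition starting from a given offset.
val fromOffset = Subscriptions.assignmentWithOffset(new TopicPartition("hypothetical-topic-a", 0), 42L)
```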

## Automatic Partition Assignment
