Describe the bug
The SubSource for a partition is repeatedly re-created when consuming partitions in parallel with KafkaConsumer.CommittablePartitionedSource. The consumer actor logs warnings such as:
[WARNING][28.09.2021 18:15:03][Thread 0004][akka://system/system/kafka-consumer-2] RequestMessages from topic/partition ipmrequeststatistics [[2]] already requested by other stage ipmrequeststatistics [[2]]
These warnings only occur when there is a slow stage in the pipeline, and they are accompanied by data loss.
To Reproduce
Steps to reproduce the behavior:
Configuration
akka {
  loglevel = DEBUG
  stream {
    # Default flow materializer settings
    materializer {
      # Initial size of buffers used in stream elements
      # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      initial-input-buffer-size = 4
      # Maximum size of buffers used in stream elements
      # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      max-input-buffer-size = 16
      # Fully qualified config path which holds the dispatcher configuration
      # to be used by FlowMaterialiser when creating Actors.
      # When this value is left empty, the default-dispatcher will be used.
      # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      dispatcher = ""
      blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"
      # Cleanup leaked publishers and subscribers when they are not used within a given
      # deadline
      subscription-timeout {
        # when the subscription timeout is reached one of the following strategies on
        # the "stale" publisher:
        # cancel - cancel it (via `onError` or subscribing to the publisher and
        #          `cancel()`ing the subscription right away)
        # warn   - log a warning statement about the stale element (then drop the
        #          reference to it)
        # noop   - do nothing (not recommended)
        # Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings
        mode = cancel
        # time after which a subscriber / publisher is considered stale and eligible
        # for cancelation (see `akka.stream.subscription-timeout.mode`)
        # Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings
        timeout = 5s
      }
      # Enable additional troubleshooting logging at DEBUG log level
      # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      debug-logging = on
      loglevel = DEBUG
      # Maximum number of elements emitted in batch if downstream signals large demand
      # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      output-burst-limit = 1000
      # Enable automatic fusing of all graphs that are run. For short-lived streams
      # this may cause an initial runtime overhead, but most of the time fusing is
      # desirable since it reduces the number of Actors that are created.
      # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      auto-fusing = off
      # Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
      # buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed
      # buffer upon stream materialization if the requested buffer size is less than this
      # configuration parameter. The default is very high because failing early is better
      # than failing under load.
      #
      # Buffers sized larger than this will dynamically grow/shrink and consume more memory
      # per element than the fixed size buffers.
      # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      max-fixed-buffer-size = 1000000000
      # Maximum number of sync messages that actor can process for stream to substream communication.
      # Parameter allows interrupting synchronous processing to get upstream/downstream messages.
      # Allows accelerating message processing that happens within the same actor while keeping the system responsive.
      # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      sync-processing-limit = 1000
      debug {
        # Enables the fuzzing mode which increases the chance of race conditions
        # by aggressively reordering events and making certain operations more
        # concurrent than usual.
        # This setting is for testing purposes, NEVER enable this in a production
        # environment!
        # To get the best results, try combining this setting with a throughput
        # of 1 on the corresponding dispatchers.
        # Note: If you change this value also change the fallback value in ActorMaterializerSettings
        fuzzing-mode = off
      }
      stream-ref {
        # Buffer of a SinkRef that is used to batch Request elements from the other side of the stream ref
        #
        # The buffer will be attempted to be filled eagerly even while the local stage did not request elements,
        # because the delay of requesting over network boundaries is much higher.
        buffer-capacity = 32
        # Demand is signalled by sending a cumulative demand message ("requesting messages until the n-th sequence number")
        # Using a cumulative demand model allows us to re-deliver the demand message in case of message loss (which should
        # be very rare in any case, yet possible -- mostly under connection break-down and re-establishment).
        #
        # The semantics of handling and updating the demand however are in-line with what Reactive Streams dictates.
        #
        # In normal operation, demand is signalled in response to arriving elements, however if no new elements arrive
        # within `demand-redelivery-interval` a re-delivery of the demand will be triggered, assuming that it may have gotten lost.
        demand-redelivery-interval = 1 second
        # Subscription timeout, during which the "remote side" MUST subscribe (materialize) the handed out stream ref.
        # This timeout does not have to be very low in normal situations, since the remote side may also need to
        # prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking
        # in-active streams which are never subscribed to.
        subscription-timeout = 30 seconds
        # In order to guard the receiving end of a stream ref from never terminating (since awaiting a Completion or Failed
        # message) after / before a Terminated is seen, a special timeout is applied once Terminated is received by it.
        # This allows us to terminate stream refs that have been targeted to other nodes which are Downed, and as such the
        # other side of the stream ref would never send the "final" terminal message.
        #
        # The timeout specifically means the time between the Terminated signal being received and when the local SourceRef
        # determines to fail itself, assuming there was message loss or a complete partition of the completion signal.
        final-termination-signal-deadline = 2 seconds
      }
    }
    # Deprecated, left here to not break Akka HTTP which refers to it
    blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"
    # Deprecated, will not be used unless user code refers to it, use 'akka.stream.materializer.blocking-io-dispatcher'
    # instead, or if from code, prefer the 'ActorAttributes.IODispatcher' attribute
    default-blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"
  }
  # configure overrides to ssl-configuration here (to be used by akka-streams, and akka-http – i.e. when serving https connections)
  ssl-config {
    protocol = "TLSv1"
  }
  actor {
    serializers {
      akka-stream-ref = "Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams"
    }
    serialization-bindings {
      "Akka.Streams.Implementation.StreamRef.SinkRefImpl, Akka.Streams" = akka-stream-ref
      "Akka.Streams.Implementation.StreamRef.SourceRefImpl, Akka.Streams" = akka-stream-ref
      "Akka.Streams.Implementation.StreamRef.IStreamRefsProtocol, Akka.Streams" = akka-stream-ref
    }
    serialization-identifiers {
      "Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams" = 30
    }
  }
}
# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
  # Config path of Akka Discovery method
  # "akka.discovery" to use the Akka Discovery method configured for the ActorSystem
  discovery-method = akka.discovery
  # Set a service name for use with Akka Discovery
  # https://doc.akka.io/docs/alpakka-kafka/current/discovery.html
  service-name = ""
  # Timeout for getting a reply from the discovery-method lookup
  resolve-timeout = 3 seconds
  # Tuning property of scheduled polls.
  # Controls the interval from one scheduled poll to the next.
  poll-interval = 500ms
  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that non-zero value means that the thread that
  # is executing the stage will be blocked. See also the `wakeup-timeout` setting below.
  poll-timeout = 500ms
  # The stage will delay stopping the internal actor to allow processing of
  # messages already in the stream (required for successful committing).
  # This can be set to 0 for streams using `DrainingControl`.
  stop-timeout = 30s
  # Duration to wait for `KafkaConsumer.close` to finish.
  close-timeout = 20s
  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with `CommitTimeoutException`.
  # The `Transactional.source` waits this amount of time for the producer to mark messages as not
  # being in flight anymore as well as waiting for messages to drain, when rebalance is triggered.
  commit-timeout = 15s
  # If commits take longer than this time a warning is logged
  commit-time-warning = 1s
  # Not relevant for Kafka after version 2.1.0.
  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  commit-refresh-interval = infinite
  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "akka.kafka.default-dispatcher"
  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }
  # Time to wait for pending requests when a partition is closed
  wait-close-partition = 500ms
  # Limits the query to Kafka for a topic's position
  position-timeout = 5s
  # When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
  # call to Kafka's API
  offset-for-times-timeout = 5s
  # Timeout for akka.kafka.Metadata requests
  # This value is used instead of Kafka's default from `default.api.timeout.ms`
  # which is 1 minute.
  metadata-request-timeout = 5s
  # Interval for checking that transaction was completed before closing the consumer.
  # Used in the transactional flow for exactly-once-semantics processing.
  eos-draining-check-interval = 30ms
  # Issue warnings when a call to a partition assignment handler method takes
  # longer than this.
  partition-handler-warning = 5s
  # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
  # configured by `consumer.metadata-request-timeout`
  connection-checker {
    # Flag to turn on the connection checker
    enable = false
    # Amount of attempts to be performed after a first connection failure occurs
    # Required, non-negative integer
    max-retries = 3
    # Interval for the connection check. Used as the base for exponential retry.
    check-interval = 15s
    # Check interval multiplier for backoff interval
    # Required, positive number
    backoff-factor = 2.0
  }
  # Protect against server-side bugs that cause Kafka to temporarily "lose" the latest offset for a consumer, which
  # then causes the Kafka consumer to follow its normal 'auto.offset.reset' behavior. For 'earliest', these settings
  # allow the client to detect and attempt to recover from this issue. For 'none' and 'latest', these settings will
  # only add overhead. See
  # for more information
  offset-reset-protection {
    # turns on reset protection
    enable = false
    # if consumer gets a record with an offset that is more than this number of offsets back from the previously
    # requested offset, it is considered a reset
    offset-threshold = 9223372036854775807
    # if the record is more than this duration earlier than the last received record, it is considered a reset
    time-threshold = 100000 days
  }
}
# The dispatcher that will be used by default by consumer and
# producer stages.
akka.kafka.default-dispatcher {
  type = "Dispatcher"
  executor = "default-executor"
}
# Committer flows use these settings to make batch commits
akka.kafka.committer {
  # Set maximum number of messages to commit at once
  max-batch = 100000
  # Set maximum interval between commits
  max-interval = 10s
  # Set parallelism for async committing
  parallelism = 16
}
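
For orientation (the reporter's "Prepare code", "Flow" and "Logs" attachments were not captured in this page), a minimal sketch of the kind of pipeline described above follows. The topic name, bootstrap servers, group id, parallelism and the Task.Delay standing in for the slow stage are assumptions, not the reporter's actual code:

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Helpers;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;

var system = ActorSystem.Create("system");
var materializer = system.Materializer();

// Consumer and committer settings pick up the "akka.kafka.consumer" and
// "akka.kafka.committer" sections shown above; connection values are placeholders.
var consumerSettings = ConsumerSettings<Null, string>.Create(system, null, Deserializers.Utf8)
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("group1");
var committerSettings = CommitterSettings.Create(system);

// One sub-source per assigned partition, each consumed in parallel with a slow stage.
KafkaConsumer.CommittablePartitionedSource(consumerSettings, Subscriptions.Topics("ipmrequeststatistics"))
    .MergeMany(3, pair => pair.Item2
        .SelectAsync(1, async msg =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1)); // stands in for the slow downstream stage
            return (ICommittable)msg.CommitableOffset;
        }))
    .RunWith(Committer.Sink(committerSettings), materializer);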
FWIW, unrelated to this, but came up during my investigation: .WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider)) does not help restart failed Kafka Source stages when a downstream fails, which seems off to me. Going to file a separate bug for it.
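
For context, the attribute mentioned above is applied roughly like this (a sketch only; the decider is illustrative and the stream reuses the consumerSettings from the sketch above):

using Akka.Streams;
using Akka.Streams.Supervision;

// Example decider: restart the stage on any exception.
Decider decider = cause => Directive.Restart;

var source = KafkaConsumer.CommittablePartitionedSource(consumerSettings, Subscriptions.Topics("ipmrequeststatistics"))
    .WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider));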
Expected behavior
Data is consumed from Kafka partitions in parallel without warnings or data loss.
Actual behavior
The SubSource for a partition is repeatedly re-created, and data is lost.
Environment
Windows, .NET 5.0
Additional context
Please help configure Akka Streams for reading data from partitions in parallel without warnings or data loss.