
Kafka unavailability blocks the event loop #480

Closed
cristianrgreco opened this issue Feb 4, 2022 · 6 comments · Fixed by #955

cristianrgreco commented Feb 4, 2022

Expected Behavior

According to the documentation, defining a KafkaClient as follows:

@KafkaClient
public interface Publisher {

    @Topic("topic")
    Single<String> publish(@KafkaKey String key, @MessageBody Single<String> payload);
}

should not block, even if Kafka is unavailable.

The documentation also states:

You can return a Future or Publisher to support non-blocking message delivery.

We have tried this as well, with no difference.

Actual Behaviour

When we try to publish while Kafka is unavailable, the event loop gets blocked for up to the configured max.block.ms (default 60 s). We can confirm via a thread dump that all event-loop threads are TIMED_WAITING with the following stack:

"default-nioEventLoopGroup-2-2" #37 prio=5 os_prio=0 cpu=288.15ms elapsed=55.87s tid=0x00007f3ca405f000 nid=0x35 in Object.wait()  [0x00007f3cbabd2000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(java.base@11.0.11/Native Method)
	- waiting on <no object reference available>
	at org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55)
	- waiting to re-lock in wait() <0x00000000f0563020> (a org.apache.kafka.clients.producer.internals.ProducerMetadata)
	at org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119)
	- waiting to re-lock in wait() <0x00000000f0563020> (a org.apache.kafka.clients.producer.internals.ProducerMetadata)
	at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1047)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
	at io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.lambda$buildSendFlowable$7(KafkaClientIntroductionAdvice.java:492)
	at io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice$$Lambda$1434/0x0000000100f77840.subscribe(Unknown Source)
  ...

Looking into the Apache Kafka client internals, we see that both awaitUpdate and waitOnMetadata are blocking. The result is that the application becomes unresponsive.

Steps To Reproduce

  1. Connectivity to Kafka brokers times out
  2. Publish enough events via the KafkaClient to consume all threads in the event-loop group.

Environment Information

OS:
CentOS Linux 7 (Core)

JDK:
openjdk version "11.0.11" 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)

Micronaut Kafka module:
io.micronaut.kafka:micronaut-kafka:3.3.3

Producer config:

acks = 1
batch.size = 16384
bootstrap.servers = [PLAINTEXT://localhost:55166]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

Example Application

No response

Version

2.5.13


mnylen commented Mar 25, 2022

@cristianrgreco I stumbled upon this while investigating a similar issue. Have you tried configuring max.block.ms to a lower value (by default it's set to 1 minute)? I don't know if this applies here, but at least in the "official" Kafka Java client, calling KafkaProducer#send can block for up to max.block.ms even when using the asynchronous invocation and passing a callback. It might be a similar thing here (and judging from your producer config, max.block.ms is indeed set to 60 seconds). From the KafkaProducer Javadocs:

The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.
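In Micronaut Kafka, producer properties such as max.block.ms can be tuned under the kafka.producers prefix in application configuration; a minimal sketch (the 3000 ms value is just an illustration, and default applies to all producers unless a specific client id is used):

```yaml
kafka:
  producers:
    default:
      max.block.ms: 3000   # fail the send faster than the 60 s default
```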

graemerocher (Contributor) commented:

Kind of weird that the Kafka client supports async methods but they block

cristianrgreco (Author) commented:

Have you tried configuring the max.block.ms to a lower value (by default it's set to 1 minute)?

Yes we ended up setting it to 3s. The result was the application wasn't completely unresponsive, but there's still a huge performance degradation. Our app receives a large, constant stream of messages that need to be published to Kafka, as well as handling online traffic, so when there are just a couple of threads in the event-loop, the 3s block is still a killer.


mnylen commented Mar 25, 2022

@cristianrgreco I ended up "fixing" this by submitting the KafkaProducer#send calls as tasks to a ThreadPoolExecutor with a bounded LinkedBlockingQueue. ExecutorService#submit doesn't block, but it can of course throw RejectedExecutionException if the executor's work queue is full.

Now at least the blocking happens outside the event loop. We had a call to KafkaProducer#send in the Netty event loop, and after a few calls while the broker was down the whole event loop was blocked.

Btw, you can of course always adjust the buffer.memory setting, which I think defaults to 32 MB. Then the only concern is if all the brokers go offline.
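The offloading pattern described above can be sketched as follows. This is a minimal illustration, not library code: the class name KafkaSendOffloader is made up, the queue bound and pool sizes are arbitrary, and the Runnable stands in for a task that would wrap the actual KafkaProducer#send call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of the workaround: offload potentially blocking producer sends to a
// dedicated executor with a bounded queue, so the event-loop thread only pays
// the cost of a non-blocking submit.
public class KafkaSendOffloader {
    private final ExecutorService executor = new ThreadPoolExecutor(
            1, 4,                              // core and max pool size (illustrative)
            60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),   // bounded: backpressure instead of unbounded growth
            new ThreadPoolExecutor.AbortPolicy()); // reject when the queue is full

    /** Returns true if the task was accepted, false if the executor is saturated. */
    public boolean trySend(Runnable sendTask) {
        try {
            executor.submit(sendTask);         // does not block the calling thread
            return true;
        } catch (RejectedExecutionException e) {
            return false;                      // fail fast instead of blocking the event loop
        }
    }

    public void shutdown() {
        executor.shutdownNow();
    }
}
```

With this in place, the blocking waitOnMetadata call happens on a pool thread, and a full queue surfaces as a quick rejection the caller can handle.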

graemerocher (Contributor) commented:

Sounds like Kafka operations always have to be offloaded from the event loop. You can do this globally with:

micronaut:
  server:
    thread-selection: IO
  executors:
    io:
      type: fixed
      nThreads: 75 # or whatever thread pool size works best for your app

See https://docs.micronaut.io/latest/guide/configurationreference.html#io.micronaut.http.server.HttpServerConfiguration

and https://docs.micronaut.io/latest/guide/#threadPools


gaode103 commented Mar 25, 2022

@cristianrgreco I ended up "fixing" this by sending KafkaProducer#send calls as tasks to a ThreadPoolExecutorService with a bounded LinkedBlockingQueue. The ExecutorService#submit doesn't block, but of course can throw RejectedExecutionException if the ExecutorService's work queue is full.

Now at least the blocking happens outside the event loop. We had a call to KafkaProducer#send in netty event loop, and basically after a few calls when the broker was down the whole event loop was blocked.

Btw, you can of course always adjust the buffer.memory setting, which I think defaults to 32 MB. Then the only concern is if all the brokers go offline.

I work in the same team as Cristian; we did something similar:

  1. Put a bounded LinkedBlockingQueue in front of the Kafka send.
  2. Limit the wait to 20 ms when putting a task into this queue from the event loop.
  3. Reduced buffer.memory to 320 kB.
  4. Reduced max.block.ms to 3 seconds.
  5. Reduced delivery.timeout.ms to 31 seconds.
  6. No retries in Kafka (retries=0).
  7. Offload from the Kafka producer thread immediately after the Kafka Callback; run the Micronaut logic in a separate thread pool.
  8. And of course a circuit breaker on the Kafka send method.
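Steps 1 and 2 above can be sketched as a bounded queue with a short offer timeout. Again this is illustrative only: the class name, queue capacity, and 20 ms timeout are taken from the description, and the Runnable stands in for the real Kafka send task.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of steps 1-2: a bounded queue in front of the Kafka send, with a
// short offer timeout so the event loop fails fast when the pipeline backs up.
public class BoundedSendQueue {
    private final BlockingQueue<Runnable> pending = new LinkedBlockingQueue<>(500);

    /**
     * Called from the event loop: wait at most 20 ms to enqueue,
     * then return false instead of blocking further.
     */
    public boolean tryEnqueue(Runnable sendTask) throws InterruptedException {
        return pending.offer(sendTask, 20, TimeUnit.MILLISECONDS);
    }

    /** Drained by a dedicated sender thread that performs the blocking send. */
    public Runnable take() throws InterruptedException {
        return pending.take();
    }
}
```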

This solution kind of works but isn't ideal:

  1. In the worst-case scenario the circuit breaker still needs to wait (3 + 31) seconds times the number of retries before it opens. Point 2 helps fail some requests faster once the buffer and the bounded queue fill up.
  2. This is far from using Kafka 'out of the box' in Micronaut.
