diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 151a0fbbd8876..b23c80d4ef809 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -103,6 +103,7 @@ private enum CloseMode { } private final Logger log; + private final java.nio.channels.spi.SelectorProvider selectorProvider; private final java.nio.channels.Selector nioSelector; private final Map channels; private final Set explicitlyMutedChannels; @@ -157,7 +158,9 @@ public Selector(int maxReceiveSize, MemoryPool memoryPool, LogContext logContext) { try { - this.nioSelector = java.nio.channels.Selector.open(); + this.selectorProvider = SelectorProvider.provider(); + this.nioSelector = selectorProvider.openSelector(); + } catch (IOException e) { throw new KafkaException(e); } @@ -249,7 +252,7 @@ public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs, Metri @Override public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { ensureNotRegistered(id); - SocketChannel socketChannel = SocketChannel.open(); + SocketChannel socketChannel = selectorProvider.openSocketChannel(); SelectionKey key = null; try { configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize); diff --git a/clients/src/main/java/org/apache/kafka/common/network/SelectorProvider.java b/clients/src/main/java/org/apache/kafka/common/network/SelectorProvider.java new file mode 100644 index 0000000000000..2e85dbab3b3ea --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/SelectorProvider.java @@ -0,0 +1,28 @@ +package org.apache.kafka.common.network; + + +public class SelectorProvider +{ + private static ThreadLocal selectorProviderHolder = new ThreadLocal<>(); + + + public static void set(java.nio.channels.spi.SelectorProvider selectorProvider) + { + selectorProviderHolder.set(selectorProvider); + } + + + public static java.nio.channels.spi.SelectorProvider provider() + { + java.nio.channels.spi.SelectorProvider selectorProvider = selectorProviderHolder.get(); + + if (selectorProvider != null) + { + return selectorProvider; + } + else + { + return java.nio.channels.spi.SelectorProvider.provider(); + } + } +} \ No newline at end of file