Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NioGroup for use in different transports #27737

Merged
merged 14 commits into from
Dec 15, 2017
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport.nio;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* The NioGroup is a group of selectors for interfacing with java nio. When it is started it will create the
* configured number of socket and acceptor selectors. Each selector will be running in a dedicated thread.
* Server connections can be bound using the {@link #bindServerChannel(InetSocketAddress, ChannelFactory)}
* method. Client connections can be opened using the {@link #openChannel(InetSocketAddress, ChannelFactory)}
* method.
* <p>
* The logic specific to a particular channel is provided by the {@link ChannelFactory} passed to the method
* when the channel is created. This is what allows an NioGroup to support different channel types.
*/
public class NioGroup implements AutoCloseable {


private final ArrayList<AcceptingSelector> acceptors;
private final RoundRobinSupplier<AcceptingSelector> acceptorSupplier;

private final ArrayList<SocketSelector> socketSelectors;
private final RoundRobinSupplier<SocketSelector> socketSelectorSupplier;

private final AtomicBoolean isOpen = new AtomicBoolean(true);

public NioGroup(Logger logger, ThreadFactory acceptorThreadFactory, int acceptorCount,
BiFunction<Logger, Supplier<SocketSelector>, AcceptorEventHandler> acceptorEventHandlerFunction,
ThreadFactory socketSelectorThreadFactory, int socketSelectorCount,
Function<Logger, SocketEventHandler> socketEventHandlerFunction) throws IOException {
acceptors = new ArrayList<>(acceptorCount);
socketSelectors = new ArrayList<>(socketSelectorCount);

try {
for (int i = 0; i < socketSelectorCount; ++i) {
SocketSelector selector = new SocketSelector(socketEventHandlerFunction.apply(logger));
socketSelectors.add(selector);
}
startSelectors(socketSelectors, socketSelectorThreadFactory);

for (int i = 0; i < acceptorCount; ++i) {
SocketSelector[] childSelectors = this.socketSelectors.toArray(new SocketSelector[this.socketSelectors.size()]);
Supplier<SocketSelector> selectorSupplier = new RoundRobinSupplier<>(childSelectors);
AcceptingSelector acceptor = new AcceptingSelector(acceptorEventHandlerFunction.apply(logger, selectorSupplier));
acceptors.add(acceptor);
}
startSelectors(acceptors, acceptorThreadFactory);
} catch (Exception e) {
try {
close();
} catch (Exception e1) {
e.addSuppressed(e1);
}
throw e;
}

socketSelectorSupplier = new RoundRobinSupplier<>(socketSelectors.toArray(new SocketSelector[socketSelectors.size()]));
acceptorSupplier = new RoundRobinSupplier<>(acceptors.toArray(new AcceptingSelector[acceptors.size()]));
}

public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
throws IOException {
ensureOpen();
if (acceptors.isEmpty()) {
throw new IllegalArgumentException("There are no acceptors configured. Without acceptors, server channels are not supported.");
}
return factory.openNioServerSocketChannel(address, acceptorSupplier.get());
}

public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
ensureOpen();
return factory.openNioChannel(address, socketSelectorSupplier.get());
}

@Override
public void close() throws IOException {
if (isOpen.compareAndSet(true, false)) {
IOUtils.close(Stream.concat(acceptors.stream(), socketSelectors.stream()).collect(Collectors.toList()));
}
}

private static <S extends ESSelector> void startSelectors(Iterable<S> selectors, ThreadFactory threadFactory) {
for (ESSelector acceptor : selectors) {
if (acceptor.isRunning() == false) {
threadFactory.newThread(acceptor::runLoop).start();
acceptor.isRunningFuture().actionGet();
}
}
}

private void ensureOpen() {
if (isOpen.get() == false) {
throw new IllegalStateException("NioGroup is closed.");
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.nio;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -46,9 +47,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand All @@ -71,11 +70,8 @@ public class NioTransport extends TcpTransport {

private final PageCacheRecycler pageCacheRecycler;
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
private final ArrayList<SocketSelector> socketSelectors = new ArrayList<>();
private RoundRobinSelectorSupplier clientSelectorSupplier;
private TcpChannelFactory clientChannelFactory;
private int acceptorNumber;
private volatile NioGroup nioGroup;
private volatile TcpChannelFactory clientChannelFactory;

public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
Expand All @@ -87,14 +83,13 @@ public NioTransport(Settings settings, ThreadPool threadPool, NetworkService net
@Override
protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name);
AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings));
return channelFactory.openNioServerSocketChannel(address, selector);
return nioGroup.bindServerChannel(address, channelFactory);
}

@Override
protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
throws IOException {
TcpNioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
TcpNioSocketChannel channel = nioGroup.openChannel(node.getAddress().address(), clientChannelFactory);
channel.addConnectListener(connectListener);
return channel;
}
Expand All @@ -103,42 +98,19 @@ protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue conn
protected void doStart() {
boolean success = false;
try {
int workerCount = NioTransport.NIO_WORKER_COUNT.get(settings);
for (int i = 0; i < workerCount; ++i) {
SocketSelector selector = new SocketSelector(getSocketEventHandler());
socketSelectors.add(selector);
int acceptorCount = 0;
boolean useNetworkServer = NetworkService.NETWORK_SERVER.get(settings);
if (useNetworkServer) {
acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
}
nioGroup = new NioGroup(logger, daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount,
AcceptorEventHandler::new, daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX),
NioTransport.NIO_WORKER_COUNT.get(settings), this::getSocketEventHandler);

for (SocketSelector selector : socketSelectors) {
if (selector.isRunning() == false) {
ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
threadFactory.newThread(selector::runLoop).start();
selector.isRunningFuture().actionGet();
}
}

Consumer<NioSocketChannel> clientContextSetter = getContextSetter("client-socket");
clientSelectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
clientChannelFactory = new TcpChannelFactory(clientProfileSettings, clientContextSetter, getServerContextSetter());

if (NetworkService.NETWORK_SERVER.get(settings)) {
int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
for (int i = 0; i < acceptorCount; ++i) {
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
AcceptorEventHandler eventHandler = new AcceptorEventHandler(logger, selectorSupplier);
AcceptingSelector acceptor = new AcceptingSelector(eventHandler);
acceptors.add(acceptor);
}

for (AcceptingSelector acceptor : acceptors) {
if (acceptor.isRunning() == false) {
ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX);
threadFactory.newThread(acceptor::runLoop).start();
acceptor.isRunningFuture().actionGet();
}
}
clientChannelFactory = new TcpChannelFactory(clientProfileSettings, getContextSetter("client"), getServerContextSetter());

if (useNetworkServer) {
// loop through all profiles and start them up, special handling for default one
for (ProfileSettings profileSettings : profileSettings) {
String profileName = profileSettings.profileName;
Expand All @@ -162,14 +134,15 @@ protected void doStart() {

@Override
protected void stopInternal() {
NioShutdown nioShutdown = new NioShutdown(logger);
nioShutdown.orderlyShutdown(acceptors, socketSelectors);

try {
nioGroup.close();
} catch (Exception e) {
logger.warn("unexpected exception while stopping nio group", e);
}
profileToChannelFactory.clear();
socketSelectors.clear();
}

protected SocketEventHandler getSocketEventHandler() {
protected SocketEventHandler getSocketEventHandler(Logger logger) {
return new SocketEventHandler(logger);
}

Expand All @@ -189,8 +162,7 @@ private Consumer<NioSocketChannel> getContextSetter(String profileName) {
}

private void acceptChannel(NioSocketChannel channel) {
TcpNioSocketChannel tcpChannel = (TcpNioSocketChannel) channel;
serverAcceptedChannel(tcpChannel);
serverAcceptedChannel((TcpNioSocketChannel) channel);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,21 @@

package org.elasticsearch.transport.nio;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RoundRobinSelectorSupplier implements Supplier<SocketSelector> {
public class RoundRobinSupplier<S> implements Supplier<S> {

private final ArrayList<SocketSelector> selectors;
private final S[] selectors;
private final int count;
private AtomicInteger counter = new AtomicInteger(0);

public RoundRobinSelectorSupplier(ArrayList<SocketSelector> selectors) {
this.count = selectors.size();
public RoundRobinSupplier(S[] selectors) {
this.count = selectors.length;
this.selectors = selectors;
}

public SocketSelector get() {
return selectors.get(counter.getAndIncrement() % count);
public S get() {
return selectors[counter.getAndIncrement() % count];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void setUpHandler() throws IOException {
acceptedChannelCallback = mock(Consumer.class);
ArrayList<SocketSelector> selectors = new ArrayList<>();
selectors.add(socketSelector);
handler = new AcceptorEventHandler(logger, new RoundRobinSelectorSupplier(selectors));
handler = new AcceptorEventHandler(logger, new RoundRobinSupplier<>(selectors.toArray(new SocketSelector[selectors.size()])));

AcceptingSelector selector = mock(AcceptingSelector.class);
channel = new DoNotRegisterServerChannel(mock(ServerSocketChannel.class), channelFactory, selector);
Expand Down
Loading