Skip to content

Commit

Permalink
Remove manual tracking of registered channels (#27445)
Browse files Browse the repository at this point in the history
This is related to #27260. Currently, every ESSelector keeps track of
all channels that are registered with it. ESSelector is just an
abstraction over a raw java nio selector. The java nio selector already
tracks its own selection keys. This commit removes our tracking and
relies on the java nio selector tracking.
  • Loading branch information
Tim-Brooks committed Nov 17, 2017
1 parent fbf1344 commit 085669f
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
Expand Down Expand Up @@ -93,7 +89,6 @@ private void setUpNewServerChannels() {
newChannel.register();
SelectionKey selectionKey = newChannel.getSelectionKey();
selectionKey.attach(newChannel);
addRegisteredChannel(newChannel);
eventHandler.serverChannelRegistered(newChannel);
} else {
eventHandler.registrationException(newChannel, new ClosedChannelException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
* This is a basic selector abstraction used by {@link org.elasticsearch.transport.nio.NioTransport}. This
Expand All @@ -56,7 +55,6 @@ public abstract class ESSelector implements Closeable {
private final CountDownLatch exitedLoop = new CountDownLatch(1);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final PlainActionFuture<Boolean> isRunningFuture = PlainActionFuture.newFuture();
private final Set<NioChannel> registeredChannels = Collections.newSetFromMap(new ConcurrentHashMap<NioChannel, Boolean>());
private volatile Thread thread;

ESSelector(EventHandler eventHandler) throws IOException {
Expand Down Expand Up @@ -134,7 +132,7 @@ void singleLoop() {

void cleanupAndCloseChannels() {
cleanup();
channelsToClose.addAll(registeredChannels);
channelsToClose.addAll(selector.keys().stream().map(sk -> (NioChannel) sk.attachment()).collect(Collectors.toList()));
closePendingChannels();
}

Expand Down Expand Up @@ -171,19 +169,6 @@ void wakeup() {
selector.wakeup();
}

public Set<NioChannel> getRegisteredChannels() {
return registeredChannels;
}

public void addRegisteredChannel(NioChannel channel) {
assert registeredChannels.contains(channel) == false : "Should only register channel once";
registeredChannels.add(channel);
}

public void removeRegisteredChannel(NioChannel channel) {
registeredChannels.remove(channel);
}

@Override
public void close() throws IOException {
if (isClosed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ private void setupChannel(NioSocketChannel newChannel) {
try {
if (newChannel.isOpen()) {
newChannel.register();
addRegisteredChannel(newChannel);
SelectionKey key = newChannel.getSelectionKey();
key.attach(newChannel);
eventHandler.handleRegistration(newChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ public void closeFromSelector() throws IOException {
} catch (IOException e) {
closeContext.completeExceptionally(e);
throw e;
} finally {
// There is no problem with calling this multiple times
selector.removeRegisteredChannel(this);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.transport.nio;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.utils.TestSelectionKey;
import org.junit.Before;
Expand All @@ -30,8 +29,8 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.security.PrivilegedActionException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
Expand All @@ -46,6 +45,7 @@ public class AcceptingSelectorTests extends ESTestCase {
private NioServerSocketChannel serverChannel;
private AcceptorEventHandler eventHandler;
private TestSelectionKey selectionKey;
private Selector rawSelector;

@Before
public void setUp() throws Exception {
Expand All @@ -54,7 +54,7 @@ public void setUp() throws Exception {
eventHandler = mock(AcceptorEventHandler.class);
serverChannel = mock(NioServerSocketChannel.class);

Selector rawSelector = mock(Selector.class);
rawSelector = mock(Selector.class);
selector = new AcceptingSelector(eventHandler, rawSelector);
this.selector.setThread();

Expand All @@ -71,9 +71,6 @@ public void testRegisteredChannel() throws IOException, PrivilegedActionExceptio
selector.preSelect();

verify(eventHandler).serverChannelRegistered(serverChannel);
Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
assertEquals(1, registeredChannels.size());
assertTrue(registeredChannels.contains(serverChannel));
}

public void testClosedChannelWillNotBeRegistered() throws Exception {
Expand All @@ -83,10 +80,6 @@ public void testClosedChannelWillNotBeRegistered() throws Exception {
selector.preSelect();

verify(eventHandler).registrationException(same(serverChannel), any(ClosedChannelException.class));

Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(serverChannel));
}

public void testRegisterChannelFailsDueToException() throws Exception {
Expand All @@ -98,10 +91,6 @@ public void testRegisterChannelFailsDueToException() throws Exception {
selector.preSelect();

verify(eventHandler).registrationException(serverChannel, closedChannelException);

Set<NioChannel> registeredChannels = selector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(serverChannel));
}

public void testAcceptEvent() throws IOException {
Expand All @@ -128,7 +117,9 @@ public void testCleanup() throws IOException {

selector.preSelect();

assertEquals(1, selector.getRegisteredChannels().size());
TestSelectionKey key = new TestSelectionKey(0);
key.attach(serverChannel);
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(key)));

selector.cleanupAndCloseChannels();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,12 @@ public void setUp() throws Exception {
public void testQueueChannelForClosed() throws IOException {
NioChannel channel = mock(NioChannel.class);
when(channel.getSelector()).thenReturn(selector);
selector.addRegisteredChannel(channel);

selector.queueChannelClose(channel);

assertEquals(1, selector.getRegisteredChannels().size());

selector.singleLoop();

verify(handler).handleClose(channel);
// Will be called in the channel close method
selector.removeRegisteredChannel(channel);

assertEquals(0, selector.getRegisteredChannels().size());
}

public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;
import java.util.Collections;
import java.util.HashSet;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
Expand All @@ -54,6 +55,7 @@ public class SocketSelectorTests extends ESTestCase {
private WriteContext writeContext;
private ActionListener<NioChannel> listener;
private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1]));
private Selector rawSelector;

@Before
@SuppressWarnings("unchecked")
Expand All @@ -65,7 +67,7 @@ public void setUp() throws Exception {
listener = mock(ActionListener.class);
selectionKey = new TestSelectionKey(0);
selectionKey.attach(channel);
Selector rawSelector = mock(Selector.class);
rawSelector = mock(Selector.class);

this.socketSelector = new SocketSelector(eventHandler, rawSelector);
this.socketSelector.setThread();
Expand All @@ -83,10 +85,6 @@ public void testRegisterChannel() throws Exception {
socketSelector.preSelect();

verify(eventHandler).handleRegistration(channel);

Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
assertEquals(1, registeredChannels.size());
assertTrue(registeredChannels.contains(channel));
}

public void testClosedChannelWillNotBeRegistered() throws Exception {
Expand All @@ -97,10 +95,6 @@ public void testClosedChannelWillNotBeRegistered() throws Exception {

verify(eventHandler).registrationException(same(channel), any(ClosedChannelException.class));
verify(channel, times(0)).finishConnect();

Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(channel));
}

public void testRegisterChannelFailsDueToException() throws Exception {
Expand All @@ -113,10 +107,6 @@ public void testRegisterChannelFailsDueToException() throws Exception {

verify(eventHandler).registrationException(channel, closedChannelException);
verify(channel, times(0)).finishConnect();

Set<NioChannel> registeredChannels = socketSelector.getRegisteredChannels();
assertEquals(0, registeredChannels.size());
assertFalse(registeredChannels.contains(channel));
}

public void testSuccessfullyRegisterChannelWillConnect() throws Exception {
Expand Down Expand Up @@ -309,6 +299,10 @@ public void testCleanup() throws Exception {
socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener));
socketSelector.scheduleForRegistration(unRegisteredChannel);

TestSelectionKey testSelectionKey = new TestSelectionKey(0);
testSelectionKey.attach(channel);
when(rawSelector.keys()).thenReturn(new HashSet<>(Collections.singletonList(testSelectionKey)));

socketSelector.cleanupAndCloseChannels();

verify(listener).onFailure(any(ClosedSelectorException.class));
Expand Down

0 comments on commit 085669f

Please sign in to comment.