Skip to content
This repository has been archived by the owner on Aug 23, 2020. It is now read-only.

Fix: resolve race in networking layer assigning write operation to key #1865

Merged
merged 3 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/main/java/com/iota/iri/network/NeighborRouterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,7 @@ private boolean handleConnect(SocketChannel channel, SelectionKey key, String id
private boolean handleWrite(SocketChannel channel, SelectionKey key, String identity, Neighbor neighbor) {
try {
switch (neighbor.write()) {
case 0:
break;
// something bad happened
case -1:
if (neighbor.getState() == NeighborState.HANDSHAKING) {
log.info("closing connection to {} as handshake packet couldn't be written", identity);
Expand All @@ -365,8 +364,14 @@ private boolean handleWrite(SocketChannel channel, SelectionKey key, String iden
closeNeighborConnection(channel, identity, selector);
}
return false;
// bytes were either written or not written to the channel
// we check whether we still have something else to send, if not we unregister write
default:
// bytes were written to the channel
synchronized (key) {
if (!neighbor.hasDataToSendTo()) {
key.interestOps(SelectionKey.OP_READ);
}
}
}
return true;
} catch (IOException ex) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/iota/iri/network/neighbor/Neighbor.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,11 @@ public interface Neighbor {
*/
int getProtocolVersion();

/**
* Checks if we have data (transactions) to send to the neighbor
*
* @return {@code true} if we have data to send, else returns {@code false}
*/
boolean hasDataToSendTo();

}
17 changes: 17 additions & 0 deletions src/main/java/com/iota/iri/network/neighbor/impl/NeighborImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,21 @@ private int writeMsg() throws IOException {

@Override
public void send(ByteBuffer buf) {
// first fill sendQueue to signal other threads that we have something ready to write
if (!sendQueue.offer(buf)) {
metrics.incrDroppedSendPacketsCount();
}

// re-register write interest
SelectionKey key = channel.keyFor(selector);
if (key != null) {
synchronized (key) {
if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) == 0) {
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
selector.wakeup();
}
}
}
}

@Override
Expand Down Expand Up @@ -264,4 +276,9 @@ public int getProtocolVersion() {
return protocolVersion;
}

@Override
public boolean hasDataToSendTo() {
return currentToWrite != null || !sendQueue.isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,42 @@ public void writeWithNoMessageInTheSendQueueReturnsZero() {
}
}

@Test
public void aSendWhileWriteInterestIsDisabledActivatesItAgain() {
SelectionKey fakeSelectionKey = new FakeSelectionKey() {

private int ops = SelectionKey.OP_READ;

@Override
public boolean isValid() {
return true;
}

@Override
public int interestOps() {
return ops;
}

@Override
public SelectionKey interestOps(int ops) {
this.ops = ops;
return null;
}
};
Neighbor neighbor = new NeighborImpl<>(selector, new FakeChannel() {

@Override
public SelectionKey keyFor(Selector sel) {
return fakeSelectionKey;
}
}, localAddr, serverSocketPort, pipeline);
neighbor.send(createEmptyTxPacket());

Mockito.verify(selector).wakeup();
assertEquals("should be interested in read and write readiness", SelectionKey.OP_READ | SelectionKey.OP_WRITE,
fakeSelectionKey.interestOps());
}

@Test
public void markingTheNeighborForDisconnectWillNeverMakeItReadyForMessagesAgain() {
Neighbor neighbor = new NeighborImpl<>(selector, null, localAddr, serverSocketPort, pipeline);
Expand Down