Skip to content

Commit

Permalink
[Wisp] Blocking io support for SocketChannel and Pipe
Browse files Browse the repository at this point in the history
Summary: Hook blocking entry at pipeChannel and SocketChannel,
current WispTask would block itself to wait for next scheduling.

Test Plan: jtreg BlockIngIOTest.

Reviewed-by: zhengxiaolinX, yuleil

Issue: dragonwell-project/dragonwell8#143
  • Loading branch information
joeyleeeeeee97 authored and joeylee.lz committed Nov 23, 2020
1 parent 6fd834d commit 1ae35b8
Show file tree
Hide file tree
Showing 13 changed files with 364 additions and 15 deletions.
36 changes: 36 additions & 0 deletions src/linux/classes/com/alibaba/wisp/engine/WispEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import sun.misc.JavaLangAccess;
import sun.misc.SharedSecrets;
import sun.misc.WispEngineAccess;
import sun.nio.ch.Net;

import java.dyn.Coroutine;
import java.dyn.CoroutineExitException;
import java.dyn.CoroutineSupport;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -344,6 +346,40 @@ public void getCpuTime(long[] ids, long[] times) {
}
}
}

@Override
public int poll(SelectableChannel channel, int interestOps) throws IOException {
return poll(channel, interestOps, -1);
}

@Override
public int poll(SelectableChannel channel, int interestOps, long millsTimeOut) throws IOException {
assert interestOps == Net.POLLIN || interestOps == Net.POLLCONN || interestOps == Net.POLLOUT;
WispTask task = WispCarrier.current().getCurrentTask();
if (millsTimeOut > 0) {
task.carrier.addTimer(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millsTimeOut),
false);
}
try {
task.carrier.registerEvent(channel, translateToSelectionKey(interestOps));
park(-1);
return millsTimeOut > 0 && task.timeOut.expired() ? 0 : 1;
} finally {
if (millsTimeOut > 0) {
task.carrier.cancelTimer();
}
unregisterEvent();
}
}

int translateToSelectionKey(int event) {
if (Net.POLLIN == event) {
return SelectionKey.OP_READ;
} else if (Net.POLLCONN == event || Net.POLLOUT == event) {
return SelectionKey.OP_WRITE;
}
return 0;
}
});
}

Expand Down
10 changes: 10 additions & 0 deletions src/macosx/classes/com/alibaba/wisp/engine/WispEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ public StackTraceElement[] getStackTrace(WispTask task) {
public void getCpuTime(long[] ids, long[] times) {
throw new UnsupportedOperationException();
}

@Override
public int poll(SelectableChannel channel, int interestOps, long timeout) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public int poll(SelectableChannel channel, int interestOps) throws IOException {
throw new UnsupportedOperationException();
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@

package java.nio.channels.spi;

import com.alibaba.wisp.engine.WispEngine;
import sun.nio.ch.IOUtil;

import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
Expand All @@ -34,6 +38,8 @@
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;


/**
Expand Down Expand Up @@ -267,6 +273,11 @@ protected final void implCloseChannel() throws IOException {
*/
protected abstract void implCloseSelectableChannel() throws IOException;

protected void configureAsNonBlockingForWisp(FileDescriptor fd) throws IOException {
if (WispEngine.transparentWispSwitch()) {
IOUtil.configureBlocking(fd, false);
}
}

// -- Blocking --

Expand Down Expand Up @@ -296,7 +307,10 @@ public final SelectableChannel configureBlocking(boolean block)
if (block != blocking) {
if (block && haveValidKeys())
throw new IllegalBlockingModeException();
implConfigureBlocking(block);
if (!(WispEngine.transparentWispSwitch()
&& (this instanceof ServerSocketChannel || this instanceof SocketChannel))) {
implConfigureBlocking(block);
}
nonBlocking = !block;
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/share/classes/sun/misc/WispEngineAccess.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ int epollWait(int epfd, long pollArray, int arraySize, long timeout,
StackTraceElement[] getStackTrace(WispTask task);

void getCpuTime(long[] ids, long[] times);

int poll(SelectableChannel channel, int interestOps, long timeout) throws IOException;

int poll(SelectableChannel channel, int interestOps) throws IOException;

}
8 changes: 8 additions & 0 deletions src/share/classes/sun/nio/ch/IOStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,12 @@ public static boolean checkAll(long n) {
return ((n > EOF) || (n < UNSUPPORTED_CASE));
}

/**
* Returns true if the error code is UNAVAILABLE or INTERRUPTED, the
* error codes to indicate that an I/O operation can be retried.
*/
static boolean okayToRetry(long n) {
return (n == IOStatus.UNAVAILABLE) || (n == IOStatus.INTERRUPTED);
}

}
2 changes: 2 additions & 0 deletions src/share/classes/sun/nio/ch/ServerSocketAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

package sun.nio.ch;

import com.alibaba.wisp.engine.WispEngine;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down
25 changes: 12 additions & 13 deletions src/share/classes/sun/nio/ch/ServerSocketChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class ServerSocketChannelImpl
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
configureAsNonBlockingForWisp(fd);
}

ServerSocketChannelImpl(SelectorProvider sp,
Expand All @@ -104,6 +105,7 @@ class ServerSocketChannelImpl
this.state = ST_INUSE;
if (bound)
localAddress = Net.localAddress(fd);
configureAsNonBlockingForWisp(fd);
}

public ServerSocket socket() {
Expand Down Expand Up @@ -245,31 +247,22 @@ public SocketChannel accept() throws IOException {
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];

final boolean wispAndBlocking = WispEngine.transparentWispSwitch() && isBlocking() &&
WEA.usingWispEpoll();
try {
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
if (wispAndBlocking) {
IOUtil.configureBlocking(fd, false);
}
for (;;) {
n = accept(this.fd, newfd, isaa);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
if (wispAndBlocking && n < 0) {
WEA.registerEvent(this, SelectionKey.OP_ACCEPT);
WEA.park(-1);
if (WispEngine.transparentWispSwitch() && isBlocking() && IOStatus.okayToRetry(n)) {
WEA.poll(this, Net.POLLIN);
continue;
}
break;
}
} finally {
if (wispAndBlocking) {
IOUtil.configureBlocking(fd, true);
}
thread = 0;
end(n > 0);
assert IOStatus.check(n);
Expand All @@ -278,7 +271,9 @@ public SocketChannel accept() throws IOException {
if (n < 1)
return null;

IOUtil.configureBlocking(newfd, true);
if (!WispEngine.transparentWispSwitch()) {
IOUtil.configureBlocking(newfd, true);
}
InetSocketAddress isa = isaa[0];
sc = new SocketChannelImpl(provider(), newfd, isa);
SecurityManager sm = System.getSecurityManager();
Expand Down Expand Up @@ -377,7 +372,11 @@ int poll(int events, long timeout) throws IOException {
return 0;
thread = NativeThread.current();
}
n = Net.poll(fd, events, timeout);
if (WispEngine.transparentWispSwitch()) {
n = WEA.poll(this, events, timeout);
} else {
n = Net.poll(fd, events, timeout);
}
} finally {
thread = 0;
end(n > 0);
Expand Down
2 changes: 2 additions & 0 deletions src/share/classes/sun/nio/ch/SocketAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

package sun.nio.ch;

import com.alibaba.wisp.engine.WispEngine;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down
38 changes: 37 additions & 1 deletion src/share/classes/sun/nio/ch/SocketChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.util.*;

import com.alibaba.wisp.engine.WispEngine;
import sun.misc.SharedSecrets;
import sun.misc.WispEngineAccess;
import sun.net.NetHooks;
import sun.net.ExtendedOptionsImpl;
import sun.net.ExtendedOptionsHelper;
Expand All @@ -45,6 +49,7 @@ class SocketChannelImpl
extends SocketChannel
implements SelChImpl
{
private static final WispEngineAccess WEA = SharedSecrets.getWispEngineAccess();

// Used to make native read and write calls
private static NativeDispatcher nd;
Expand Down Expand Up @@ -103,6 +108,7 @@ class SocketChannelImpl
this.fd = Net.socket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_UNCONNECTED;
configureAsNonBlockingForWisp(fd);
}

SocketChannelImpl(SelectorProvider sp,
Expand All @@ -116,6 +122,7 @@ class SocketChannelImpl
this.state = ST_UNCONNECTED;
if (bound)
this.localAddress = Net.localAddress(fd);
configureAsNonBlockingForWisp(fd);
}

// Constructor for sockets obtained from server sockets
Expand All @@ -130,6 +137,7 @@ class SocketChannelImpl
this.state = ST_CONNECTED;
this.localAddress = Net.localAddress(fd);
this.remoteAddress = remote;
configureAsNonBlockingForWisp(fd);
}

public Socket socket() {
Expand Down Expand Up @@ -382,6 +390,10 @@ public int read(ByteBuffer buf) throws IOException {
// is still open, so retry
continue;
}
if (isBlocking() && WispEngine.transparentWispSwitch() && IOStatus.okayToRetry(n)) {
WEA.poll(this, Net.POLLIN);
continue;
}
return IOStatus.normalize(n);
}

Expand Down Expand Up @@ -439,6 +451,10 @@ public long read(ByteBuffer[] dsts, int offset, int length)
n = IOUtil.read(fd, dsts, offset, length, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
if (isBlocking() && WispEngine.transparentWispSwitch() && IOStatus.okayToRetry(n)) {
WEA.poll(this, Net.POLLIN);
continue;
}
return IOStatus.normalize(n);
}
} finally {
Expand Down Expand Up @@ -470,6 +486,10 @@ public int write(ByteBuffer buf) throws IOException {
n = IOUtil.write(fd, buf, -1, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
if (isBlocking() && WispEngine.transparentWispSwitch() && IOStatus.okayToRetry(n)) {
WEA.poll(this, Net.POLLOUT);
continue;
}
return IOStatus.normalize(n);
}
} finally {
Expand Down Expand Up @@ -503,6 +523,10 @@ public long write(ByteBuffer[] srcs, int offset, int length)
n = IOUtil.write(fd, srcs, offset, length, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
if (isBlocking() && WispEngine.transparentWispSwitch() && IOStatus.okayToRetry(n)) {
WEA.poll(this, Net.POLLOUT);
continue;
}
return IOStatus.normalize(n);
}
} finally {
Expand Down Expand Up @@ -533,6 +557,10 @@ int sendOutOfBandData(byte b) throws IOException {
n = sendOutOfBandData(fd, b);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
if (isBlocking() && WispEngine.transparentWispSwitch() && IOStatus.okayToRetry(n)) {
WEA.poll(this, Net.POLLOUT);
continue;
}
return IOStatus.normalize(n);
}
} finally {
Expand Down Expand Up @@ -650,6 +678,10 @@ public boolean connect(SocketAddress sa) throws IOException {
if ( (n == IOStatus.INTERRUPTED)
&& isOpen())
continue;
if (isBlocking() && WispEngine.transparentWispSwitch() && IOStatus.okayToRetry(n)) {
WEA.poll(this, Net.POLLOUT);
continue;
}
break;
}

Expand Down Expand Up @@ -950,7 +982,11 @@ int poll(int events, long timeout) throws IOException {
return 0;
readerThread = NativeThread.current();
}
n = Net.poll(fd, events, timeout);
if (WispEngine.transparentWispSwitch()) {
n = WEA.poll(this, events, timeout);
} else {
n = Net.poll(fd, events, timeout);
}
} finally {
readerCleanup();
end(n > 0);
Expand Down
Loading

0 comments on commit 1ae35b8

Please sign in to comment.