Skip to content

Commit

Permalink
Add more methods for shutdown the stream (java-native-access#214)
Browse files Browse the repository at this point in the history
Motivation:

Shutdown the stream is something that has some "meaning" in quic so we should follow it.

Modifications:

- Add more overloads for shutting down the stream
- Deprecate QuicStreamChannel.SHUTDOWN_OUTPUT and users should use QuicStreamChannel.SEND_FIN

Result:

Clearer API / better semantics
  • Loading branch information
normanmaurer authored Mar 8, 2021
1 parent cb1d8f7 commit f781200
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 84 deletions.
62 changes: 57 additions & 5 deletions src/main/java/io/netty/incubator/codec/quic/QuicStreamChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DuplexChannel;

import java.net.Socket;

/**
* A QUIC stream.
*/
Expand All @@ -31,7 +29,57 @@ public interface QuicStreamChannel extends DuplexChannel {
* Should be added to a {@link ChannelFuture} when the FIN should be sent to the remote peer and no more
* writes will happen.
*/
ChannelFutureListener SHUTDOWN_OUTPUT = f -> ((QuicStreamChannel) f.channel()).shutdownOutput();
ChannelFutureListener WRITE_FIN = f -> f.channel().writeAndFlush(QuicStreamFrame.EMPTY_FIN);

/**
* @deprecated use {@link #WRITE_FIN}
*/
@Deprecated
ChannelFutureListener SHUTDOWN_OUTPUT = WRITE_FIN;

@Override
default ChannelFuture shutdownInput() {
return shutdownInput(0);
}

@Override
default ChannelFuture shutdownInput(ChannelPromise promise) {
return shutdownInput(0, promise);
}

@Override
default ChannelFuture shutdownOutput() {
return shutdownOutput(0);
}

@Override
default ChannelFuture shutdownOutput(ChannelPromise promise) {
return shutdownOutput(0, promise);
}

@Override
default ChannelFuture shutdown() {
return shutdown(newPromise());
}

/**
* Shortcut for calling {@link #shutdownInput(int)} and {@link #shutdownInput(int)}.
*
* @param error the error to send.
* @return the future that is notified on completion.
*/
default ChannelFuture shutdown(int error) {
return shutdown(error, newPromise());
}

/**
* Shortcut for calling {@link #shutdownInput(int, ChannelPromise)} and {@link #shutdownInput(int, ChannelPromise)}.
*
* @param error the error to send.
* @param promise will be notified on completion.
* @return the future that is notified on completion.
*/
ChannelFuture shutdown(int error, ChannelPromise promise);

/**
* Shutdown the input of the stream with the given error code. This means a {@code STOP_SENDING} frame will
Expand All @@ -40,7 +88,9 @@ public interface QuicStreamChannel extends DuplexChannel {
* @param error the error to send.
* @return the future that is notified on completion.
*/
ChannelFuture shutdownInput(int error);
default ChannelFuture shutdownInput(int error) {
return shutdownInput(error, newPromise());
}

/**
* Shutdown the input of the stream with the given error code. This means a {@code STOP_SENDING} frame will
Expand All @@ -59,7 +109,9 @@ public interface QuicStreamChannel extends DuplexChannel {
* @param error the error to send.
* @return the future that is notified on completion.
*/
ChannelFuture shutdownOutput(int error);
default ChannelFuture shutdownOutput(int error) {
return shutdownOutput(error, newPromise());
}

/**
* Shutdown the output of the stream with the given error code. This means a {@code RESET_STREAM} frame will
Expand Down
71 changes: 71 additions & 0 deletions src/main/java/io/netty/incubator/codec/quic/QuicStreamFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,83 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;

/**
* A QUIC STREAM_FRAME.
*/
public interface QuicStreamFrame extends ByteBufHolder {

/**
* An empty {@link QuicStreamFrame} that has the {@code FIN} flag set.
*/
QuicStreamFrame EMPTY_FIN = new QuicStreamFrame() {
@Override
public boolean hasFin() {
return true;
}

@Override
public QuicStreamFrame copy() {
return this;
}

@Override
public QuicStreamFrame duplicate() {
return this;
}

@Override
public QuicStreamFrame retainedDuplicate() {
return this;
}

@Override
public QuicStreamFrame replace(ByteBuf content) {
return new DefaultQuicStreamFrame(content, hasFin());
}

@Override
public QuicStreamFrame retain() {
return this;
}

@Override
public QuicStreamFrame retain(int increment) {
return this;
}

@Override
public QuicStreamFrame touch() {
return this;
}

@Override
public QuicStreamFrame touch(Object hint) {
return this;
}

@Override
public ByteBuf content() {
return Unpooled.EMPTY_BUFFER;
}

@Override
public int refCnt() {
return 1;
}

@Override
public boolean release() {
return false;
}

@Override
public boolean release(int decrement) {
return false;
}
};

/**
* Returns {@code true} if the frame has the FIN set, which means it notifies the remote peer that
* there will be no more writing happen. {@code false} otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,15 +676,7 @@ QuicStreamType streamType(long streamId) {
return (streamId & 0x2) == 0 ? QuicStreamType.BIDIRECTIONAL : QuicStreamType.UNIDIRECTIONAL;
}

void streamShutdownRead(long streamId, int err, ChannelPromise promise) {
streamShutdown0(streamId, true, false, err, promise);
}

void streamShutdownWrite(long streamId, int err, ChannelPromise promise) {
streamShutdown0(streamId, false, true, err, promise);
}

private void streamShutdown0(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
void streamShutdown(long streamId, boolean read, boolean write, int err, ChannelPromise promise) {
final long connectionAddress;
try {
connectionAddress = connectionAddressChecked();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,6 @@ public boolean isInputShutdown() {
return inputShutdown;
}

@Override
public ChannelFuture shutdownInput() {
return shutdownInput(newPromise());
}

@Override
public ChannelFuture shutdownInput(ChannelPromise channelPromise) {
return shutdownInput(0, channelPromise);
}

@Override
public ChannelFuture shutdownInput(int error) {
return shutdownInput(0);
}

@Override
public ChannelFuture shutdownInput(int error, ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
Expand All @@ -178,11 +163,6 @@ public ChannelFuture shutdownInput(int error, ChannelPromise promise) {
return promise;
}

@Override
public ChannelFuture shutdownOutput(int error) {
return shutdownOutput(error, newPromise());
}

@Override
public ChannelFuture shutdownOutput(int error, ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
Expand All @@ -200,7 +180,7 @@ public QuicheQuicChannel parent() {

private void shutdownInput0(int err, ChannelPromise channelPromise) {
inputShutdown = true;
parent().streamShutdownRead(streamId(), err, channelPromise);
parent().streamShutdown(streamId(), true, false, err, channelPromise);
closeIfDone();
}

Expand All @@ -209,37 +189,8 @@ public boolean isOutputShutdown() {
return outputShutdown;
}

@Override
public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise());
}

@Override
public ChannelFuture shutdownOutput(ChannelPromise channelPromise) {
if (eventLoop().inEventLoop()) {
shutdownOutput0(channelPromise);
} else {
eventLoop().execute(() -> shutdownOutput0(channelPromise));
}
return channelPromise;
}

private void shutdownOutput0(ChannelPromise channelPromise) {
try {
// Just send a FIN to shutdown the output as we don't want to drop the already queued packets in the
// quic connection for this stream.
sendFinIfNeeded();
} catch (Throwable e) {
channelPromise.setFailure(e);
return;
}
channelPromise.setSuccess();
outputShutdown = true;
closeIfDone();
}

private void shutdownOutput0(int error, ChannelPromise channelPromise) {
parent().streamShutdownWrite(streamId(), error, channelPromise);
parent().streamShutdown(streamId(), false, true, error, channelPromise);
outputShutdown = true;
closeIfDone();
}
Expand All @@ -250,32 +201,25 @@ public boolean isShutdown() {
}

@Override
public ChannelFuture shutdown() {
return shutdown(newPromise());
public ChannelFuture shutdown(ChannelPromise channelPromise) {
return shutdown(0, channelPromise);
}

@Override
public ChannelFuture shutdown(ChannelPromise channelPromise) {
public ChannelFuture shutdown(int error, ChannelPromise promise) {
if (eventLoop().inEventLoop()) {
shutdown0(channelPromise);
shutdown0(error, promise);
} else {
eventLoop().execute(() -> shutdown0(channelPromise));
eventLoop().execute(() -> shutdown0(error, promise));
}
return channelPromise;
return promise;
}

private void shutdown0(ChannelPromise channelPromise) {
try {
// Just send a FIN to shutdown the output as we don't want to drop the already queued packets in the
// quic connection for this stream.
sendFinIfNeeded();
} catch (Throwable e) {
channelPromise.setFailure(e);
return;
}
private void shutdown0(int error, ChannelPromise channelPromise) {

inputShutdown = true;
outputShutdown = true;
parent().streamShutdownRead(streamId(), 0, channelPromise);
parent().streamShutdown(streamId(), true, true, error, channelPromise);
closeIfDone();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void channelActive(ChannelHandlerContext ctx) {
public void channelActive(ChannelHandlerContext ctx) {
// Do the write and close the channel
ctx.writeAndFlush(Unpooled.buffer().writeZero(8))
.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
.addListener(QuicStreamChannel.WRITE_FIN);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
}).sync().getNow();
// Write the data and send the FIN. After this its not possible anymore to write any more data.
streamChannel.writeAndFlush(Unpooled.copiedBuffer("GET /\r\n", CharsetUtil.US_ASCII))
.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
.addListener(QuicStreamChannel.WRITE_FIN);

// Wait for the stream channel and quic channel to be closed (this will happen after we received the FIN).
// After this is done we will close the underlying datagram channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buffer = ctx.alloc().directBuffer();
buffer.writeCharSequence("Hello World!\r\n", CharsetUtil.US_ASCII);
// Write the buffer and shutdown the output by writing a FIN.
ctx.writeAndFlush(buffer).addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
ctx.writeAndFlush(buffer).addListener(QuicStreamChannel.WRITE_FIN);
}
} finally {
byteBuf.release();
Expand Down

0 comments on commit f781200

Please sign in to comment.