Skip to content

Commit

Permalink
NetworkEndpoint and NetworkServer
Browse files Browse the repository at this point in the history
  • Loading branch information
pull-vert committed Jan 4, 2025
1 parent 4c9c83f commit 0fb5278
Show file tree
Hide file tree
Showing 36 changed files with 507 additions and 626 deletions.
22 changes: 10 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,20 @@ dependencies {
```

```java
var freePortNumber = 54321;
try (var serverSocket = new ServerSocket(freePortNumber)) {
var serverThread = Thread.startVirtualThread(() -> {
try (var acceptedSocketEndpoint = SocketEndpoint.from(serverSocket.accept());
var serverWriter = Jayo.buffer(acceptedSocketEndpoint.getWriter())) {
// Let the system pick up a local free port
try (var listener = NetworkServer.bindTcp(new InetSocketAddress(0))) {
var serverThread = Thread.startVirtualThread(() -> {
try (var serverEndpoint = listener.accept();
var serverWriter = Jayo.buffer(serverEndpoint.getWriter())) {
serverWriter.write("The Answer to the Ultimate Question of Life is ")
.writeUtf8CodePoint('4')
.writeUtf8CodePoint('2');
} catch (IOException e) {
fail("Unexpected exception", e);
.writeUtf8CodePoint('4')
.writeUtf8CodePoint('2');
}
});
try (var clientSocketEndpoint = SocketEndpoint.from(new Socket("localhost", freePortNumber));
var clientReader = Jayo.buffer(clientSocketEndpoint.getReader())) {
try (var clientEndpoint = NetworkEndpoint.connectTcp(listener.getLocalAddress());
var clientReader = Jayo.buffer(clientEndpoint.getReader())) {
assertThat(clientReader.readString())
.isEqualTo("The Answer to the Ultimate Question of Life is 42");
.isEqualTo("The Answer to the Ultimate Question of Life is 42");
}
serverThread.join();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package jayo.benchmarks

import jayo.Reader
import jayo.buffered
import jayo.endpoints.endpoint
import jayo.reader
import jayo.writer
import okio.BufferedSource
import okio.buffer
import okio.source
Expand Down Expand Up @@ -71,17 +72,15 @@ open class SocketReaderBenchmark {
}
clientSocket = Socket("localhost", serverSocket.localPort)
clientSocketChannel = SocketChannel.open(serverSocket.localSocketAddress)
val clientSocketEndpoint = clientSocket.endpoint()
val clientSocketChannelEndpoint = clientSocketChannel.endpoint()
when (type) {
"jayo-io" -> {
clientOutputStream = clientSocket.getOutputStream()
jayoReader = clientSocketEndpoint.reader.buffered()
jayoReader = clientSocket.reader().buffered()
}

"jayo-nio" -> {
clientOutputStream = clientSocketChannelEndpoint.writer.buffered().asOutputStream()
jayoReader = clientSocketChannelEndpoint.reader.buffered()
clientOutputStream = clientSocketChannel.writer().buffered().asOutputStream()
jayoReader = clientSocketChannel.reader().buffered()
}

"okio" -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package jayo.benchmarks

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import jayo.buffered
import jayo.endpoints.endpoint
import jayo.kotlinx.serialization.decodeFromReader
import jayo.kotlinx.serialization.encodeToWriter
import jayo.reader
import jayo.writer
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.okio.decodeFromBufferedSource
Expand Down Expand Up @@ -92,8 +93,7 @@ open class TcpAndJsonSerializationBenchmark {
fun readerJayo() {
Socket().use { socket ->
socket.connect(senderServer.localSocketAddress)
val socketEndpoint = socket.endpoint()
socketEndpoint.reader.buffered().use { reader ->
socket.reader().buffered().use { reader ->
val decoded = kotlinxSerializationMapper.decodeFromReader(
JsonSerializationBenchmark.DefaultPixelEvent.serializer(),
reader
Expand Down Expand Up @@ -121,8 +121,7 @@ open class TcpAndJsonSerializationBenchmark {
fun senderJayo() {
val socket = Socket()
socket.connect(receiverServer.localSocketAddress)
val socketEndpoint = socket.endpoint()
socketEndpoint.writer.buffered().use { writer ->
socket.writer().buffered().use { writer ->
kotlinxSerializationMapper.encodeToWriter(
JsonSerializationBenchmark.DefaultPixelEvent.serializer(),
defaultPixelEvent,
Expand All @@ -136,8 +135,7 @@ open class TcpAndJsonSerializationBenchmark {
fun senderJayoJackson() {
val socket = Socket()
socket.connect(receiverServer.localSocketAddress)
val socketEndpoint = socket.endpoint()
socketEndpoint.writer.buffered().use { writer ->
socket.writer().buffered().use { writer ->
val output = writer.asOutputStream()
objectMapper.writeValue(output, defaultPixelEvent)
output.flush()
Expand Down
2 changes: 1 addition & 1 deletion build-logic/src/main/kotlin/jayo-commons.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ val javaVersion = catalogVersion("java").toInt()
val isCI = providers.gradleProperty("isCI")

val koverage = mapOf(
"jayo" to if (isCI.isPresent) 82 else 84,
"jayo" to if (isCI.isPresent) 82 else 82,
"jayo-3p-kotlinx-serialization" to 55,
)

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/jayo/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.io.Closeable;

/**
* Your endpoint on an I/O connection between you and one or several peer(s).
* Your endpoint on an I/O connection between you and a peer.
* <p>
* An endpoint is plugged to an open connection, you can read incoming data thanks to {@link #getReader()} and write
* data thanks to {@link #getWriter()}.
Expand Down
44 changes: 44 additions & 0 deletions core/src/main/java/jayo/Jayo.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.jspecify.annotations.NonNull;

import java.io.*;
import java.net.Socket;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadableByteChannel;
Expand All @@ -42,6 +43,7 @@
import java.util.zip.Inflater;

import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.WARNING;

/**
* Essential APIs for working with Jayo.
Expand Down Expand Up @@ -121,6 +123,48 @@ private Jayo() {
return new InputStreamRawReader(in);
}

/**
* @return a raw writer that writes to {@code socket}. Prefer this over {@link #writer(OutputStream)} because this
* method honors timeouts. When the socket write times out, the socket is asynchronously closed by a watchdog
* thread.
*/
public static @NonNull RawWriter writer(final @NonNull Socket socket) {
Objects.requireNonNull(socket);
final var timeout = new RealAsyncTimeout(() -> {
try {
socket.close();
} catch (Exception e) {
LOGGER.log(WARNING, "Failed to close timed out socket " + socket, e);
}
});
try {
return timeout.writer(new OutputStreamRawWriter(socket.getOutputStream()), 0L);
} catch (IOException e) {
throw JayoException.buildJayoException(e);
}
}

/**
* @return a raw reader that reads from {@code socket}. Prefer this over {@link #reader(InputStream)} because this
* method honors timeouts. When the socket read times out, the socket is asynchronously closed by a watchdog
* thread.
*/
public static @NonNull RawReader reader(final @NonNull Socket socket) {
Objects.requireNonNull(socket);
final var timeout = new RealAsyncTimeout(() -> {
try {
socket.close();
} catch (Exception e) {
LOGGER.log(WARNING, "Failed to close timed out socket " + socket, e);
}
});
try {
return timeout.reader(new InputStreamRawReader(socket.getInputStream()), 0L);
} catch (IOException e) {
throw JayoException.buildJayoException(e);
}
}

/**
* @return a raw writer that writes to {@code out} gathering byte channel.
*/
Expand Down
13 changes: 4 additions & 9 deletions core/src/main/java/jayo/JayoClosedEndpointException.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,22 @@
import org.jspecify.annotations.NonNull;

import java.io.IOException;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;

/**
* Exception thrown when an attempt is made to invoke or complete an I/O operation upon endpoint that is closed, or at
* least closed to that operation.
* <p>
* It can also wrap a {@link ClosedChannelException} or a {@link SocketException} when its message is
* <i>"Socket is closed"</i> with this unchecked exception.
* We also wrap a {@linkplain java.nio.channels.ClosedChannelException ClosedChannelException} or a
* {@linkplain java.net.SocketException SocketException} when its message is <i>"Socket is closed"</i> or any
* {@link IOException} when its message is <i>"Broken Pipe"</i> with this unchecked exception.
*/
public final class JayoClosedEndpointException extends JayoException {
public JayoClosedEndpointException() {
super(new IOException());
}

public JayoClosedEndpointException(final @NonNull ClosedChannelException cause) {
super(Objects.requireNonNull(cause));
}

public JayoClosedEndpointException(final @NonNull SocketException cause) {
public JayoClosedEndpointException(final @NonNull IOException cause) {
super(Objects.requireNonNull(cause));
}
}
7 changes: 6 additions & 1 deletion core/src/main/java/jayo/JayoException.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ public static JayoException buildJayoException(final @NonNull IOException ioExce
new JayoTlsPeerUnverifiedException(sslPeerUnverifiedException);
case SSLException sslException -> new JayoTlsException(sslException);

default -> new JayoException(ioException);
default -> {
if (BROKEN_PIPE_SOCKET_MESSAGE.equals(ioException.getMessage())) {
yield new JayoClosedEndpointException(ioException);
}
yield new JayoException(ioException);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package jayo.internal;

import jayo.Buffer;
import jayo.JayoClosedEndpointException;
import jayo.RawWriter;
import jayo.JayoException;
import jayo.external.CancelToken;
Expand Down Expand Up @@ -40,7 +41,7 @@ public void write(final @NonNull Buffer reader, final @NonNegative long byteCoun
throw new IllegalArgumentException("reader must be an instance of RealBuffer");
}
if (!out.isOpen()) {
throw new IllegalStateException("Channel is closed");
throw new JayoClosedEndpointException();
}

// get cancel token immediately, if present it will be used in all I/O calls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,29 @@
@SuppressWarnings("RawUseOfParameterized")
public abstract sealed class NetworkEndpointBuilder<T extends NetworkEndpoint.Builder<T>>
implements NetworkEndpoint.Builder<T> {
private @Nullable Duration connectTimeout = null;
private @Nullable Duration readTimeout = null;
private @Nullable Duration writeTimeout = null;
private @NonNegative long connectTimeoutNanos = 0L;
private @NonNegative long readTimeoutNanos = 0L;
private @NonNegative long writeTimeoutNanos = 0L;
private final @NonNull Map<@NonNull SocketOption, @Nullable Object> socketOptions = new HashMap<>();

@Override
public final @NonNull T connectTimeout(final @NonNull Duration connectTimeout) {
this.connectTimeout = Objects.requireNonNull(connectTimeout);
Objects.requireNonNull(connectTimeout);
this.connectTimeoutNanos = connectTimeout.toNanos();
return getThis();
}

@Override
public final @NonNull T readTimeout(final @NonNull Duration readTimeout) {
this.readTimeout = Objects.requireNonNull(readTimeout);
Objects.requireNonNull(readTimeout);
this.readTimeoutNanos = readTimeout.toNanos();
return getThis();
}

@Override
public final @NonNull T writeTimeout(final @NonNull Duration writeTimeout) {
this.writeTimeout = Objects.requireNonNull(writeTimeout);
Objects.requireNonNull(writeTimeout);
this.writeTimeoutNanos = writeTimeout.toNanos();
return getThis();
}

Expand All @@ -52,19 +55,15 @@ public abstract sealed class NetworkEndpointBuilder<T extends NetworkEndpoint.Bu
}

@Override
public final @NonNull NetworkEndpoint connect(final @NonNull SocketAddress remote) {
Objects.requireNonNull(remote);
return connectInternal(remote,
(connectTimeout != null) ? connectTimeout.toNanos() : 0L,
(readTimeout != null) ? readTimeout.toNanos() : 0L,
(writeTimeout != null) ? writeTimeout.toNanos() : 0L,
socketOptions);
public final @NonNull NetworkEndpoint connect(final @NonNull SocketAddress peerAddress) {
Objects.requireNonNull(peerAddress);
return connectInternal(peerAddress, connectTimeoutNanos, readTimeoutNanos, writeTimeoutNanos, socketOptions);
}

abstract @NonNull T getThis();

abstract @NonNull NetworkEndpoint connectInternal(
final @NonNull SocketAddress remote,
final @NonNull SocketAddress peerAddress,
final @NonNegative long connectTimeoutNanos,
final @NonNegative long defaultReadTimeoutNanos,
final @NonNegative long defaultWriteTimeoutNanos,
Expand All @@ -88,20 +87,15 @@ Nio getThis() {

@Override
@NonNull
NetworkEndpoint connectInternal(final @NonNull SocketAddress remote,
NetworkEndpoint connectInternal(final @NonNull SocketAddress peerAddress,
final @NonNegative long connectTimeoutNanos,
final @NonNegative long defaultReadTimeoutNanos,
final @NonNegative long defaultWriteTimeoutNanos,
final @NonNull Map<@NonNull SocketOption, @Nullable Object> socketOptions) {
Objects.requireNonNull(remote);

final var connectTimeoutMillisAsLong = connectTimeoutNanos / 1_000_000L;
final var connectTimeoutMillis = (connectTimeoutMillisAsLong > Integer.MAX_VALUE)
? 0 // = infinite timeout
: (int) connectTimeoutMillisAsLong;
assert peerAddress != null;
return SocketChannelNetworkEndpoint.connect(
remote,
connectTimeoutMillis,
peerAddress,
connectTimeoutNanos,
defaultReadTimeoutNanos,
defaultWriteTimeoutNanos,
socketOptions,
Expand All @@ -119,20 +113,15 @@ Io getThis() {

@Override
@NonNull
NetworkEndpoint connectInternal(final @NonNull SocketAddress remote,
NetworkEndpoint connectInternal(final @NonNull SocketAddress peerAddress,
final @NonNegative long connectTimeoutNanos,
final @NonNegative long defaultReadTimeoutNanos,
final @NonNegative long defaultWriteTimeoutNanos,
final @NonNull Map<@NonNull SocketOption, @Nullable Object> socketOptions) {
assert remote != null;

final var connectTimeoutMillisAsLong = connectTimeoutNanos / 1_000_000L;
final var connectTimeoutMillis = (connectTimeoutMillisAsLong > Integer.MAX_VALUE)
? 0 // = infinite timeout
: (int) connectTimeoutMillisAsLong;
assert peerAddress != null;
return SocketNetworkEndpoint.connect(
remote,
connectTimeoutMillis,
peerAddress,
connectTimeoutNanos,
defaultReadTimeoutNanos,
defaultWriteTimeoutNanos,
socketOptions);
Expand Down
Loading

0 comments on commit 0fb5278

Please sign in to comment.