Skip to content

Commit

Permalink
suqash-7
Browse files Browse the repository at this point in the history
  • Loading branch information
Tilmann Zäschke committed May 29, 2024
1 parent 289ee6f commit d10cac2
Show file tree
Hide file tree
Showing 27 changed files with 368 additions and 191 deletions.
55 changes: 30 additions & 25 deletions src/main/java/org/scion/jpan/AbstractDatagramChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.NotYetConnectedException;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
Expand Down Expand Up @@ -180,13 +179,13 @@ public InetSocketAddress getLocalAddress() throws IOException {
* Returns the remote address.
*
* @see DatagramChannel#getRemoteAddress()
* @return The remote address.
* @return The remote address as ScionSocketAddress.
* @throws IOException If an I/O error occurs
*/
public InetSocketAddress getRemoteAddress() throws IOException {
public ScionSocketAddress getRemoteAddress() throws IOException {
Path path = getConnectionPath();
if (path != null) {
return new InetSocketAddress(path.getRemoteAddress(), path.getRemotePort());
return ScionSocketAddress.fromPath(path);
}
return null;
}
Expand Down Expand Up @@ -232,6 +231,10 @@ public C connect(SocketAddress addr) throws IOException {
throw new IllegalArgumentException(
"connect() requires an InetSocketAddress or a ScionSocketAddress.");
}
if (addr instanceof ScionSocketAddress) {
// TODO check if this is a RequestPath?
return connect((RequestPath) ((ScionSocketAddress) addr).getPath());
}
return connect(pathPolicy.filter(getOrCreateService().getPaths((InetSocketAddress) addr)));
}
}
Expand Down Expand Up @@ -398,13 +401,14 @@ public void setOverrideSourceAddress(InetSocketAddress address) {
this.overrideExternalAddress = address;
}

protected int sendRaw(ByteBuffer buffer, InetSocketAddress address, Path path)
protected int sendRaw(ByteBuffer buffer, InetSocketAddress firstHop, Path path)
throws IOException {
if (cfgRemoteDispatcher && path != null && path.getRawPath().length == 0) {
// TODO remove this once dispatcher is removed
return channel.send(
buffer, new InetSocketAddress(address.getAddress(), Constants.DISPATCHER_PORT));
buffer, new InetSocketAddress(firstHop.getAddress(), Constants.DISPATCHER_PORT));

Check warning on line 409 in src/main/java/org/scion/jpan/AbstractDatagramChannel.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/scion/jpan/AbstractDatagramChannel.java#L409

Added line #L409 was not covered by tests
}
return channel.send(buffer, address);
return channel.send(buffer, firstHop);
}

protected int sendRaw(ByteBuffer buffer, InetSocketAddress address) throws IOException {
Expand Down Expand Up @@ -534,20 +538,22 @@ protected final ByteBuffer getBufferReceive(int requiredSize) {
}

/**
* @param path path
* @param dst Destination address
* @param payloadLength payload length
* @return argument path or a new path if the argument path was expired
* @throws IOException in case of IOException.
*/
protected Path checkPathAndBuildHeader(
ByteBuffer buffer, Path path, int payloadLength, InternalConstants.HdrTypes hdrType)
protected void checkPathAndBuildHeader(
ByteBuffer buffer,
ScionSocketAddress dst,
int payloadLength,
InternalConstants.HdrTypes hdrType)
throws IOException {
synchronized (stateLock) {
if (path instanceof RequestPath) {
path = ensureUpToDate((RequestPath) path);
if (dst.isRequestAddress()) {
ensureUpToDate(dst);
}
buildHeader(buffer, path, payloadLength, hdrType);
return path;
buildHeader(buffer, dst.getPath(), payloadLength, hdrType);
}
}

Expand Down Expand Up @@ -610,8 +616,7 @@ protected void buildHeader(
rawPath.length,
srcIA,
srcAddress.getAddress(),
path.getRemoteIsdAs(),
path.getRemoteAddress().getAddress(),
path,
hdrType,
cfgTrafficClass);
ScionHeaderParser.writePath(buffer, rawPath);
Expand All @@ -623,17 +628,13 @@ protected void buildHeader(
}
}

protected RequestPath ensureUpToDate(RequestPath path) throws IOException {
protected void ensureUpToDate(ScionSocketAddress address) throws IOException {
synchronized (stateLock) {
if (Instant.now().getEpochSecond() + cfgExpirationSafetyMargin <= path.getExpiration()) {
return path;
}
// expired, get new path
RequestPath newPath = pathPolicy.filter(getOrCreateService().getPaths(path));
if (isConnected()) {
updateConnection(newPath, true);
if (address.refreshPath(getOrCreateService(), pathPolicy, cfgExpirationSafetyMargin)) {
if (isConnected()) {
updateConnection(address.getPath().asRequestPath(), true);
}
}
return newPath;
}
}

Expand Down Expand Up @@ -673,4 +674,8 @@ protected ReentrantLock readLock() {
protected ReentrantLock writeLock() {
return writeLock;
}

protected int getCfgExpirySafetyMargin() {
return cfgExpirationSafetyMargin;
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/scion/jpan/Path.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public long getRemoteIsdAs() {
return dstIsdAs;
}

RequestPath asRequestPath() {
return (RequestPath) this;
}

@Override
public String toString() {
return "Path{"
Expand Down
38 changes: 26 additions & 12 deletions src/main/java/org/scion/jpan/ScionDatagramChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public boolean isBlocking() {
return super.isBlocking();
}

public ResponsePath receive(ByteBuffer userBuffer) throws IOException {
public ScionSocketAddress receive(ByteBuffer userBuffer) throws IOException {
readLock().lock();
try {
ByteBuffer buffer = getBufferReceive(userBuffer.capacity());
Expand All @@ -67,7 +67,7 @@ public ResponsePath receive(ByteBuffer userBuffer) throws IOException {
}
ScionHeaderParser.extractUserPayload(buffer, userBuffer);
buffer.clear();
return receivePath;
return ScionSocketAddress.fromPath(receivePath);
} finally {
readLock().unlock();
}
Expand All @@ -84,48 +84,62 @@ public ResponsePath receive(ByteBuffer userBuffer) throws IOException {
* cannot be resolved to an ISD/AS.
* @see java.nio.channels.DatagramChannel#send(ByteBuffer, SocketAddress)
*/
public void send(ByteBuffer srcBuffer, SocketAddress destination) throws IOException {
public int send(ByteBuffer srcBuffer, SocketAddress destination) throws IOException {
if (!(destination instanceof InetSocketAddress)) {
throw new IllegalArgumentException("Address must be of type InetSocketAddress.");
}
if (destination instanceof ScionSocketAddress) {
ScionSocketAddress ssa = (ScionSocketAddress) destination;
send(srcBuffer, ssa);
return 12345; // TODO

Check warning on line 94 in src/main/java/org/scion/jpan/ScionDatagramChannel.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/scion/jpan/ScionDatagramChannel.java#L92-L94

Added lines #L92 - L94 were not covered by tests
}
InetSocketAddress dst = (InetSocketAddress) destination;
Path path = getPathPolicy().filter(getOrCreateService().getPaths(dst));
send(srcBuffer, path);
send(srcBuffer, ScionSocketAddress.fromPath(path));
return 12345; // TODO
}

/**
* Attempts to send the content of the buffer to the destinationAddress.
*
* @param srcBuffer Data to send
* @param path Path to destination. If this is a Request path and it is expired then it will
* @param address Path to destination. If this is a Request path and it is expired then it will
* automatically be replaced with a new path. Expiration of ResponsePaths is not checked
* @return either the path argument or a new path if the path was an expired RequestPath. Note
* that ResponsePaths are not checked for expiration.
* @throws IOException if an error occurs, e.g. if the destinationAddress is an IP address that
* cannot be resolved to an ISD/AS.
* @see java.nio.channels.DatagramChannel#send(ByteBuffer, SocketAddress)
*/
public Path send(ByteBuffer srcBuffer, Path path) throws IOException {
public void send(ByteBuffer srcBuffer, ScionSocketAddress address) throws IOException {
writeLock().lock();
try {
ByteBuffer buffer = getBufferSend(srcBuffer.remaining());
// + 8 for UDP overlay header length
Path actualPath =
checkPathAndBuildHeader(
buffer, path, srcBuffer.remaining() + 8, InternalConstants.HdrTypes.UDP);
int len = srcBuffer.remaining() + 8;
checkPathAndBuildHeader(buffer, address, len, InternalConstants.HdrTypes.UDP);
try {
buffer.put(srcBuffer);
} catch (BufferOverflowException e) {
throw new IOException("Packet is larger than max send buffer size.");
}
buffer.flip();
sendRaw(buffer, actualPath.getFirstHopAddress(), actualPath);
return actualPath;
sendRaw(buffer, address.getPath().getFirstHopAddress(), address.getPath());
// TODO consider TRICK
// - If read()/write() is used then we have no problem, getCurrentPath() works fine
// - receive() always contains a ResponsePath -> cannot expire
// - send() on server -> we have a ResponsePath -> cannot be refreshed
// - send() on client -> we have a RequestPath -> use with getCurrentPath() ???
} finally {
writeLock().unlock();
}
}

@Deprecated // Please use another send method instead
public void send(ByteBuffer srcBuffer, Path path) throws IOException {
send(srcBuffer, ScionSocketAddress.fromPath(path));
}

/**
* Read data from the connected stream.
*
Expand Down Expand Up @@ -169,7 +183,7 @@ public int write(ByteBuffer src) throws IOException {
ByteBuffer buffer = getBufferSend(src.remaining());
int len = src.remaining();
// + 8 for UDP overlay header length
checkPathAndBuildHeader(buffer, getConnectionPath(), len + 8, InternalConstants.HdrTypes.UDP);
checkPathAndBuildHeader(buffer, getRemoteAddress(), len + 8, InternalConstants.HdrTypes.UDP);
buffer.put(src);
buffer.flip();

Expand Down
95 changes: 50 additions & 45 deletions src/main/java/org/scion/jpan/ScionDatagramSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.IllegalBlockingModeException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -56,7 +55,8 @@ public class ScionDatagramSocket extends java.net.DatagramSocket {

private final SelectingDatagramChannel channel;
private boolean isBound = false;
private final SimpleCache<InetSocketAddress, Path> pathCache = new SimpleCache<>(100);
private final SimpleCache<InetSocketAddress, ScionSocketAddress> pathCache =
new SimpleCache<>(100);
private final Object closeLock = new Object();

public ScionDatagramSocket() throws SocketException {
Expand Down Expand Up @@ -257,7 +257,7 @@ public int getPort() {
}

@Override
public SocketAddress getRemoteSocketAddress() {
public ScionSocketAddress getRemoteSocketAddress() {
try {
return channel.getRemoteAddress();
} catch (IOException e) {
Expand All @@ -283,34 +283,37 @@ public void send(DatagramPacket packet) throws IOException {

// Synchronize on packet because this is what the Java DatagramSocket does.
synchronized (packet) {
// TODO synchronize also on writeLock()!
if (isConnected() && !channel.getRemoteAddress().equals(packet.getSocketAddress())) {
throw new IllegalArgumentException("Packet address does not match connected address");
}
channel.writeLock().lock();
try {
if (isConnected() && !channel.getRemoteAddress().equals(packet.getSocketAddress())) {
throw new IllegalArgumentException("Packet address does not match connected address");
}

Path path;
if (channel.isConnected()) {
path = channel.getConnectionPath();
} else {
InetSocketAddress addr = (InetSocketAddress) packet.getSocketAddress();
synchronized (pathCache) {
path = pathCache.get(addr);
if (path == null) {
path = channel.getPathPolicy().filter(channel.getOrCreateService2().getPaths(addr));
} else if (path instanceof RequestPath
&& ((RequestPath) path).getExpiration() > Instant.now().getEpochSecond()) {
// check expiration only for RequestPaths
RequestPath request = (RequestPath) path;
path = channel.getPathPolicy().filter(channel.getOrCreateService2().getPaths(request));
}
if (path == null) {
throw new IOException("Address is not resolvable in SCION: " + packet.getAddress());
ScionSocketAddress dstAddress;
if (channel.isConnected()) {
dstAddress = channel.getRemoteAddress();
} else {
InetSocketAddress addr = (InetSocketAddress) packet.getSocketAddress();
synchronized (pathCache) {
dstAddress = pathCache.get(addr);
if (dstAddress == null) {
dstAddress =
channel
.getOrCreateService2()
.lookupSocketAddress(packet.getAddress().getHostName(), packet.getPort());
}
dstAddress.refreshPath(
channel.getOrCreateService2(),
channel.getPathPolicy(),
channel.getCfgExpirySafetyMargin());
pathCache.put(addr, dstAddress);
}
pathCache.put(addr, path);
}
ByteBuffer buf = ByteBuffer.wrap(packet.getData(), packet.getOffset(), packet.getLength());
channel.send(buf, dstAddress);
} finally {
channel.writeLock().unlock();
}
ByteBuffer buf = ByteBuffer.wrap(packet.getData(), packet.getOffset(), packet.getLength());
channel.send(buf, path);
}
}

Expand All @@ -321,25 +324,27 @@ public synchronized void receive(DatagramPacket packet) throws IOException {

// We synchronize on the packet because that is what the Java socket does.
synchronized (packet) {
ByteBuffer receiveBuffer =
ByteBuffer.wrap(packet.getData(), packet.getOffset(), packet.getLength());
ResponsePath path = channel.receive(receiveBuffer);
if (path == null) {
// timeout occurred
throw new SocketTimeoutException();
}
// TODO this is not ideal, a client may not be connected. Use getService()==null?
if (!channel.isConnected()) {
synchronized (pathCache) {
InetAddress ip = path.getRemoteAddress();
InetSocketAddress addr = new InetSocketAddress(ip, path.getRemotePort());
pathCache.put(addr, path);
channel.readLock().lock();
try {
ByteBuffer receiveBuffer =
ByteBuffer.wrap(packet.getData(), packet.getOffset(), packet.getLength());
ScionSocketAddress path = channel.receive(receiveBuffer);
if (path == null) {
// timeout occurred
throw new SocketTimeoutException();
}
// TODO this is not ideal, a client may not be connected. Use getService()==null?
if (!channel.isConnected()) {
synchronized (pathCache) {
pathCache.put(path, path); // TODO use Set i.o. Map
}
}
receiveBuffer.flip();
packet.setLength(receiveBuffer.limit());
packet.setSocketAddress(path);
} finally {
channel.readLock().unlock();
}
receiveBuffer.flip();
packet.setLength(receiveBuffer.limit());
packet.setAddress(path.getRemoteAddress());
packet.setPort(path.getRemotePort());
}
}

Expand Down Expand Up @@ -545,7 +550,7 @@ public RequestPath getConnectionPath() {
* @return the cached Path or `null` if not path is found.
* @see #setPathCacheCapacity
*/
public synchronized Path getCachedPath(InetSocketAddress address) {
public synchronized ScionSocketAddress getCachedPath(InetSocketAddress address) {
synchronized (pathCache) {
return pathCache.get(address);
}
Expand Down
Loading

0 comments on commit d10cac2

Please sign in to comment.