Skip to content

Commit

Permalink
SCMP responder (#78)
Browse files Browse the repository at this point in the history
* SCMP responder

---------

Co-authored-by: Tilmann Zäschke <tilmann.zaeschke@inf.ethz.ch>
  • Loading branch information
tzaeschke and Tilmann Zäschke authored May 29, 2024
1 parent 940a03d commit 0a0def2
Show file tree
Hide file tree
Showing 13 changed files with 310 additions and 52 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## Added
- SCMP echo responder [#78](https://github.com/scionproto-contrib/jpan/pull/78)

### Changed
- Some API changes: [#67](https://github.com/scionproto-contrib/jpan/pull/67)
- Rename `DatagramChannel` to `ScionDatagramChannel`
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/scion/jpan/AbstractDatagramChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public InetSocketAddress getRemoteAddress() throws IOException {

public void disconnect() throws IOException {
synchronized (stateLock) {
channel.disconnect();
channel.disconnect(); // TODO Why ? We shouldn´t do that...?
connectionPath = null;
}
}
Expand Down Expand Up @@ -401,7 +401,8 @@ public void setOverrideSourceAddress(InetSocketAddress address) {
protected int sendRaw(ByteBuffer buffer, InetSocketAddress address, Path path)
throws IOException {
if (cfgRemoteDispatcher && path != null && path.getRawPath().length == 0) {
return channel.send(buffer, new InetSocketAddress(address.getAddress(), 30041));
return channel.send(
buffer, new InetSocketAddress(address.getAddress(), Constants.DISPATCHER_PORT));
}
return channel.send(buffer, address);
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/scion/jpan/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
package org.scion.jpan;

public final class Constants {
public static final int SCMP_PORT = 30041;
/**
* @deprecated Dispatcher support will be removed soon.
*/
@Deprecated public static final int DISPATCHER_PORT = 30041;

public static final String PROPERTY_DAEMON = "org.scion.daemon";
public static final String ENV_DAEMON = "SCION_DAEMON";
public static final String DEFAULT_DAEMON = "localhost:30255";
Expand Down
107 changes: 101 additions & 6 deletions src/main/java/org/scion/jpan/ScmpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.scion.jpan.internal.InternalConstants;
import org.scion.jpan.internal.PathHeaderParser;
import org.scion.jpan.internal.ScionHeaderParser;
import org.scion.jpan.internal.ScmpParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScmpChannel implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(ScmpChannel.class);
private int timeOutMs = 1000;
private final InternalChannel channel;
@Deprecated private RequestPath path;
@Deprecated private final RequestPath path;

ScmpChannel() throws IOException {
this(Scion.defaultService(), 12345);
Expand Down Expand Up @@ -158,17 +162,46 @@ public Consumer<Scmp.Message> setScmpErrorListener(Consumer<Scmp.Message> listen
return channel.setScmpErrorListener(listener);
}

/**
* Install a listener for echo messages. The listener is called for every incoming echo request
* message. A response will be sent iff the listener returns 'true'. Any time spent in the
* listener counts towards the RTT of the echo request.
*
* <p>The listener will only be called for messages received during `setUpScmpEchoResponder()`.
*
* @param listener THe listener function
* @return Any previously installed listener or 'null' if none was installed.
* @see #setUpScmpEchoResponder()
*/
public Predicate<Scmp.EchoMessage> setScmpEchoListener(Predicate<Scmp.EchoMessage> listener) {
return channel.setScmpEchoListener(listener);
}

public <T> void setOption(SocketOption<T> option, T t) throws IOException {
channel.setOption(option, t);
}

/**
* Install an SCMP echo responder. This method blocks until interrupted. While blocking, it will
* answer all valid SCMP echo requests.
*
* <p>SCMP requests can be monitored and intercepted through a listener, see {@link
* #setScmpEchoListener(Predicate)}.
*
* @throws IOException If an IO exception occurs.
*/
public void setUpScmpEchoResponder() throws IOException {
this.channel.sendEchoResponses();
}

@FunctionalInterface
private interface IOCallable<V> {
V call() throws IOException;
}

private class InternalChannel extends AbstractDatagramChannel<InternalChannel> {
private final Selector selector;
private Predicate<Scmp.EchoMessage> echoListener;

protected InternalChannel(ScionService service, int port) throws IOException {
super(service);
Expand All @@ -192,12 +225,13 @@ Scmp.TimedMessage sendEchoRequest(Scmp.EchoMessage request) throws IOException {
int len = 8 + request.getData().length;
buildHeader(buffer, request.getPath(), len, InternalConstants.HdrTypes.SCMP);
int localPort = super.getLocalAddress().getPort();
ScmpParser.buildScmpPing(buffer, localPort, request.getSequenceNumber(), request.getData());
ScmpParser.buildScmpPing(
buffer, Scmp.Type.INFO_128, localPort, request.getSequenceNumber(), request.getData());
buffer.flip();
request.setSizeSent(buffer.remaining());
sendRaw(buffer, path.getFirstHopAddress());

int sizeReceived = receiveRequest(request);
int sizeReceived = receive(request);
request.setSizeReceived(sizeReceived);
return request;
} finally {
Expand All @@ -220,7 +254,7 @@ Scmp.TimedMessage sendTracerouteRequest(
buildHeader(buffer, path, len, InternalConstants.HdrTypes.SCMP);
int interfaceNumber = request.getSequenceNumber();
int localPort = super.getLocalAddress().getPort();
ScmpParser.buildScmpTraceroute(buffer, localPort, interfaceNumber);
ScmpParser.buildScmpTraceroute(buffer, Scmp.Type.INFO_130, localPort, interfaceNumber);
buffer.flip();

// Set flags for border routers to return SCMP packet
Expand All @@ -229,7 +263,7 @@ Scmp.TimedMessage sendTracerouteRequest(

sendRaw(buffer, path.getFirstHopAddress());

receiveRequest(request);
receive(request);
return request;
} finally {
writeLock().unlock();
Expand All @@ -239,7 +273,7 @@ Scmp.TimedMessage sendTracerouteRequest(
}
}

int receiveRequest(Scmp.TimedMessage request) throws IOException {
private int receive(Scmp.TimedMessage request) throws IOException {
readLock().lock();
try {
ByteBuffer buffer = getBufferReceive(DEFAULT_BUFFER_SIZE);
Expand Down Expand Up @@ -291,6 +325,67 @@ private ResponsePath receiveWithTimeout(ByteBuffer buffer) throws IOException {
}
}

void sendEchoResponses() throws IOException {
readLock().lock();
writeLock().lock();
int timeOut = timeOutMs;
setTimeOut(Integer.MAX_VALUE);
try {
while (true) {
ByteBuffer buffer = getBufferReceive(DEFAULT_BUFFER_SIZE);
ResponsePath path = receiveWithTimeout(buffer);
if (path == null) {
return; // interrupted
}

Scmp.Type type = ScmpParser.extractType(buffer);
log.info("Received SCMP message {} from {}", type, path.getRemoteAddress());
if (type == Scmp.Type.INFO_128) {
Scmp.EchoMessage msg = (Scmp.EchoMessage) Scmp.createMessage(Scmp.Type.INFO_128, path);
ScmpParser.consume(buffer, msg);

if (!checkEchoListener(msg)) {
continue;
}

// EchoHeader = 8 + data
int len = 8 + msg.getData().length;
buildHeader(buffer, msg.getPath(), len, InternalConstants.HdrTypes.SCMP);
int port = msg.getIdentifier();
ScmpParser.buildScmpPing(
buffer, Scmp.Type.INFO_129, port, msg.getSequenceNumber(), msg.getData());
buffer.flip();
msg.setSizeSent(buffer.remaining());
sendRaw(buffer, path.getFirstHopAddress());
log.info("Responded to SCMP {} from {}", type, path.getRemoteAddress());
} else {
log.info("Dropped SCMP message with type {} from {}", type, path.getRemoteAddress());
}
}
} finally {
setTimeOut(timeOut);
writeLock().unlock();
readLock().unlock();
}
}

protected boolean checkEchoListener(Scmp.EchoMessage scmpMsg) {
synchronized (this) {
if (echoListener != null && scmpMsg.getTypeCode() == Scmp.TypeCode.TYPE_128) {
return echoListener.test(scmpMsg);
}
}
return true;
}

public Predicate<Scmp.EchoMessage> setScmpEchoListener(Predicate<Scmp.EchoMessage> listener) {
synchronized (this) {
Predicate<Scmp.EchoMessage> old = echoListener;
echoListener = listener;
return old;
}
}

@Override
public void close() throws IOException {
super.close();
Expand Down
64 changes: 50 additions & 14 deletions src/main/java/org/scion/jpan/internal/ScionHeaderParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import java.net.*;
import java.nio.ByteBuffer;
import org.scion.jpan.Constants;
import org.scion.jpan.ResponsePath;
import org.scion.jpan.Scmp;

/** Utility methods for reading and writing the Common Header and Address Header. */
public class ScionHeaderParser {
Expand Down Expand Up @@ -87,6 +89,16 @@ public static ResponsePath extractResponsePath(
byte[] bytesSrc = new byte[(sl + 1) * 4];
data.get(bytesSrc);

InetAddress srcIP;
InetAddress dstIP;
try {
srcIP = InetAddress.getByAddress(bytesSrc);
dstIP = InetAddress.getByAddress(bytesDst);
} catch (UnknownHostException e) {
// this cannot happen
throw new IllegalStateException(e);
}

// raw path
byte[] path = new byte[hdrLenBytes - data.position()];
if (path.length > 0) {
Expand All @@ -99,16 +111,7 @@ public static ResponsePath extractResponsePath(
data.position(hdrLenBytes);
int srcPort = Short.toUnsignedInt(data.getShort());
int dstPort = Short.toUnsignedInt(data.getShort());

InetAddress srcIP;
InetAddress dstIP;
try {
srcIP = InetAddress.getByAddress(bytesSrc);
dstIP = InetAddress.getByAddress(bytesDst);
} catch (UnknownHostException e) {
// this cannot happen
throw new IllegalStateException(e);
}
// TODO handle SCMP packets

// rewind to original offset
data.position(pos);
Expand All @@ -132,6 +135,8 @@ public static InetSocketAddress extractDestinationSocketAddress(ByteBuffer data)
throws UnknownHostException {
int start = data.position();

InternalConstants.HdrTypes hdrType = extractNextHeader(data);

int i1 = data.getInt(start + 4); // nextHeader, hdrLen, payLoadLen
int i2 = data.getInt(start + 8); // pathType, dt, dl, st, sl
int hdrLen = ByteUtil.readInt(i1, 8, 8);
Expand All @@ -150,16 +155,47 @@ public static InetSocketAddress extractDestinationSocketAddress(ByteBuffer data)
byte[] bytesDst = new byte[(dl + 1) * 4];
data.get(bytesDst);
InetAddress dstIP = InetAddress.getByAddress(bytesDst);

// get remote port from UDP overlay
data.position(start + hdrLenBytes + 2);
int dstPort = Short.toUnsignedInt(data.getShort());
int dstPort = extractDstPort(data, start + hdrLenBytes, hdrType);

// rewind to original offset
data.position(start);
return new InetSocketAddress(dstIP, dstPort);
}

private static int extractDstPort(
ByteBuffer data, int scmpHdrOffset, InternalConstants.HdrTypes hdrType) {
int dstPort;
if (hdrType == InternalConstants.HdrTypes.UDP) {
// get remote port from UDP overlay
data.position(scmpHdrOffset + 2);
dstPort = Short.toUnsignedInt(data.getShort());
} else if (hdrType == InternalConstants.HdrTypes.SCMP) {
// get remote port from SCMP header
data.position(scmpHdrOffset);
int type = ByteUtil.toUnsigned(data.get());
Scmp.Type t = Scmp.Type.parse(type);
if (t == Scmp.Type.INFO_128 || t == Scmp.Type.INFO_130) {
// request -> port is 30041
dstPort = Constants.SCMP_PORT;
} else if (t == Scmp.Type.INFO_129 || t == Scmp.Type.INFO_131) {
// response -> get port from SCMP identifier
data.position(scmpHdrOffset + 4);
dstPort = Short.toUnsignedInt(data.getShort());
} else {
int code = ByteUtil.toUnsigned(data.get());
Scmp.TypeCode tc = Scmp.TypeCode.parse(type, code);
if (tc.isError()) {
// TODO try extracting port from attached packet (may be UDP or SCMP)
return -1;
}
throw new UnsupportedOperationException(hdrType.name() + " " + tc.getText());
}
} else {
throw new UnsupportedOperationException();
}
return dstPort;
}

/**
* Extract the header length without changing the buffer's position.
*
Expand Down
20 changes: 5 additions & 15 deletions src/main/java/org/scion/jpan/internal/ScmpParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,18 @@ public static void buildExtensionHeader(ByteBuffer buffer, InternalConstants.Hdr
}

public static void buildScmpPing(
ByteBuffer buffer, int identifier, int sequenceNumber, byte[] data) {
buffer.put(ByteUtil.toByte(Scmp.Type.INFO_128.id()));
ByteBuffer buffer, Scmp.Type type, int identifier, int sequenceNumber, byte[] data) {
buffer.put(ByteUtil.toByte(type.id()));
buffer.put(ByteUtil.toByte(0));
buffer.putShort((short) 0); // TODO checksum
buffer.putShort((short) identifier); // unsigned
buffer.putShort((short) sequenceNumber); // unsigned
buffer.put(data);
}

public static void buildScmpPing(
ByteBuffer buffer, int identifier, int sequenceNumber, ByteBuffer data) {
buffer.put(ByteUtil.toByte(Scmp.Type.INFO_128.id()));
buffer.put(ByteUtil.toByte(0));
buffer.putShort((short) 0); // TODO checksum
buffer.putShort((short) identifier); // unsigned
buffer.putShort((short) sequenceNumber); // unsigned
buffer.put(data);
}

public static void buildScmpTraceroute(ByteBuffer buffer, int identifier, int sequenceNumber) {
buffer.put(ByteUtil.toByte(Scmp.Type.INFO_130.id()));
public static void buildScmpTraceroute(
ByteBuffer buffer, Scmp.Type type, int identifier, int sequenceNumber) {
buffer.put(ByteUtil.toByte(type.id()));
buffer.put(ByteUtil.toByte(0));
buffer.putShort((short) 0); // TODO checksum
buffer.putShort((short) identifier); // unsigned
Expand Down Expand Up @@ -80,7 +71,6 @@ public static Scmp.Type extractType(ByteBuffer data) {
*
* @param data packet data
* @param holder SCMP message holder
* @return ScmpMessage object
*/
public static void consume(ByteBuffer data, Scmp.Message holder) {
int type = ByteUtil.toUnsigned(data.get());
Expand Down
Loading

0 comments on commit 0a0def2

Please sign in to comment.