Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topo analyzer #121

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- SocketConcurrency test takes too long. [#108](https://github.com/scionproto-contrib/jpan/issues/108)
- Fixed useless error message when providing incorrect daemon address.
Also: made port optional (default = 30255) [#114](https://github.com/scionproto-contrib/jpan/pull/114)
- Fixed SCMP packet loss on SCMP receive [#120](https://github.com/scionproto-contrib/jpan/pull/120)

### Removed
- Removed some useless IP printing functions.
Expand Down
Binary file added default-ihaquwv8.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added mytopo-8w67kdb8.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
96 changes: 96 additions & 0 deletions mytopo.topo
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
--- # My Topology
ASes:
"64-2:0:23":
core: true
voting: true
authoritative: true
cert_issuer: 64-2:0:13
"64-2:0:13":
core: true
voting: true
authoritative: true
issuing: true
"64-0:0:e816":
cert_issuer: 64-2:0:13
"64-0:0:ce7":
core: true
voting: true
authoritative: true
issuing: true
"64-0:0:3d07":
core: true
voting: true
authoritative: true
cert_issuer: 64-2:0:13
"64-0:0:1a4a":
core: true
voting: true
authoritative: true
cert_issuer: 64-2:0:13
"64-2:0:9":
cert_issuer: 64-2:0:13
"64-0:0:303e":
core: true
voting: true
authoritative: true
cert_issuer: 64-2:0:13
"64-0:0:22f":
core: true
voting: true
authoritative: true
cert_issuer: 64-2:0:13
links:
- {a: "64-0:0:ce7#19", b: "64-0:0:3d07#5", linkAtoB: CORE}
- {a: "64-0:0:303e#5", b: "64-0:0:3d07#1", linkAtoB: CORE}
- {a: "64-0:0:ce7#22", b: "64-0:0:303e#7", linkAtoB: CORE}
- {a: "64-0:0:ce7#10", b: "64-0:0:1a4a#8", linkAtoB: CORE}
- {a: "64-0:0:3d07#2", b: "64-2:0:13#24", linkAtoB: CORE}
- {a: "64-0:0:3d07#11", b: "64-2:0:13#29", linkAtoB: CORE}
- {a: "64-0:0:3d07#3", b: "64-2:0:13#22", linkAtoB: CORE}
- {a: "64-0:0:3d07#27", b: "64-2:0:13#46", linkAtoB: CORE}
- {a: "64-0:0:1a4a#22", b: "64-0:0:3d07#13", linkAtoB: CORE}
- {a: "64-0:0:22f#17", b: "64-0:0:ce7#1", linkAtoB: CORE}
- {a: "64-2:0:13#19", b: "64-2:0:23#2", linkAtoB: CORE}
- {a: "64-0:0:e816#35", b: "64-2:0:28#4", linkAtoB: CORE}
- {a: "64-0:0:3d07#17", b: "64-2:0:28#1", linkAtoB: CORE}
- {a: "64-0:0:e816#8", b: "64-2:0:13#33", linkAtoB: CORE}
- {a: "64-0:0:22f#26", b: "64-0:0:303e#4", linkAtoB: CORE}
- {a: "64-0:0:22f#22", b: "64-0:0:3d07#9", linkAtoB: CORE}
- {a: "64-0:0:3d07#20", b: "64-2:0:13#23", linkAtoB: CORE}
- {a: "64-0:0:3d07#24", b: "64-0:0:e816#3", linkAtoB: CORE}
- {a: "64-0:0:ce7#20", b: "64-0:0:3d07#12", linkAtoB: CORE}
- {a: "64-0:0:ce7#11", b: "64-0:0:1a4a#11", linkAtoB: CORE}
- {a: "64-0:0:22f#31", b: "64-2:0:13#31", linkAtoB: CORE}
- {a: "64-0:0:1a4a#3", b: "64-2:0:13#11", linkAtoB: CORE}
- {a: "64-0:0:22f#20", b: "64-2:0:13#16", linkAtoB: CORE}
- {a: "64-0:0:3d07#28", b: "64-2:0:13#44", linkAtoB: CORE}
- {a: "64-0:0:3d07#18", b: "64-2:0:28#2", linkAtoB: CORE}
- {a: "64-0:0:1a4a#35", b: "64-0:0:e816#9", linkAtoB: CORE}
- {a: "64-0:0:ce7#2", b: "64-2:0:13#7", linkAtoB: CORE}
- {a: "64-0:0:22f#21", b: "64-0:0:3d07#8", linkAtoB: CORE}
- {a: "64-0:0:22f#4", b: "64-2:0:13#15", linkAtoB: CORE}
- {a: "64-0:0:1a4a#34", b: "64-0:0:e816#4", linkAtoB: CORE}
- {a: "64-0:0:3d07#23", b: "64-0:0:e816#1", linkAtoB: CORE}
- {a: "64-2:0:13#30", b: "64-2:0:23#7", linkAtoB: CORE}
- {a: "64-0:0:22f#35", b: "64-0:0:303e#20003", linkAtoB: CORE}
- {a: "64-0:0:303e#2", b: "64-2:0:13#28", linkAtoB: CORE}
- {a: "64-0:0:22f#19", b: "64-0:0:ce7#9", linkAtoB: CORE}
- {a: "64-0:0:22f#5", b: "64-2:0:9#1", linkAtoB: CORE}
- {a: "64-2:0:13#21", b: "64-2:0:23#4", linkAtoB: CORE}
- {a: "64-0:0:3d07#29", b: "64-2:0:13#45", linkAtoB: CORE}
- {a: "64-0:0:22f#34", b: "64-0:0:e816#5", linkAtoB: CORE}
- {a: "64-0:0:22f#16", b: "64-0:0:1a4a#10", linkAtoB: CORE}
- {a: "64-0:0:303e#6", b: "64-0:0:3d07#10", linkAtoB: CORE}
- {a: "64-0:0:3d07#22", b: "64-0:0:e816#2", linkAtoB: CORE}
- {a: "64-0:0:22f#36", b: "64-0:0:303e#20004", linkAtoB: CORE}
- {a: "64-0:0:303e#1", b: "64-2:0:13#26", linkAtoB: CORE}
- {a: "64-0:0:1a4a#1", b: "64-2:0:13#9", linkAtoB: CORE}
- {a: "64-0:0:ce7#4", b: "64-2:0:13#18", linkAtoB: CORE}
- {a: "64-0:0:22f#6", b: "64-2:0:9#2", linkAtoB: CORE}
- {a: "64-0:0:1a4a#27", b: "64-0:0:3d07#16", linkAtoB: CORE}
- {a: "64-0:0:e816#34", b: "64-2:0:28#3", linkAtoB: CORE}
- {a: "64-0:0:22f#15", b: "64-0:0:1a4a#9", linkAtoB: CORE}
- {a: "64-0:0:e816#7", b: "64-2:0:13#34", linkAtoB: CORE}
- {a: "64-0:0:22f#25", b: "64-0:0:303e#3", linkAtoB: CORE}
- {a: "64-0:0:ce7#16", b: "64-2:0:23#5", linkAtoB: CORE}
- {a: "64-0:0:22f#33", b: "64-0:0:e816#6", linkAtoB: CORE}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

<scion.bnd-maven-plugin.version>6.4.0</scion.bnd-maven-plugin.version>
<scion.build-helper-maven-plugin.version>3.3.0</scion.build-helper-maven-plugin.version>
<scion.fmt-maven.version>2.20</scion.fmt-maven.version>
<scion.fmt-maven.version>2.23</scion.fmt-maven.version>
<scion.maven-clean-plugin.version>3.2.0</scion.maven-clean-plugin.version>
<scion.maven-compiler-plugin.version>3.11.0</scion.maven-compiler-plugin.version>
<scion.maven-deploy-plugin.version>3.1.1</scion.maven-deploy-plugin.version>
Expand Down
66 changes: 32 additions & 34 deletions src/main/java/org/scion/jpan/ScmpSenderAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.scion.jpan.internal.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,8 +38,7 @@
private final AtomicInteger sequenceIDs = new AtomicInteger(0);
private final ConcurrentHashMap<Integer, TimeOutTask> timers = new ConcurrentHashMap<>();
private final Timer timer = new Timer(true);
private Thread receiver;
private final CountDownLatch receiverBarrier = new CountDownLatch(1);
private final Thread receiver;
private final ResponseHandler handler;

public static Builder newBuilder(ResponseHandler handler) {
Expand All @@ -64,31 +64,31 @@
throws IOException {
this.channel = new InternalChannel(service, port, channel, selector);
this.handler = handler;
startReceiver();
this.receiver = startHandler(this::receiveTask, "ScmpSender-receiver");
}

private void startReceiver() {
this.receiver = new Thread(this::handleReceive, "ScmpSender-receiver");
this.receiver.setDaemon(true);
this.receiver.start();
private Thread startHandler(Consumer<CountDownLatch> task, String name) {
CountDownLatch barrier = new CountDownLatch(1);
Thread thread = new Thread(() -> task.accept(barrier), name);
thread.setDaemon(true);
thread.start();
try {
if (!this.receiverBarrier.await(1, TimeUnit.SECONDS)) {
throw new IllegalStateException("Could not start receiver thread.");
if (!barrier.await(1, TimeUnit.SECONDS)) {
throw new IllegalStateException("Could not start receiver thread: " + name);

Check warning on line 77 in src/main/java/org/scion/jpan/ScmpSenderAsync.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/scion/jpan/ScmpSenderAsync.java#L77

Added line #L77 was not covered by tests
}
return thread;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ScionRuntimeException(e);
}
}

private void stopReceiver() {
if (receiver != null) {
this.receiver.interrupt();
try {
this.receiver.join(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
private void stopHandler(Thread thread) {
thread.interrupt();
try {
thread.join(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();

Check warning on line 91 in src/main/java/org/scion/jpan/ScmpSenderAsync.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/scion/jpan/ScmpSenderAsync.java#L90-L91

Added lines #L90 - L91 were not covered by tests
}
}

Expand Down Expand Up @@ -178,9 +178,13 @@

@Override
public void close() throws IOException {
channel.close();
timer.cancel();
stopReceiver();
stopHandler(receiver);
channel.close();
}

public <T> T getOption(SocketOption<T> option) throws IOException {
return channel.getOption(option);
}

/**
Expand Down Expand Up @@ -227,7 +231,6 @@
writeLock().lock();
try {
Path path = request.getPath();
super.channel().connect(path.getFirstHopAddress());
ByteBuffer buffer = getBufferSend(DEFAULT_BUFFER_SIZE);
// EchoHeader = 8 + data
int len = 8 + request.getData().length;
Expand All @@ -241,9 +244,6 @@
sendRequest(request, buffer, path);
} finally {
writeLock().unlock();
if (super.channel().isConnected()) {
super.channel().disconnect();
}
}
}

Expand All @@ -252,7 +252,6 @@
writeLock().lock();
try {
Path path = request.getPath();
super.channel().connect(path.getFirstHopAddress());
ByteBuffer buffer = getBufferSend(DEFAULT_BUFFER_SIZE);
// TracerouteHeader = 24
int len = 24;
Expand All @@ -269,9 +268,6 @@
sendRequest(request, buffer, path);
} finally {
writeLock().unlock();
if (super.channel().isConnected()) {
super.channel().disconnect();
}
}
}

Expand Down Expand Up @@ -308,6 +304,8 @@
buffer.flip();
if (validate(buffer)) {
InternalConstants.HdrTypes hdrType = ScionHeaderParser.extractNextHeader(buffer);
ResponsePath receivePath = ScionHeaderParser.extractResponsePath(buffer, srcAddress);
int packetLength = ScionHeaderParser.extractPacketLength(buffer);
// From here on we use linear reading using the buffer's position() mechanism
buffer.position(ScionHeaderParser.extractHeaderLength(buffer));
// Check for extension headers.
Expand All @@ -318,7 +316,7 @@
if (hdrType != InternalConstants.HdrTypes.SCMP) {
return; // drop
}
handleIncomingScmp(buffer, srcAddress);
handleIncomingScmp(buffer, receivePath, packetLength);
}
} catch (ScionException e) {
// Validation problem -> ignore
Expand All @@ -328,10 +326,10 @@
}
}

private void handleIncomingScmp(ByteBuffer buffer, InetSocketAddress srcAddress) {
private void handleIncomingScmp(ByteBuffer buffer, ResponsePath receivePath, int packetLength) {
long currentNanos = System.nanoTime();
ResponsePath receivePath = ScionHeaderParser.extractResponsePath(buffer, srcAddress);
Scmp.Message msg = ScmpParser.consume(buffer, receivePath);
int bufferStart = buffer.position();
Scmp.Message msg = ScmpParser.consume(buffer, receivePath, packetLength);
if (msg.getTypeCode().isError()) {
handler.onError((Scmp.ErrorMessage) msg);
checkListeners(msg);
Expand All @@ -346,7 +344,7 @@
((Scmp.TimedMessage) msg).assignRequest(request, currentNanos);
handler.onResponse((Scmp.TimedMessage) msg);
} else if (msg.getTypeCode() == Scmp.TypeCode.TYPE_129) {
((Scmp.EchoMessage) msg).setSizeReceived(buffer.position());
((Scmp.EchoMessage) msg).setSizeReceived(buffer.position() - bufferStart);
((Scmp.TimedMessage) msg).assignRequest(request, currentNanos);
handler.onResponse((Scmp.TimedMessage) msg);
} else {
Expand Down Expand Up @@ -378,9 +376,9 @@
channel.setOverrideSourceAddress(overrideSourceAddress);
}

private void handleReceive() {
private void receiveTask(CountDownLatch barrier) {
try {
receiverBarrier.countDown();
barrier.countDown();
channel.receiveAsync();
} catch (IOException e) {
handler.onException(e);
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/scion/jpan/internal/GlobalTopology.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class GlobalTopology {

private final Map<Integer, Isd> world = new HashMap<>();

/**
* The topology is "empty" if it wasn't initialized with TRC file (or TRC metadata). THis can
* happen when it is initialized from a local topology file without bootstrap server.
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/org/scion/jpan/internal/ScionHeaderParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ public static void extractUserPayload(ByteBuffer data, ByteBuffer userBuffer) {
data.position(pos);
}

/**
* Extract the total packet length without changing the buffer's position.
*
* @param data The datagram to read from.
*/
public static int extractPacketLength(ByteBuffer data) {
int headerLen = extractHeaderLength(data);
int payloadLen = ByteUtil.toUnsigned(data.getShort(data.position() + 6));
return headerLen + payloadLen;
}

/**
* Extract the remote socket address and path without changing the buffer's position.
*
Expand Down Expand Up @@ -140,7 +151,7 @@ public static ResponsePath extractResponsePath(
* @return The type of the next header.
*/
public static InternalConstants.HdrTypes extractNextHeader(ByteBuffer data) {
int nextHeader = ByteUtil.toUnsigned(data.get(4));
int nextHeader = ByteUtil.toUnsigned(data.get(data.position() + 4));
return InternalConstants.HdrTypes.parse(nextHeader);
}

Expand Down
7 changes: 4 additions & 3 deletions src/main/java/org/scion/jpan/internal/ScmpParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public static void buildScmpTraceroute(

public static Scmp.Type extractType(ByteBuffer data) {
// Avoid changing the position!
return Scmp.Type.parse(ByteUtil.toUnsigned(data.get(data.position())));
int headerLength = ScionHeaderParser.extractHeaderLength(data);
return Scmp.Type.parse(ByteUtil.toUnsigned(data.get(headerLength)));
}

/**
Expand Down Expand Up @@ -113,7 +114,7 @@ public static void consume(ByteBuffer data, Scmp.Message holder) {
* @param data packet data
* @return SCMP message
*/
public static Scmp.Message consume(ByteBuffer data, ResponsePath path) {
public static Scmp.Message consume(ByteBuffer data, ResponsePath path, int packetLength) {
int type = ByteUtil.toUnsigned(data.get());
int code = ByteUtil.toUnsigned(data.get());
data.getShort(); // checksum
Expand All @@ -128,7 +129,7 @@ public static Scmp.Message consume(ByteBuffer data, ResponsePath path) {
case INFO_129:
Scmp.EchoMessage echo = Scmp.EchoMessage.createEmpty(path);
echo.setMessageArgs(sc, short1, short2);
echo.setData(new byte[data.remaining()]);
echo.setData(new byte[packetLength - data.position()]);
data.get(echo.getData());
return echo;
case INFO_130:
Expand Down
20 changes: 11 additions & 9 deletions src/test/java/org/scion/jpan/ProtobufSegmentDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.grpc.StatusRuntimeException;
import java.time.Instant;
import java.util.*;
import org.scion.jpan.demo.DemoConstants;
import org.scion.jpan.proto.control_plane.Seg;
import org.scion.jpan.proto.control_plane.SegExtensions;
import org.scion.jpan.proto.control_plane.SegmentLookupServiceGrpc;
Expand All @@ -36,14 +35,17 @@ public class ProtobufSegmentDemo {
private final ManagedChannel channel;

public static void main(String[] args) throws ScionException {
// ProtobufSegmentDemo demo = new ProtobufSegmentDemo(csETH);
// demo.getSegments(iaETH, iaETH_CORE);
// demo.getSegments(toWildcard(iaETH), toWildcard(iaAnapayaHK));
ProtobufSegmentDemo demo = new ProtobufSegmentDemo(DemoConstants.csAddr110_minimal);
// demo.getSegments(ia110, ia121);
demo.getSegments(DemoConstants.ia110, DemoConstants.ia1111);
// demo.getSegments(toWildcard(ia121), ia121);
// demo.getSegments(toWildcard(ia120), toWildcard(ia210));
// // ProtobufSegmentDemo demo = new ProtobufSegmentDemo(csETH);
// // demo.getSegments(iaETH, iaETH_CORE);
// // demo.getSegments(toWildcard(iaETH), toWildcard(iaAnapayaHK));
// ProtobufSegmentDemo demo = new ProtobufSegmentDemo(DemoConstants.csAddr110_minimal);
// // demo.getSegments(ia110, ia121);
// demo.getSegments(DemoConstants.ia110, DemoConstants.ia1111);
// // demo.getSegments(toWildcard(ia121), ia121);
// // demo.getSegments(toWildcard(ia120), toWildcard(ia210));

ProtobufSegmentDemo demoLab = new ProtobufSegmentDemo("127.0.0.71:31000");
demoLab.getSegments(ScionUtil.parseIA("1-ff00:0:1001"), ScionUtil.parseIA("1-ff00:0:1007"));
}

public ProtobufSegmentDemo(String csAddress) {
Expand Down
Loading