Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ManagedThread helper #136

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added
- Add a SHIM, required for #130 (topo file port range support).
[#130](https://github.com/scionproto-contrib/jpan/pull/130)
- ManagedThread test helper. [#136](https://github.com/scionproto-contrib/jpan/pull/136)

### Changed
- Buildified PingPong test helper. [#132](https://github.com/scionproto-contrib/jpan/pull/132)
Expand Down
30 changes: 13 additions & 17 deletions src/test/java/org/scion/jpan/api/DatagramChannelApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.nio.channels.NotYetConnectedException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -36,6 +35,7 @@
import org.scion.jpan.internal.Util;
import org.scion.jpan.proto.daemon.Daemon;
import org.scion.jpan.testutil.ExamplePacket;
import org.scion.jpan.testutil.ManagedThread;
import org.scion.jpan.testutil.MockDNS;
import org.scion.jpan.testutil.MockDaemon;
import org.scion.jpan.testutil.MockDatagramChannel;
Expand Down Expand Up @@ -239,28 +239,24 @@ private void testBlocking(boolean isBlocking, ChannelConsumer fn)
throws IOException, InterruptedException {
MockDNS.install("1-ff00:0:112", "localhost", "127.0.0.1");
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 12345);
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean wasBlocking = new AtomicBoolean(true);
try (ScionDatagramChannel channel = ScionDatagramChannel.open()) {
channel.connect(address);
channel.configureBlocking(isBlocking);
assertEquals(isBlocking, channel.isBlocking());
Thread t =
new Thread(
() -> {
try {
latch.countDown();
fn.accept(channel);
// Should only be reached with non-blocking channel
wasBlocking.getAndSet(false);
} catch (InterruptedException | IOException e) {
// ignore
}
});
t.start();
latch.await();
ManagedThread t = ManagedThread.newBuilder().build();
t.submit(
mtn -> {
try {
mtn.reportStarted();
fn.accept(channel);
// Should only be reached with non-blocking channel
wasBlocking.getAndSet(false);
} catch (InterruptedException | IOException e) {
// ignore
}
});
t.join(30);
t.interrupt();
assertEquals(isBlocking, wasBlocking.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -34,6 +32,8 @@
import org.scion.jpan.ScionSocketOptions;
import org.scion.jpan.internal.ScionHeaderParser;
import org.scion.jpan.testutil.ExamplePacket;
import org.scion.jpan.testutil.ManagedThread;
import org.scion.jpan.testutil.ManagedThreadNews;

class DatagramChannelPacketValidationTest {

Expand All @@ -42,16 +42,12 @@ class DatagramChannelPacketValidationTest {
private final AtomicReference<SocketAddress> localAddress = new AtomicReference<>();
private final AtomicInteger receiveCount = new AtomicInteger();
private final AtomicInteger receiveBadCount = new AtomicInteger();
private final AtomicReference<Exception> failure = new AtomicReference<>();
private CountDownLatch barrier;

@BeforeEach
public void beforeEach() {
localAddress.set(null);
receiveCount.set(0);
receiveBadCount.set(0);
failure.set(null);
barrier = null;
}

@AfterAll
Expand All @@ -62,15 +58,15 @@ public static void afterAll() {

@Test
void validate_length() {
String PRE = "SCION packet validation failed: ";
String prefix = "SCION packet validation failed: ";
// packet too short
for (int i = 0; i < packetBytes.length; i++) {
ByteBuffer bb = ByteBuffer.allocate(i);
bb.put(packetBytes, 0, i);
bb.flip();
String result = ScionHeaderParser.validate(bb);
assertNotNull(result);
assertTrue(result.startsWith(PRE + "Invalid packet length:"), result);
assertTrue(result.startsWith(prefix + "Invalid packet length:"), result);
}

// correct length
Expand All @@ -86,39 +82,38 @@ void validate_length() {
bb.flip();
String result = ScionHeaderParser.validate(bb);
assertNotNull(result);
assertTrue(result.startsWith(PRE + "Invalid packet length:"), result);
assertTrue(result.startsWith(prefix + "Invalid packet length:"), result);
}
}

@Test
void receive_validationFails_nonBlocking_noThrow() throws IOException, InterruptedException {
void receive_validationFails_nonBlocking_noThrow() throws IOException {
// silently drop bad packets
receive_validationFails_isBlocking_noThrow(false, false);
}

@Test
void receive_validationFails_nonBlocking_throw() throws IOException, InterruptedException {
void receive_validationFails_nonBlocking_throw() throws IOException {
// throw exception when receiving bad packet
receive_validationFails_isBlocking_noThrow(true, false);
}

@Test
void receive_validationFails_isBlocking_noThrow() throws IOException, InterruptedException {
void receive_validationFails_isBlocking_noThrow() throws IOException {
// silently drop bad packets
receive_validationFails_isBlocking_noThrow(false, true);
}

@Test
void receive_validationFails_isBlocking_throw() throws IOException, InterruptedException {
void receive_validationFails_isBlocking_throw() throws IOException {
// throw exception when receiving bad packet
receive_validationFails_isBlocking_noThrow(true, true);
}

private void receive_validationFails_isBlocking_noThrow(boolean throwBad, boolean isBlocking)
throws IOException, InterruptedException {
barrier = new CountDownLatch(1);
Thread serverThread = startServer(throwBad, isBlocking);
barrier.await(1, TimeUnit.SECONDS); // Wait for thread to start
throws IOException {
ManagedThread serverThread = ManagedThread.newBuilder().build();
serverThread.submit(mtn -> startServer(throwBad, isBlocking, mtn));

// client - send bad message
for (int i = 0; i < 10; i++) {
Expand All @@ -137,58 +132,48 @@ private void receive_validationFails_isBlocking_noThrow(boolean throwBad, boolea
serverThread.join();

// check results
assertNull(failure.get());
assertEquals(1, receiveCount.get());
}

private Thread startServer(boolean openThrowOnBadPacket, boolean isBlocking) {
Thread serverThread =
new Thread(
() -> {
try {
// We don´t need a daemon or BR here, set service to NULL
try (ScionDatagramChannel channel = ScionDatagramChannel.open(null)) {
channel.configureBlocking(isBlocking);
if (openThrowOnBadPacket) {
channel.setOption(ScionSocketOptions.SCION_API_THROW_PARSER_FAILURE, true);
}
channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 12345));
localAddress.set(channel.getLocalAddress());
barrier.countDown();

ByteBuffer response = ByteBuffer.allocate(500);
// repeat until we get no exception
boolean failed;
do {
failed = false;
try {
if (isBlocking) {
assertNotNull(channel.receive(response));
} else {
while (channel.receive(response) == null) {
Thread.sleep(10);
}
}
} catch (Exception e) {
receiveBadCount.incrementAndGet();
failed = true;
}
} while (failed);

// make sure we receive exactly one message
receiveCount.incrementAndGet();

response.flip();
String pong = Charset.defaultCharset().decode(response).toString();
if (!MSG.equals(pong)) {
failure.set(new IllegalStateException(pong));
}
}
} catch (IOException e) {
failure.set(e);
}
});
serverThread.start();
return serverThread;
private void startServer(boolean openThrowOnBadPacket, boolean isBlocking, ManagedThreadNews mtn)
throws IOException {
// We don´t need a daemon or BR here, set service to NULL
try (ScionDatagramChannel channel = ScionDatagramChannel.open(null)) {
channel.configureBlocking(isBlocking);
if (openThrowOnBadPacket) {
channel.setOption(ScionSocketOptions.SCION_API_THROW_PARSER_FAILURE, true);
}
channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 12345));
localAddress.set(channel.getLocalAddress());
mtn.reportStarted();

ByteBuffer response = ByteBuffer.allocate(500);
// repeat until we get no exception
boolean failed;
do {
failed = false;
try {
if (isBlocking) {
assertNotNull(channel.receive(response));
} else {
while (channel.receive(response) == null) {
Thread.sleep(10);
}
}
} catch (Exception e) {
receiveBadCount.incrementAndGet();
failed = true;
}
} while (failed);

// make sure we receive exactly one message
receiveCount.incrementAndGet();

response.flip();
String pong = Charset.defaultCharset().decode(response).toString();
if (!MSG.equals(pong)) {
mtn.reportException(new IllegalStateException(pong));
}
}
}
}
39 changes: 17 additions & 22 deletions src/test/java/org/scion/jpan/api/DatagramSocketApiTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -39,6 +37,7 @@
import org.scion.jpan.internal.Util;
import org.scion.jpan.proto.daemon.Daemon;
import org.scion.jpan.testutil.ExamplePacket;
import org.scion.jpan.testutil.ManagedThread;
import org.scion.jpan.testutil.MockDNS;
import org.scion.jpan.testutil.MockDaemon;
import org.scion.jpan.testutil.MockNetwork;
Expand Down Expand Up @@ -186,33 +185,29 @@ void receive_timeout() throws IOException, InterruptedException {
int timeOutMs = 50;
MockDNS.install("1-ff00:0:112", "localhost", "127.0.0.1");
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 12345);
CountDownLatch latch = new CountDownLatch(1);
AtomicLong timeMs = new AtomicLong();
AtomicReference<Exception> exception = new AtomicReference<>();
try (ScionDatagramSocket socket = new ScionDatagramSocket()) {
socket.setSoTimeout(timeOutMs);
socket.connect(address);
// Running a separate thread prevents this from halting infinitely.
Thread t =
new Thread(
() -> {
latch.countDown();
long t1 = System.nanoTime();
try {
socket.receive(dummyPacket);
} catch (Exception e) {
exception.set(e);
}
long t2 = System.nanoTime();
timeMs.set((t2 - t1) / 1_000_000);
});
t.start();
latch.await();
ManagedThread t = ManagedThread.newBuilder().expectException(true).build();
t.submit(
mtn -> {
mtn.reportStarted();
long t1 = System.nanoTime();
try {
socket.receive(dummyPacket);
} catch (Exception e) {
mtn.reportException(e);
}
long t2 = System.nanoTime();
timeMs.set((t2 - t1) / 1_000_000);
});
t.join(3 * timeOutMs);
t.interrupt();
assertInstanceOf(SocketTimeoutException.class, exception.get(), exception.get().getMessage());
assertInstanceOf(
SocketTimeoutException.class, t.getException(), t.getException().getMessage());
// Verify that it waited for at least "timeout".
// We use 0.9 because Windows otherwise may somehow reports sometimes 48ms for 50ms timeout.
// We use 0.9 because Windows otherwise may somehow report sometimes 48ms for 50ms timeout.
assertTrue(timeMs.get() >= timeOutMs * 0.9, timeMs.get() + " >= " + timeOutMs);
// Verify that it waited less than te JUnit test timeout
assertTrue(timeMs.get() < 1.5 * timeOutMs, timeMs.get() + " < 1.5* " + timeOutMs);
Expand Down
Loading
Loading