From 5dc7c6db544d3d3c9ce711ca3f5777dc47b89dfa Mon Sep 17 00:00:00 2001 From: alyokaz Date: Thu, 31 Aug 2023 20:53:13 +0100 Subject: [PATCH] Enable servers to self select open port on startup Port specification is currently mandatory. This is unnecessary and causes problems when running the intergration tests in CI/DI servers. Calling the no-arg constructor on AKTorrent will now create the servers with an open port chosen at runtime and set this to the port variable. --- .../java/com/alyokaz/aktorrent/AKTorrent.java | 32 ++--- .../intergration/CLIIntegrationTests.java | 51 ++++---- .../java/intergration/IntegrationTest.java | 120 ++++++++---------- 3 files changed, 100 insertions(+), 103 deletions(-) diff --git a/src/main/java/com/alyokaz/aktorrent/AKTorrent.java b/src/main/java/com/alyokaz/aktorrent/AKTorrent.java index 10952c4..efd37a2 100644 --- a/src/main/java/com/alyokaz/aktorrent/AKTorrent.java +++ b/src/main/java/com/alyokaz/aktorrent/AKTorrent.java @@ -7,7 +7,6 @@ import java.io.File; import java.io.IOException; import java.net.*; -import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; @@ -15,7 +14,7 @@ public class AKTorrent { static final int BUFFER_SIZE = 1000000; - private final int PORT; + private int port; private final ExecutorService executor = Executors.newCachedThreadPool(); private Server server; private PingServer udpServer; @@ -23,7 +22,11 @@ public class AKTorrent { private final FileService fileService = new FileService(peerService); public AKTorrent(int port) { - this.PORT = port; + this.port = port; + } + + public AKTorrent() { + this.port = 0; } public static void main(String[] args) throws IOException { @@ -38,9 +41,9 @@ public void startClient() { executor.execute(new DownloadHandler(address, fileService)))); } - public void seedFile(File file) { + public int seedFile(File file) { fileService.addFile(file); - startServer(); + return startServer(); } public void downloadFile(FileInfo fileInfo) { @@ -58,25 +61,24 @@ public Optional getFile(String filename) { return Optional.of(file); } - public void startServer() { + public int startServer() { if (this.server == null) { try { - this.server = new Server(new ServerSocket(PORT), peerService, fileService); + ServerSocket serverSocket = new ServerSocket(port); + this.port = serverSocket.getLocalPort(); + this.server = new Server(serverSocket, peerService, fileService); this.peerService.discoverPeers(); this.fileService.updateAvailableFiles(); server.start(); + if (this.udpServer == null) { + this.udpServer = new PingServer(new DatagramSocket(port)); + this.udpServer.start(); + } } catch (IOException e) { throw new RuntimeException(e); } } - if (this.udpServer == null) { - try { - this.udpServer = new PingServer(new DatagramSocket(PORT)); - this.udpServer.start(); - } catch (SocketException e) { - throw new RuntimeException(e); - } - } + return this.port; } public void shutDown() { diff --git a/src/test/java/intergration/CLIIntegrationTests.java b/src/test/java/intergration/CLIIntegrationTests.java index b212d8d..425ec5f 100644 --- a/src/test/java/intergration/CLIIntegrationTests.java +++ b/src/test/java/intergration/CLIIntegrationTests.java @@ -9,7 +9,9 @@ import java.net.InetSocketAddress; import java.nio.file.Files; import java.util.Optional; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -17,23 +19,15 @@ public class CLIIntegrationTests { private static final String LOCAL_HOST = "127.0.0.1"; - private static final int NODE_A_PORT = 4441; - private static final int NODE_B_PORT = 4442; - - private static final int NODE_C_PORT = 4443; - - private static final int NODE_D_PORT = 4444; - private static final int BUFFER_SIZE = 1000000; - private static final String FILENAME = "test_file.mp4"; - private static final String FILENAME_2 = "test_file_2.mp4"; @Test public void testCliSendReceiveFile() throws IOException, InterruptedException { CountDownLatch countDownLatch_A = new CountDownLatch(1); CountDownLatch countDownLatch_B = new CountDownLatch(1); + BlockingQueue port = new LinkedBlockingQueue<>(); new Thread(()-> { InputStream in = new InputStream() { @@ -58,7 +52,9 @@ public int read() { }; PrintStream out = new PrintStream(new ByteArrayOutputStream(1024)); - CLI cli = new CLI(in, out, new AKTorrent(NODE_A_PORT)); + AKTorrent node = new AKTorrent(); + port.add(node.startServer()); + CLI cli = new CLI(in, out, node); try { cli.start(); } catch (IOException e) { @@ -68,8 +64,8 @@ public int read() { countDownLatch_B.await(); - AKTorrent client = new AKTorrent(NODE_B_PORT); - client.addPeer(LOCAL_HOST, NODE_A_PORT); + AKTorrent client = new AKTorrent(); + client.addPeer(LOCAL_HOST, port.take()); File file = getFile(FILENAME); client.downloadFile(FileService.getFileInfo(file)); @@ -86,13 +82,18 @@ public int read() { public void canDownloadFileByNumber() throws InterruptedException { CountDownLatch exit = new CountDownLatch(1); - AKTorrent server = new AKTorrent(NODE_A_PORT); + AKTorrent server = new AKTorrent(); File file = getFile(FILENAME); - server.seedFile(file); + BlockingQueue port = new LinkedBlockingQueue<>(); + port.add(server.seedFile(file)); Thread clientThread = new Thread(() -> { - AKTorrent client = new AKTorrent(NODE_B_PORT); - client.addPeer(LOCAL_HOST, NODE_A_PORT); + AKTorrent client = new AKTorrent(); + try { + client.addPeer(LOCAL_HOST, port.take()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } CLI cli = new CLI( new ByteArrayInputStream(("2\n1").getBytes()), new PrintStream(new ByteArrayOutputStream()), @@ -126,23 +127,27 @@ public void canDownloadFileByNumber() throws InterruptedException { @Test public void canAddPeer() throws InterruptedException { CountDownLatch exitLatch = new CountDownLatch(1); + BlockingQueue port = new LinkedBlockingQueue<>(); - AKTorrent server = new AKTorrent(NODE_A_PORT); - server.startServer(); + AKTorrent server = new AKTorrent(); + port.add(server.startServer()); - AKTorrent client = new AKTorrent(NODE_B_PORT); + AKTorrent client = new AKTorrent(); new Thread(() -> { - CLI cli = new CLI(new ByteArrayInputStream(("3\n" + LOCAL_HOST + " " + NODE_A_PORT).getBytes()), + try { + final int serverPort = port.take(); + CLI cli = new CLI(new ByteArrayInputStream(("3\n" + LOCAL_HOST + " " + serverPort).getBytes()), new PrintStream(new ByteArrayOutputStream()), client); - try { + cli.start(); - assertTrue(client.getConnectedPeers().contains(new InetSocketAddress(LOCAL_HOST, NODE_A_PORT))); + assertTrue(client.getConnectedPeers().contains(new InetSocketAddress(LOCAL_HOST, serverPort))); exitLatch.countDown(); - } catch (IOException e) { + } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } }).start(); + exitLatch.await(); server.shutDown(); } diff --git a/src/test/java/intergration/IntegrationTest.java b/src/test/java/intergration/IntegrationTest.java index 1eadb63..c4935d7 100644 --- a/src/test/java/intergration/IntegrationTest.java +++ b/src/test/java/intergration/IntegrationTest.java @@ -12,6 +12,7 @@ import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.*; @@ -19,27 +20,18 @@ public class IntegrationTest { private static final String LOCAL_HOST = "127.0.0.1"; - private static final int NODE_A_PORT = 4441; - private static final int NODE_B_PORT = 4442; - - private static final int NODE_C_PORT = 4443; - - private static final int NODE_D_PORT = 4444; - private static final int BUFFER_SIZE = 1000000; - private static final String FILENAME = "test_file.mp4"; - private static final String FILENAME_2 = "test_file_2.mp4"; @Test public void canSeedAndReceiveFile() throws IOException { File file = getFile(FILENAME); - AKTorrent server = new AKTorrent(NODE_A_PORT); - server.seedFile(file); + AKTorrent server = new AKTorrent(); + final int serverPort = server.seedFile(file); - AKTorrent client = new AKTorrent(NODE_B_PORT); - client.addPeer(LOCAL_HOST, NODE_A_PORT); + AKTorrent client = new AKTorrent(); + client.addPeer(LOCAL_HOST, serverPort); client.downloadFile(FileService.getFileInfo(file)); Optional completedFile; //TODO add some form of timeout @@ -55,48 +47,47 @@ public void canSeedAndReceiveFile() throws IOException { public void canDownloadFromTwoPeers() throws IOException { File file = getFile(FILENAME); - AKTorrent ak1 = new AKTorrent(NODE_A_PORT); - AKTorrent ak2 = new AKTorrent(NODE_B_PORT); - AKTorrent ak3 = new AKTorrent(NODE_C_PORT); - AKTorrent ak4 = new AKTorrent(NODE_D_PORT); + AKTorrent nodeA = new AKTorrent(); + AKTorrent nodeB = new AKTorrent(); + AKTorrent nodeC = new AKTorrent(); + AKTorrent nodeD = new AKTorrent(); - ak1.seedFile(file); - ak2.seedFile(file); - ak3.seedFile(file); + final int nodeAPort = nodeA.seedFile(file); + final int nodeBPort = nodeB.seedFile(file); + final int nodeCPort = nodeC.seedFile(file); - ak4.addPeer(LOCAL_HOST, NODE_A_PORT); - ak4.addPeer(LOCAL_HOST, NODE_B_PORT); - ak4.addPeer(LOCAL_HOST, NODE_C_PORT); + nodeD.addPeer(LOCAL_HOST, nodeAPort); + nodeD.addPeer(LOCAL_HOST, nodeBPort); + nodeD.addPeer(LOCAL_HOST, nodeCPort); - ak4.downloadFile(FileService.getFileInfo(file)); + nodeD.downloadFile(FileService.getFileInfo(file)); Optional downloadedFile; do { - downloadedFile = ak4.getFile(FILENAME); + downloadedFile = nodeD.getFile(FILENAME); } while (downloadedFile.isEmpty()); assertEquals(-1, Files.mismatch(file.toPath(), downloadedFile.get().toPath())); - ak1.shutDown(); - ak2.shutDown(); - ak3.shutDown(); - ak4.shutDown(); + nodeA.shutDown(); + nodeB.shutDown(); + nodeC.shutDown(); + nodeD.shutDown(); } @Test public void canDownloadFromMultiplePeers() throws IOException { - final int minPort = 4444; - final int maxPort = minPort + 10; - final int clientPort = maxPort + 1; + final int min = 0; + final int max = 10; File file = getFile(FILENAME); Set nodes = new HashSet<>(); - IntStream.range(minPort, maxPort).forEach(port -> nodes.add(new AKTorrent(port))); - nodes.forEach(node -> node.seedFile(file)); + IntStream.range(0, 10).forEach(port -> nodes.add(new AKTorrent())); + Set ports = nodes.stream().map(node -> node.seedFile(file)).collect(Collectors.toSet()); - AKTorrent client = new AKTorrent(clientPort); + AKTorrent client = new AKTorrent(); - IntStream.range(minPort, maxPort).forEach(port -> client.addPeer(LOCAL_HOST, port)); + ports.forEach(port -> client.addPeer(LOCAL_HOST, port)); client.downloadFile(FileService.getFileInfo(file)); @@ -114,17 +105,17 @@ public void getAvailableFiles() throws InterruptedException { File testFileA = getFile(FILENAME); File testFileB = getFile(FILENAME_2); - AKTorrent node_A = new AKTorrent(NODE_A_PORT); + AKTorrent node_A = new AKTorrent(); - node_A.seedFile(testFileA); + final int nodeAPort = node_A.seedFile(testFileA); node_A.seedFile(testFileB); - AKTorrent node_B = new AKTorrent(NODE_B_PORT); - node_B.addPeer(LOCAL_HOST, NODE_A_PORT); - node_B.startServer(); + AKTorrent node_B = new AKTorrent(); + node_B.addPeer(LOCAL_HOST, nodeAPort); + final int nodeBPort = node_B.startServer(); - AKTorrent client = new AKTorrent(NODE_C_PORT); - client.addPeer(LOCAL_HOST, NODE_B_PORT); + AKTorrent client = new AKTorrent(); + client.addPeer(LOCAL_HOST, nodeBPort); Set files = client.getAvailableFiles(); @@ -139,22 +130,22 @@ public void getAvailableFiles() throws InterruptedException { @Test public void testDiscoverTransientPeers() throws IOException { - AKTorrent nodeA = new AKTorrent(NODE_A_PORT); - AKTorrent nodeB = new AKTorrent(NODE_B_PORT); - AKTorrent nodeC = new AKTorrent(NODE_C_PORT); + AKTorrent nodeA = new AKTorrent(); + AKTorrent nodeB = new AKTorrent(); + AKTorrent nodeC = new AKTorrent(); File file = getFile(FILENAME); - nodeC.seedFile(file); + final int nodeCPort = nodeC.seedFile(file); - nodeB.addPeer(LOCAL_HOST, NODE_C_PORT); - nodeB.startServer(); + nodeB.addPeer(LOCAL_HOST, nodeCPort); + final int nodeBPort = nodeB.startServer(); - nodeA.addPeer(LOCAL_HOST, NODE_B_PORT); - nodeA.startServer(); + nodeA.addPeer(LOCAL_HOST, nodeBPort); + final int nodeAPort = nodeA.startServer(); - AKTorrent client = new AKTorrent(NODE_D_PORT); + AKTorrent client = new AKTorrent(); - client.addPeer(LOCAL_HOST, NODE_A_PORT); + client.addPeer(LOCAL_HOST, nodeAPort); client.downloadFile(FileService.getFileInfo(file)); @@ -171,24 +162,24 @@ public void testDiscoverTransientPeers() throws IOException { @Test public void pingPeerWhenAdded() { - AKTorrent server = new AKTorrent(NODE_A_PORT); - server.startServer(); - AKTorrent client = new AKTorrent(NODE_B_PORT); - client.addPeer(LOCAL_HOST, NODE_A_PORT); - assertTrue(client.getConnectedPeers().contains(new InetSocketAddress(LOCAL_HOST, NODE_A_PORT))); + AKTorrent server = new AKTorrent(); + final int serverPort = server.startServer(); + AKTorrent client = new AKTorrent(); + client.addPeer(LOCAL_HOST, serverPort); + assertTrue(client.getConnectedPeers().contains(new InetSocketAddress(LOCAL_HOST, serverPort))); server.shutDown(); } @Test public void pingByMultipleNodes() { - AKTorrent server = new AKTorrent(NODE_A_PORT); - server.startServer(); + AKTorrent server = new AKTorrent(); + final int serverPort = server.startServer(); Set nodes = new HashSet<>(); - IntStream.range(0, 1000).forEach(i -> nodes.add(new AKTorrent(NODE_B_PORT + i))); - nodes.forEach(node -> node.addPeer(LOCAL_HOST, NODE_A_PORT)); + IntStream.range(0, 1000).forEach(i -> nodes.add(new AKTorrent())); + nodes.forEach(node -> node.addPeer(LOCAL_HOST, serverPort)); - InetSocketAddress expectedAddress = new InetSocketAddress(LOCAL_HOST, NODE_A_PORT); + InetSocketAddress expectedAddress = new InetSocketAddress(LOCAL_HOST, serverPort); nodes.forEach(node -> assertTrue(node.getConnectedPeers().contains(expectedAddress))); server.shutDown(); } @@ -196,6 +187,5 @@ public void pingByMultipleNodes() { private File getFile(String filename) { return new File(getClass().getResource("/" + filename).getFile()); } - - + }