diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/FakeHandler.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/FakeHandler.java index 2124ce2dbc..19680dbc23 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/FakeHandler.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/FakeHandler.java @@ -38,7 +38,7 @@ import static uk.co.real_logic.artio.library.FixLibrary.NO_MESSAGE_REPLAY; public class FakeHandler - implements SessionHandler, SessionAcquireHandler, SessionExistsHandler + implements SessionHandler, SessionAcquireHandler, SessionExistsHandler, LibraryConnectHandler { public static final String SESSION_START_ERROR = "Unexpected Error"; @@ -61,6 +61,8 @@ public class FakeHandler private boolean throwInOnSessionStart = false; private boolean onSessionStartCalled = false; + private int onLibraryConnectCalled = 0; + private int onLibraryDisconnectCalled = 0; { heartbeatEncoder.testReqID("abc"); @@ -388,4 +390,26 @@ public boolean onSessionStartCalled() { return onSessionStartCalled; } + + @Override + public void onConnect(final FixLibrary library) + { + onLibraryConnectCalled++; + } + + @Override + public void onDisconnect(final FixLibrary library) + { + onLibraryDisconnectCalled++; + } + + public int isOnLibraryDisconnectCalled() + { + return onLibraryDisconnectCalled; + } + + public int isOnLibraryConnectCalled() + { + return onLibraryConnectCalled; + } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java index a652417deb..ca54a011f6 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/MessageBasedAcceptorSystemTest.java @@ -19,6 +19,7 @@ import org.hamcrest.Matchers; import org.junit.Test; import org.mockito.ArgumentMatchers; + import uk.co.real_logic.artio.*; import uk.co.real_logic.artio.builder.*; import uk.co.real_logic.artio.decoder.*; @@ -110,6 +111,102 @@ public void shouldNotNotifyLibraryOfSessionUntilLoggedOn() throws IOException } } + /** + * This test is to track Issue 529 + */ + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldNotReceiveDuplicateMsgsAfterRequestingSessionFromEngine() throws IOException + { + setup(true, true); + + final FakeOtfAcceptor fakeOtfAcceptor = new FakeOtfAcceptor(); + final FakeHandler fakeHandler = new FakeHandler(fakeOtfAcceptor); + try (FixLibrary library = newAcceptingLibrary(fakeHandler, nanoClock)) + { + final TestSystem ts = new TestSystem(library); + try (FixConnection connection = FixConnection.initiate(port)) + { + library.poll(10); + assertFalse(fakeHandler.hasSeenSession()); + + + //////////// GIVEN + // the client connects, + // acceptor library requests client session, + // the client sends one message to the acceptor + // library releases the session to the engine + + logon(connection); + final long sessionId = fakeHandler.awaitSessionIdFor(INITIATOR_ID, ACCEPTOR_ID, () -> + library.poll(2), 1000); + hasCompleted(library.requestSession(sessionId, + 0, + 0, + 2000L), ts); + // initiator sends a 35=8 to the acceptor + connection.sendExecutionReport(2, false); + connection.msgSeqNum(3); + ts.awaitMessageOf(fakeOtfAcceptor, "8"); + // acceptor releases the session + hasCompleted(library.releaseToGateway(library.sessions().get(0), 1000), ts); + + + + //////////// WHEN + // the library times out, + // the library reconnects, + // the library requests the same session from the engine + + fakeOtfAcceptor.messages().clear(); + + // acceptor library times out and then reconnects + sleep(2000); + ts.await("library not timed out", () -> fakeHandler.isOnLibraryDisconnectCalled() == 1); + ts.await("library not timed out", () -> fakeHandler.isOnLibraryConnectCalled() == 2); + hasCompleted(library.requestSession(sessionId, + 0, + 0, + 2000L), ts); + + + + //////////// THEN + // the acceptor library receives the 35=A twice (BUG) !!!!! + + assertEquals(3, fakeOtfAcceptor.messages().size()); + assertEquals("A", fakeOtfAcceptor.messages().get(0).msgType()); + assertEquals("A", fakeOtfAcceptor.messages().get(1).msgType()); + assertEquals("8", fakeOtfAcceptor.messages().get(2).msgType()); + + + //////////// WHEN 2 + // the requestSession is sent with seqno 2 + hasCompleted(library.releaseToGateway(library.sessions().get(0), 1000), ts); + fakeOtfAcceptor.messages().clear(); + sleep(2000); + ts.await("library not timed out", () -> fakeHandler.isOnLibraryDisconnectCalled() == 2); + ts.await("library not timed out", () -> fakeHandler.isOnLibraryConnectCalled() == 3); + hasCompleted(library.requestSession(sessionId, + 2, + 0, + 2000L), ts); + + //////////// THEN + // it works fine + assertEquals(1, fakeOtfAcceptor.messages().size()); + assertEquals("8", fakeOtfAcceptor.messages().get(0).msgType()); + + + } + } + + } + + private static void hasCompleted(final Reply sessionReplyStatusReply, final TestSystem ts) + { + ts.await("not received session " + sessionReplyStatusReply, sessionReplyStatusReply::hasCompleted); + } + @Test(timeout = TEST_TIMEOUT_IN_MS) public void shouldRejectExceptionalLogonMessageAndLogout() throws IOException { diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java index 8e4090174a..2832e0b8b5 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SystemTestUtil.java @@ -322,6 +322,7 @@ static LibraryConfiguration acceptingLibraryConfig( setupCommonConfig(ACCEPTOR_ID, INITIATOR_ID, nanoClock, libraryConfiguration); libraryConfiguration + .libraryConnectHandler(sessionHandler) .sessionExistsHandler(sessionHandler) .sessionAcquireHandler(sessionHandler) .libraryAeronChannels(singletonList(IPC_CHANNEL))