Skip to content

Commit

Permalink
Adding a test to track issue #529
Browse files Browse the repository at this point in the history
  • Loading branch information
lucianoviana committed Dec 4, 2024
1 parent edb557a commit aef8f0c
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static uk.co.real_logic.artio.library.FixLibrary.NO_MESSAGE_REPLAY;

public class FakeHandler
implements SessionHandler, SessionAcquireHandler, SessionExistsHandler
implements SessionHandler, SessionAcquireHandler, SessionExistsHandler, LibraryConnectHandler
{
public static final String SESSION_START_ERROR = "Unexpected Error";

Expand All @@ -61,6 +61,8 @@ public class FakeHandler

private boolean throwInOnSessionStart = false;
private boolean onSessionStartCalled = false;
private int onLibraryConnectCalled = 0;
private int onLibraryDisconnectCalled = 0;

{
heartbeatEncoder.testReqID("abc");
Expand Down Expand Up @@ -388,4 +390,26 @@ public boolean onSessionStartCalled()
{
return onSessionStartCalled;
}

@Override
public void onConnect(final FixLibrary library)
{
onLibraryConnectCalled++;
}

@Override
public void onDisconnect(final FixLibrary library)
{
onLibraryDisconnectCalled++;
}

public int isOnLibraryDisconnectCalled()
{
return onLibraryDisconnectCalled;
}

public int isOnLibraryConnectCalled()
{
return onLibraryConnectCalled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.hamcrest.Matchers;
import org.junit.Test;
import org.mockito.ArgumentMatchers;

import uk.co.real_logic.artio.*;
import uk.co.real_logic.artio.builder.*;
import uk.co.real_logic.artio.decoder.*;
Expand Down Expand Up @@ -110,6 +111,102 @@ public void shouldNotNotifyLibraryOfSessionUntilLoggedOn() throws IOException
}
}

/**
* This test is to track <a href="https://github.com/real-logic/artio/issues/529">Issue 529</a>
*/
@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldNotReceiveDuplicateMsgsAfterRequestingSessionFromEngine() throws IOException
{
setup(true, true);

final FakeOtfAcceptor fakeOtfAcceptor = new FakeOtfAcceptor();
final FakeHandler fakeHandler = new FakeHandler(fakeOtfAcceptor);
try (FixLibrary library = newAcceptingLibrary(fakeHandler, nanoClock))
{
final TestSystem ts = new TestSystem(library);
try (FixConnection connection = FixConnection.initiate(port))
{
library.poll(10);
assertFalse(fakeHandler.hasSeenSession());


//////////// GIVEN
// the client connects,
// acceptor library requests client session,
// the client sends one message to the acceptor
// library releases the session to the engine

logon(connection);
final long sessionId = fakeHandler.awaitSessionIdFor(INITIATOR_ID, ACCEPTOR_ID, () ->
library.poll(2), 1000);
hasCompleted(library.requestSession(sessionId,
0,
0,
2000L), ts);
// initiator sends a 35=8 to the acceptor
connection.sendExecutionReport(2, false);
connection.msgSeqNum(3);
ts.awaitMessageOf(fakeOtfAcceptor, "8");
// acceptor releases the session
hasCompleted(library.releaseToGateway(library.sessions().get(0), 1000), ts);



//////////// WHEN
// the library times out,
// the library reconnects,
// the library requests the same session from the engine

fakeOtfAcceptor.messages().clear();

// acceptor library times out and then reconnects
sleep(2000);
ts.await("library not timed out", () -> fakeHandler.isOnLibraryDisconnectCalled() == 1);
ts.await("library not timed out", () -> fakeHandler.isOnLibraryConnectCalled() == 2);
hasCompleted(library.requestSession(sessionId,
0,
0,
2000L), ts);



//////////// THEN
// the acceptor library receives the 35=A twice (BUG) !!!!!

assertEquals(3, fakeOtfAcceptor.messages().size());
assertEquals("A", fakeOtfAcceptor.messages().get(0).msgType());
assertEquals("A", fakeOtfAcceptor.messages().get(1).msgType());
assertEquals("8", fakeOtfAcceptor.messages().get(2).msgType());


//////////// WHEN 2
// the requestSession is sent with seqno 2
hasCompleted(library.releaseToGateway(library.sessions().get(0), 1000), ts);
fakeOtfAcceptor.messages().clear();
sleep(2000);
ts.await("library not timed out", () -> fakeHandler.isOnLibraryDisconnectCalled() == 2);
ts.await("library not timed out", () -> fakeHandler.isOnLibraryConnectCalled() == 3);
hasCompleted(library.requestSession(sessionId,
2,
0,
2000L), ts);

//////////// THEN
// it works fine
assertEquals(1, fakeOtfAcceptor.messages().size());
assertEquals("8", fakeOtfAcceptor.messages().get(0).msgType());


}
}

}

private static void hasCompleted(final Reply<SessionReplyStatus> sessionReplyStatusReply, final TestSystem ts)
{
ts.await("not received session " + sessionReplyStatusReply, sessionReplyStatusReply::hasCompleted);
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldRejectExceptionalLogonMessageAndLogout() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ static LibraryConfiguration acceptingLibraryConfig(
setupCommonConfig(ACCEPTOR_ID, INITIATOR_ID, nanoClock, libraryConfiguration);

libraryConfiguration
.libraryConnectHandler(sessionHandler)
.sessionExistsHandler(sessionHandler)
.sessionAcquireHandler(sessionHandler)
.libraryAeronChannels(singletonList(IPC_CHANNEL))
Expand Down

0 comments on commit aef8f0c

Please sign in to comment.