Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect before library logon #521

Merged
merged 4 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public final class EngineStreamInfo
{
private final long inboundIndexSubscriptionRegistrationId;
private final long outboundIndexSubscriptionRegistrationId;
private final long librarySubscriptionRegistrationId;
private final int inboundPublicationSessionId;
private final long inboundPublicationPosition;
private final int outboundPublicationSessionId;
Expand All @@ -12,13 +13,15 @@ public final class EngineStreamInfo
EngineStreamInfo(
final long inboundIndexSubscriptionRegistrationId,
final long outboundIndexSubscriptionRegistrationId,
final long librarySubscriptionRegistrationId,
final int inboundPublicationSessionId,
final long inboundPublicationPosition,
final int outboundPublicationSessionId,
final long outboundPublicationPosition)
{
this.inboundIndexSubscriptionRegistrationId = inboundIndexSubscriptionRegistrationId;
this.outboundIndexSubscriptionRegistrationId = outboundIndexSubscriptionRegistrationId;
this.librarySubscriptionRegistrationId = librarySubscriptionRegistrationId;
this.inboundPublicationSessionId = inboundPublicationSessionId;
this.inboundPublicationPosition = inboundPublicationPosition;
this.outboundPublicationSessionId = outboundPublicationSessionId;
Expand All @@ -35,6 +38,11 @@ public long outboundIndexSubscriptionRegistrationId()
return outboundIndexSubscriptionRegistrationId;
}

public long librarySubscriptionRegistrationId()
{
return librarySubscriptionRegistrationId;
}

public int inboundPublicationSessionId()
{
return inboundPublicationSessionId;
Expand All @@ -60,6 +68,7 @@ public String toString()
return "EngineStreamInfo{" +
"inboundIndexSubscriptionRegistrationId=" + inboundIndexSubscriptionRegistrationId +
", outboundIndexSubscriptionRegistrationId=" + outboundIndexSubscriptionRegistrationId +
", librarySubscriptionRegistrationId=" + librarySubscriptionRegistrationId +
", inboundPublicationSessionId=" + inboundPublicationSessionId +
", inboundPublicationPosition=" + inboundPublicationPosition +
", outboundPublicationSessionId=" + outboundPublicationSessionId +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1715,24 +1715,17 @@ public Action onValidResendRequest(

private void checkOfflineSequenceReset(final long sessionId, final long messageType, final int sequenceIndex)
{
if (messageType == LOGON_MESSAGE_TYPE)
if (messageType == LOGON_MESSAGE_TYPE || messageType == SEQUENCE_RESET_MESSAGE_TYPE)
{
// Always a sequence reset
final Map.Entry<CompositeKey, SessionContext> entry = fixContexts.lookupById(sessionId);
if (entry != null)
{
final SessionContext context = entry.getValue();
context.onSequenceReset(clock.nanoTime());
}
}
else if (messageType == SEQUENCE_RESET_MESSAGE_TYPE)
{
// If it's not a gap-fill it's a sequence reset
final Map.Entry<CompositeKey, SessionContext> entry = fixContexts.lookupById(sessionId);
if (entry != null)
{
final SessionContext context = entry.getValue();
context.onSequenceIndex(clock.nanoTime(), sequenceIndex);
final int currentSequenceIndex = context.sequenceIndex();
if (sequenceIndex > currentSequenceIndex)
{
context.onSequenceIndex(clock.nanoTime(), sequenceIndex);
}
}
}
}
Expand Down Expand Up @@ -3502,6 +3495,7 @@ public void onEngineStreamInfoRequest(final EngineStreamInfoRequestCommand comma
command.complete(new EngineStreamInfo(
inboundIndexRegistrationId,
outboundIndexRegistrationId,
librarySubscription.registrationId(),
inboundPublication.sessionId(),
inboundPublication.position(),
outboundPublication.sessionId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
import uk.co.real_logic.artio.builder.*;
import uk.co.real_logic.artio.decoder.*;
import uk.co.real_logic.artio.engine.SessionInfo;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.library.FixLibrary;
import uk.co.real_logic.artio.messages.DisconnectReason;
import uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner;
import uk.co.real_logic.artio.messages.SessionReplyStatus;
import uk.co.real_logic.artio.messages.ThrottleConfigurationStatus;
import uk.co.real_logic.artio.messages.*;
import uk.co.real_logic.artio.session.Session;
import uk.co.real_logic.artio.session.SessionWriter;
import uk.co.real_logic.artio.util.MutableAsciiBuffer;

import java.io.IOException;
Expand All @@ -55,8 +54,7 @@
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.ENGINE;
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY;
import static uk.co.real_logic.artio.messages.ThrottleConfigurationStatus.OK;
import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.LONG_TEST_TIMEOUT_IN_MS;
import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.TEST_TIMEOUT_IN_MS;
import static uk.co.real_logic.artio.system_tests.AbstractGatewayToGatewaySystemTest.*;
import static uk.co.real_logic.artio.system_tests.FixConnection.BUFFER_SIZE;
import static uk.co.real_logic.artio.system_tests.MessageBasedInitiatorSystemTest.assertConnectionDisconnects;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*;
Expand Down Expand Up @@ -924,6 +922,72 @@ public void shouldDisconnectConnectionTryingToSendOversizedMessage() throws IOEx
}
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldSupportFollowerSessionLogonWithoutSequenceResetOnDisconnectBeforeLibraryLogonResponse()
throws IOException
{
setup(false, true);
setupLibrary();

final List<SessionInfo> noSessionContext = engine.allSessions();
assertEquals(0, noSessionContext.size());

final SessionWriter sessionWriter = createFollowerSession(
TEST_TIMEOUT_IN_MS, testSystem, library, INITIATOR_ID, ACCEPTOR_ID);
final SessionReplyStatus requestSessionReply = requestSession(library, sessionWriter.id(), testSystem);
assertEquals(SessionReplyStatus.OK, requestSessionReply);

try (FixConnection connection = FixConnection.initiate(port))
{
connection.logon(false);
Timing.assertEventuallyTrue("Library did not transition session to connected",
() ->
{
library.poll(1);
final List<Session> sessions = library.sessions();
return sessions.size() == 1 && sessions.get(0).state() == SessionState.CONNECTED;
}
);
}

Timing.assertEventuallyTrue("Fix connection was not disconnected",
() ->
{
final Reply<List<LibraryInfo>> libraryReply = engine.libraries();
while (!libraryReply.hasCompleted())
{
sleep(100);
}

final List<LibraryInfo> allLibraryInfo = libraryReply.resultIfPresent();
for (final LibraryInfo libraryInfo : allLibraryInfo)
{
if (libraryInfo.libraryId() == library.libraryId())
{
return libraryInfo.sessions().isEmpty();
}
}
return false;
}
);

Timing.assertEventuallyTrue("Library did not transition session to active",
() ->
{
library.poll(1);
final List<Session> sessions = library.sessions();
return sessions.size() == 1 && sessions.get(0).state() == SessionState.ACTIVE;
}
);

assertEngineSubscriptionCaughtUpToLibraryPublication(
testSystem, mediaDriver.mediaDriver().aeronDirectoryName(), engine, library);

final List<SessionInfo> sessionContextAfterLogonNoSenderEndpoint = engine.allSessions();
assertEquals(1, sessionContextAfterLogonNoSenderEndpoint.size());
assertEquals(0, sessionContextAfterLogonNoSenderEndpoint.get(0).sequenceIndex());
}

private void assertSell(final ExecutionReportDecoder executionReport)
{
assertEquals(executionReport.toString(), Side.SELL, executionReport.sideAsEnum());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,4 +707,55 @@ static void awaitIndexerCaughtUp(
() -> {});
}
}

static void assertEngineSubscriptionCaughtUpToLibraryPublication(
final TestSystem testSystem,
final String aeronDirectoryName,
final FixEngine engine,
final FixLibrary library)
{
final EngineStreamInfo engineStreamInfo =
testSystem.awaitCompletedReply(FixEngineInternals.engineStreamInfo(engine)).resultIfPresent();

final LibraryStreamInfo libraryStreamInfo = FixLibraryInternals.libraryStreamInfo(library);

final Aeron.Context aeronCtx = new Aeron.Context().aeronDirectoryName(aeronDirectoryName);
try (Aeron aeron = Aeron.connect(aeronCtx))
{
final CountersReader countersReader = aeron.countersReader();

final SubPosMatcher subPosMatcher = new SubPosMatcher(
countersReader,
engineStreamInfo.librarySubscriptionRegistrationId(),
libraryStreamInfo.outboundPublicationSessionId(),
libraryStreamInfo.outboundPublicationPosition());

countersReader.forEach((counterId, typeId, keyBuffer, label) ->
subPosMatcher.tryMatch(counterId, typeId, keyBuffer));

if (!subPosMatcher.hasCounterId())
{
throw new IllegalStateException("did not match counter: " + subPosMatcher);
}

assertEventuallyTrue(
() ->
{
final StringBuilder builder = new StringBuilder();
builder.append("expected sub-pos counters:\n");
builder.append(subPosMatcher).append('\n');
builder.append("\nbut counters were:\n");
countersReader.forEach((value, counterId, label) ->
builder.append(String.format("%d: %d - %s%n", counterId, value, label)));
return builder.toString();
},
() ->
{
testSystem.poll();
return subPosMatcher.isCaughtUp();
},
DEFAULT_TIMEOUT_IN_MS,
() -> {});
}
}
}