Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect sessions when library times out in SOLE_LIBRARY mode #537

Merged
merged 2 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
"Received Heartbeat (msg=%s) from library %s at %sms, sent at %sns");
private final CharFormatter acquiringSessionFormatter = new CharFormatter(
"Acquiring session %s from library %s");
private final CharFormatter disconnectingSessionFormatter = new CharFormatter(
"Disconnecting session %s from library %s");
private final CharFormatter releasingSessionFormatter = new CharFormatter(
"Releasing session %s with connectionId %s from library %s");
private final CharFormatter connectingFormatter = new CharFormatter(
Expand Down Expand Up @@ -493,7 +495,7 @@ private void onLibraryDisconnect(final LiveLibraryInfo library)
DebugLogger.log(LIBRARY_MANAGEMENT, timingOutFormatter.clear().with(library.libraryId()));
}

tryAcquireLibrarySessions(library);
tryAcquireOrDisconnectLibrarySessions(library);
saveLibraryTimeout(library);
disconnectILinkConnections(library);
}
Expand All @@ -519,7 +521,7 @@ private void soleLibraryModeUnbind()
}
}

private void tryAcquireLibrarySessions(final LiveLibraryInfo library)
private void tryAcquireOrDisconnectLibrarySessions(final LiveLibraryInfo library)
{
final int librarySessionId = library.aeronSessionId();
final Image image = librarySubscription.imageBySessionId(librarySessionId);
Expand All @@ -533,7 +535,7 @@ private void tryAcquireLibrarySessions(final LiveLibraryInfo library)
// just acquire
if (!configuration.logOutboundMessages() || sentIndexedPosition(librarySessionId, libraryPosition))
{
acquireLibrarySessions(library);
acquireOrDisconnectLibrarySessions(library);
}
else
{
Expand All @@ -547,7 +549,7 @@ private boolean retryAcquireLibrarySessions(final LiveLibraryInfo library)
if (!configuration.logOutboundMessages() ||
sentIndexedPosition(library.aeronSessionId(), library.acquireAtPosition()))
{
acquireLibrarySessions(library);
acquireOrDisconnectLibrarySessions(library);
return true;
}

Expand All @@ -572,7 +574,7 @@ private void saveLibraryTimeout(final LibraryInfo library)
schedule(() -> outboundPublication.saveLibraryTimeout(library, 0));
}

private void acquireLibrarySessions(final LiveLibraryInfo library)
private void acquireOrDisconnectLibrarySessions(final LiveLibraryInfo library)
{
final List<GatewaySession> sessions = library.gatewaySessions();
for (int i = 0, size = sessions.size(); i < size; i++)
Expand All @@ -585,31 +587,46 @@ private void acquireLibrarySessions(final LiveLibraryInfo library)
continue;
}

final long sessionId = session.sessionId();
final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION;
final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED;

DebugLogger.log(
LIBRARY_MANAGEMENT,
acquiringSessionFormatter, session.sessionId(), library.libraryId());

((FixGatewaySessions)gatewaySessions).acquire(
session,
state,
false,
session.heartbeatIntervalInS(),
sentSequenceNumber,
receivedSequenceNumber,
session.username(),
session.password());

schedule(saveManageSession(ENGINE_LIBRARY_ID, session));
if (soleLibraryMode)
{
DebugLogger.log(
LIBRARY_MANAGEMENT,
disconnectingSessionFormatter, session.sessionId(), library.libraryId());

final long connectionId = session.connectionId();
receiverEndPoints.removeConnection(connectionId, DisconnectReason.LIBRARY_DISCONNECT);
fixSenderEndPoints.removeConnection(connectionId);
gatewaySessions.releaseByConnectionId(connectionId);
}

if (performingDisconnectOperation)
else
{
session.session().logoutAndDisconnect();
final long sessionId = session.sessionId();
final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION;
final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED;

DebugLogger.log(
LIBRARY_MANAGEMENT,
acquiringSessionFormatter, session.sessionId(), library.libraryId());

((FixGatewaySessions)gatewaySessions).acquire(
session,
state,
false,
session.heartbeatIntervalInS(),
sentSequenceNumber,
receivedSequenceNumber,
session.username(),
session.password());

schedule(saveManageSession(ENGINE_LIBRARY_ID, session));

if (performingDisconnectOperation)
{
session.session().logoutAndDisconnect();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package uk.co.real_logic.artio.system_tests;

import uk.co.real_logic.artio.Reply;
import uk.co.real_logic.artio.engine.*;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.library.FixLibrary;
import uk.co.real_logic.artio.library.LibraryConfiguration;
import org.junit.Test;
import uk.co.real_logic.artio.messages.SessionState;
import uk.co.real_logic.artio.session.Session;

import java.util.List;

Expand Down Expand Up @@ -204,7 +207,7 @@ public void shouldAllowReconnectingInitiatorsToReconnect()
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldAcquireSessionsWithLoggingSwitchedOff()
public void shouldInitiatingLibraryDisconnectSessionOnLibraryTimeout()
{
// Equivalent invariant tested in Engine mode in NoLoggingGatewayToGatewaySystemTest
launch(false, false);
Expand All @@ -216,6 +219,27 @@ public void shouldAcquireSessionsWithLoggingSwitchedOff()
testSystem.remove(initiatingLibrary);
awaitLibraryDisconnect(initiatingEngine, testSystem);

acceptingMessagesCanBeExchanged();
assertEventuallyTrue("Accepting library did not recognize disconnected session",
() ->
{
testSystem.poll();
final List<Session> sessions = acceptingLibrary.sessions();
assertEquals(1, sessions.size());
final Session session = sessions.get(0);
assertEquals(SessionState.DISCONNECTED, session.state());
}
);

assertEventuallyTrue("Initiating Engine did not disconnect session",
() ->
{
final Reply<List<LibraryInfo>> libraryInfoReply = initiatingEngine.libraries();
assertTrue(libraryInfoReply.hasCompleted());
final List<LibraryInfo> libraryInfo = libraryInfoReply.resultIfPresent();
assertEquals(1, libraryInfo.size());
final LibraryInfo libInfo = libraryInfo.get(0);
assertEquals(0, libInfo.sessions().size());
}
);
}
}
Loading