Skip to content

Commit

Permalink
Disconnect sessions when library times out in SOLE_LIBRARY mode
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-adaptive committed Jan 2, 2025
1 parent d05ced1 commit 90e8463
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
"Received Heartbeat (msg=%s) from library %s at %sms, sent at %sns");
private final CharFormatter acquiringSessionFormatter = new CharFormatter(
"Acquiring session %s from library %s");
private final CharFormatter disconnectingSessionFormatter = new CharFormatter(
"Disconnecting session %s from library %s");
private final CharFormatter releasingSessionFormatter = new CharFormatter(
"Releasing session %s with connectionId %s from library %s");
private final CharFormatter connectingFormatter = new CharFormatter(
Expand Down Expand Up @@ -493,7 +495,7 @@ private void onLibraryDisconnect(final LiveLibraryInfo library)
DebugLogger.log(LIBRARY_MANAGEMENT, timingOutFormatter.clear().with(library.libraryId()));
}

tryAcquireLibrarySessions(library);
tryAcquireOrDisconnectLibrarySessions(library);
saveLibraryTimeout(library);
disconnectILinkConnections(library);
}
Expand All @@ -519,7 +521,7 @@ private void soleLibraryModeUnbind()
}
}

private void tryAcquireLibrarySessions(final LiveLibraryInfo library)
private void tryAcquireOrDisconnectLibrarySessions(final LiveLibraryInfo library)
{
final int librarySessionId = library.aeronSessionId();
final Image image = librarySubscription.imageBySessionId(librarySessionId);
Expand All @@ -533,7 +535,7 @@ private void tryAcquireLibrarySessions(final LiveLibraryInfo library)
// just acquire
if (!configuration.logOutboundMessages() || sentIndexedPosition(librarySessionId, libraryPosition))
{
acquireLibrarySessions(library);
acquireOrDisconnectLibrarySessions(library);
}
else
{
Expand All @@ -547,7 +549,7 @@ private boolean retryAcquireLibrarySessions(final LiveLibraryInfo library)
if (!configuration.logOutboundMessages() ||
sentIndexedPosition(library.aeronSessionId(), library.acquireAtPosition()))
{
acquireLibrarySessions(library);
acquireOrDisconnectLibrarySessions(library);
return true;
}

Expand All @@ -572,7 +574,7 @@ private void saveLibraryTimeout(final LibraryInfo library)
schedule(() -> outboundPublication.saveLibraryTimeout(library, 0));
}

private void acquireLibrarySessions(final LiveLibraryInfo library)
private void acquireOrDisconnectLibrarySessions(final LiveLibraryInfo library)
{
final List<GatewaySession> sessions = library.gatewaySessions();
for (int i = 0, size = sessions.size(); i < size; i++)
Expand All @@ -585,31 +587,46 @@ private void acquireLibrarySessions(final LiveLibraryInfo library)
continue;
}

final long sessionId = session.sessionId();
final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION;
final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED;

DebugLogger.log(
LIBRARY_MANAGEMENT,
acquiringSessionFormatter, session.sessionId(), library.libraryId());

((FixGatewaySessions)gatewaySessions).acquire(
session,
state,
false,
session.heartbeatIntervalInS(),
sentSequenceNumber,
receivedSequenceNumber,
session.username(),
session.password());

schedule(saveManageSession(ENGINE_LIBRARY_ID, session));
if (soleLibraryMode)
{
DebugLogger.log(
LIBRARY_MANAGEMENT,
disconnectingSessionFormatter, session.sessionId(), library.libraryId());

final long connectionId = session.connectionId();
receiverEndPoints.removeConnection(connectionId, DisconnectReason.LIBRARY_DISCONNECT);
fixSenderEndPoints.removeConnection(connectionId);
gatewaySessions.releaseByConnectionId(connectionId);
}

if (performingDisconnectOperation)
else
{
session.session().logoutAndDisconnect();
final long sessionId = session.sessionId();
final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId);
final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION;
final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED;

DebugLogger.log(
LIBRARY_MANAGEMENT,
acquiringSessionFormatter, session.sessionId(), library.libraryId());

((FixGatewaySessions)gatewaySessions).acquire(
session,
state,
false,
session.heartbeatIntervalInS(),
sentSequenceNumber,
receivedSequenceNumber,
session.username(),
session.password());

schedule(saveManageSession(ENGINE_LIBRARY_ID, session));

if (performingDisconnectOperation)
{
session.session().logoutAndDisconnect();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package uk.co.real_logic.artio.system_tests;

import uk.co.real_logic.artio.Reply;
import uk.co.real_logic.artio.engine.*;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.library.FixLibrary;
import uk.co.real_logic.artio.library.LibraryConfiguration;
import org.junit.Test;
import uk.co.real_logic.artio.messages.SessionState;
import uk.co.real_logic.artio.session.Session;

import java.util.List;

Expand Down Expand Up @@ -204,7 +207,7 @@ public void shouldAllowReconnectingInitiatorsToReconnect()
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldAcquireSessionsWithLoggingSwitchedOff()
public void shouldInitiatingLibraryDisconnectSessionOnLibraryTimeout()
{
// Equivalent invariant tested in Engine mode in NoLoggingGatewayToGatewaySystemTest
launch(false, false);
Expand All @@ -216,6 +219,21 @@ public void shouldAcquireSessionsWithLoggingSwitchedOff()
testSystem.remove(initiatingLibrary);
awaitLibraryDisconnect(initiatingEngine, testSystem);

acceptingMessagesCanBeExchanged();
assertEventuallyTrue("Accepting library did not recognize disconnected session", () -> {
testSystem.poll();
final List<Session> sessions = acceptingLibrary.sessions();
assertEquals(1, sessions.size());
final Session session = sessions.get(0);
assertEquals(SessionState.DISCONNECTED, session.state());
});

assertEventuallyTrue("Initiating Engine did not disconnect session", () -> {
final Reply<List<LibraryInfo>> libraryInfoReply = initiatingEngine.libraries();
assertTrue(libraryInfoReply.hasCompleted());
final List<LibraryInfo> libraryInfo = libraryInfoReply.resultIfPresent();
assertEquals(1, libraryInfo.size());
final LibraryInfo libInfo = libraryInfo.get(0);
assertEquals(0, libInfo.sessions().size());
});
}
}

0 comments on commit 90e8463

Please sign in to comment.