From 23fa141c332f60cac7468ab3fd1e1c0abb054218 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Fri, 3 Jan 2025 10:37:31 -0500 Subject: [PATCH] Disconnect sessions when library times out in SOLE_LIBRARY mode (#537) If FixEngine uses SOLE_LIBRARY mode, Framer will disconnect FIX session connections if library times out --- .../artio/engine/framer/Framer.java | 73 ++++++++++++------- .../system_tests/SoleLibrarySystemTest.java | 28 ++++++- 2 files changed, 71 insertions(+), 30 deletions(-) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java index 81b469a23f..70a30be972 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java @@ -114,6 +114,8 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler "Received Heartbeat (msg=%s) from library %s at %sms, sent at %sns"); private final CharFormatter acquiringSessionFormatter = new CharFormatter( "Acquiring session %s from library %s"); + private final CharFormatter disconnectingSessionFormatter = new CharFormatter( + "Disconnecting session %s from library %s"); private final CharFormatter releasingSessionFormatter = new CharFormatter( "Releasing session %s with connectionId %s from library %s"); private final CharFormatter connectingFormatter = new CharFormatter( @@ -493,7 +495,7 @@ private void onLibraryDisconnect(final LiveLibraryInfo library) DebugLogger.log(LIBRARY_MANAGEMENT, timingOutFormatter.clear().with(library.libraryId())); } - tryAcquireLibrarySessions(library); + tryAcquireOrDisconnectLibrarySessions(library); saveLibraryTimeout(library); disconnectILinkConnections(library); } @@ -519,7 +521,7 @@ private void soleLibraryModeUnbind() } } - private void tryAcquireLibrarySessions(final LiveLibraryInfo library) + private void tryAcquireOrDisconnectLibrarySessions(final LiveLibraryInfo library) { final int librarySessionId = library.aeronSessionId(); final Image image = librarySubscription.imageBySessionId(librarySessionId); @@ -533,7 +535,7 @@ private void tryAcquireLibrarySessions(final LiveLibraryInfo library) // just acquire if (!configuration.logOutboundMessages() || sentIndexedPosition(librarySessionId, libraryPosition)) { - acquireLibrarySessions(library); + acquireOrDisconnectLibrarySessions(library); } else { @@ -547,7 +549,7 @@ private boolean retryAcquireLibrarySessions(final LiveLibraryInfo library) if (!configuration.logOutboundMessages() || sentIndexedPosition(library.aeronSessionId(), library.acquireAtPosition())) { - acquireLibrarySessions(library); + acquireOrDisconnectLibrarySessions(library); return true; } @@ -572,7 +574,7 @@ private void saveLibraryTimeout(final LibraryInfo library) schedule(() -> outboundPublication.saveLibraryTimeout(library, 0)); } - private void acquireLibrarySessions(final LiveLibraryInfo library) + private void acquireOrDisconnectLibrarySessions(final LiveLibraryInfo library) { final List sessions = library.gatewaySessions(); for (int i = 0, size = sessions.size(); i < size; i++) @@ -585,31 +587,46 @@ private void acquireLibrarySessions(final LiveLibraryInfo library) continue; } - final long sessionId = session.sessionId(); - final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId); - final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId); - final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION; - final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED; - - DebugLogger.log( - LIBRARY_MANAGEMENT, - acquiringSessionFormatter, session.sessionId(), library.libraryId()); - - ((FixGatewaySessions)gatewaySessions).acquire( - session, - state, - false, - session.heartbeatIntervalInS(), - sentSequenceNumber, - receivedSequenceNumber, - session.username(), - session.password()); - - schedule(saveManageSession(ENGINE_LIBRARY_ID, session)); + if (soleLibraryMode) + { + DebugLogger.log( + LIBRARY_MANAGEMENT, + disconnectingSessionFormatter, session.sessionId(), library.libraryId()); + + final long connectionId = session.connectionId(); + receiverEndPoints.removeConnection(connectionId, DisconnectReason.LIBRARY_DISCONNECT); + fixSenderEndPoints.removeConnection(connectionId); + gatewaySessions.releaseByConnectionId(connectionId); + } - if (performingDisconnectOperation) + else { - session.session().logoutAndDisconnect(); + final long sessionId = session.sessionId(); + final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId); + final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId); + final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION; + final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED; + + DebugLogger.log( + LIBRARY_MANAGEMENT, + acquiringSessionFormatter, session.sessionId(), library.libraryId()); + + ((FixGatewaySessions)gatewaySessions).acquire( + session, + state, + false, + session.heartbeatIntervalInS(), + sentSequenceNumber, + receivedSequenceNumber, + session.username(), + session.password()); + + schedule(saveManageSession(ENGINE_LIBRARY_ID, session)); + + if (performingDisconnectOperation) + { + session.session().logoutAndDisconnect(); + } } } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java index a634fcb7ef..c1108249f2 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java @@ -15,11 +15,14 @@ */ package uk.co.real_logic.artio.system_tests; +import uk.co.real_logic.artio.Reply; import uk.co.real_logic.artio.engine.*; import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.library.FixLibrary; import uk.co.real_logic.artio.library.LibraryConfiguration; import org.junit.Test; +import uk.co.real_logic.artio.messages.SessionState; +import uk.co.real_logic.artio.session.Session; import java.util.List; @@ -204,7 +207,7 @@ public void shouldAllowReconnectingInitiatorsToReconnect() } @Test(timeout = TEST_TIMEOUT_IN_MS) - public void shouldAcquireSessionsWithLoggingSwitchedOff() + public void shouldInitiatingLibraryDisconnectSessionOnLibraryTimeout() { // Equivalent invariant tested in Engine mode in NoLoggingGatewayToGatewaySystemTest launch(false, false); @@ -216,6 +219,27 @@ public void shouldAcquireSessionsWithLoggingSwitchedOff() testSystem.remove(initiatingLibrary); awaitLibraryDisconnect(initiatingEngine, testSystem); - acceptingMessagesCanBeExchanged(); + assertEventuallyTrue("Accepting library did not recognize disconnected session", + () -> + { + testSystem.poll(); + final List sessions = acceptingLibrary.sessions(); + assertEquals(1, sessions.size()); + final Session session = sessions.get(0); + assertEquals(SessionState.DISCONNECTED, session.state()); + } + ); + + assertEventuallyTrue("Initiating Engine did not disconnect session", + () -> + { + final Reply> libraryInfoReply = initiatingEngine.libraries(); + assertTrue(libraryInfoReply.hasCompleted()); + final List libraryInfo = libraryInfoReply.resultIfPresent(); + assertEquals(1, libraryInfo.size()); + final LibraryInfo libInfo = libraryInfo.get(0); + assertEquals(0, libInfo.sessions().size()); + } + ); } }