From 90e8463dda2a3318b9e2f9320e8ed3ee68206221 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 2 Jan 2025 16:20:12 -0500 Subject: [PATCH 1/2] Disconnect sessions when library times out in SOLE_LIBRARY mode --- .../artio/engine/framer/Framer.java | 73 ++++++++++++------- .../system_tests/SoleLibrarySystemTest.java | 22 +++++- 2 files changed, 65 insertions(+), 30 deletions(-) diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java index 81b469a23f..70a30be972 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java @@ -114,6 +114,8 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler "Received Heartbeat (msg=%s) from library %s at %sms, sent at %sns"); private final CharFormatter acquiringSessionFormatter = new CharFormatter( "Acquiring session %s from library %s"); + private final CharFormatter disconnectingSessionFormatter = new CharFormatter( + "Disconnecting session %s from library %s"); private final CharFormatter releasingSessionFormatter = new CharFormatter( "Releasing session %s with connectionId %s from library %s"); private final CharFormatter connectingFormatter = new CharFormatter( @@ -493,7 +495,7 @@ private void onLibraryDisconnect(final LiveLibraryInfo library) DebugLogger.log(LIBRARY_MANAGEMENT, timingOutFormatter.clear().with(library.libraryId())); } - tryAcquireLibrarySessions(library); + tryAcquireOrDisconnectLibrarySessions(library); saveLibraryTimeout(library); disconnectILinkConnections(library); } @@ -519,7 +521,7 @@ private void soleLibraryModeUnbind() } } - private void tryAcquireLibrarySessions(final LiveLibraryInfo library) + private void tryAcquireOrDisconnectLibrarySessions(final LiveLibraryInfo library) { final int librarySessionId = library.aeronSessionId(); final Image image = librarySubscription.imageBySessionId(librarySessionId); @@ -533,7 +535,7 @@ private void tryAcquireLibrarySessions(final LiveLibraryInfo library) // just acquire if (!configuration.logOutboundMessages() || sentIndexedPosition(librarySessionId, libraryPosition)) { - acquireLibrarySessions(library); + acquireOrDisconnectLibrarySessions(library); } else { @@ -547,7 +549,7 @@ private boolean retryAcquireLibrarySessions(final LiveLibraryInfo library) if (!configuration.logOutboundMessages() || sentIndexedPosition(library.aeronSessionId(), library.acquireAtPosition())) { - acquireLibrarySessions(library); + acquireOrDisconnectLibrarySessions(library); return true; } @@ -572,7 +574,7 @@ private void saveLibraryTimeout(final LibraryInfo library) schedule(() -> outboundPublication.saveLibraryTimeout(library, 0)); } - private void acquireLibrarySessions(final LiveLibraryInfo library) + private void acquireOrDisconnectLibrarySessions(final LiveLibraryInfo library) { final List sessions = library.gatewaySessions(); for (int i = 0, size = sessions.size(); i < size; i++) @@ -585,31 +587,46 @@ private void acquireLibrarySessions(final LiveLibraryInfo library) continue; } - final long sessionId = session.sessionId(); - final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId); - final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId); - final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION; - final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED; - - DebugLogger.log( - LIBRARY_MANAGEMENT, - acquiringSessionFormatter, session.sessionId(), library.libraryId()); - - ((FixGatewaySessions)gatewaySessions).acquire( - session, - state, - false, - session.heartbeatIntervalInS(), - sentSequenceNumber, - receivedSequenceNumber, - session.username(), - session.password()); - - schedule(saveManageSession(ENGINE_LIBRARY_ID, session)); + if (soleLibraryMode) + { + DebugLogger.log( + LIBRARY_MANAGEMENT, + disconnectingSessionFormatter, session.sessionId(), library.libraryId()); + + final long connectionId = session.connectionId(); + receiverEndPoints.removeConnection(connectionId, DisconnectReason.LIBRARY_DISCONNECT); + fixSenderEndPoints.removeConnection(connectionId); + gatewaySessions.releaseByConnectionId(connectionId); + } - if (performingDisconnectOperation) + else { - session.session().logoutAndDisconnect(); + final long sessionId = session.sessionId(); + final int sentSequenceNumber = sentSequenceNumberIndex.lastKnownSequenceNumber(sessionId); + final int receivedSequenceNumber = receivedSequenceNumberIndex.lastKnownSequenceNumber(sessionId); + final boolean hasLoggedIn = receivedSequenceNumber != UNK_SESSION; + final SessionState state = hasLoggedIn ? ACTIVE : CONNECTED; + + DebugLogger.log( + LIBRARY_MANAGEMENT, + acquiringSessionFormatter, session.sessionId(), library.libraryId()); + + ((FixGatewaySessions)gatewaySessions).acquire( + session, + state, + false, + session.heartbeatIntervalInS(), + sentSequenceNumber, + receivedSequenceNumber, + session.username(), + session.password()); + + schedule(saveManageSession(ENGINE_LIBRARY_ID, session)); + + if (performingDisconnectOperation) + { + session.session().logoutAndDisconnect(); + } } } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java index a634fcb7ef..b80a09386f 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java @@ -15,11 +15,14 @@ */ package uk.co.real_logic.artio.system_tests; +import uk.co.real_logic.artio.Reply; import uk.co.real_logic.artio.engine.*; import uk.co.real_logic.artio.engine.framer.LibraryInfo; import uk.co.real_logic.artio.library.FixLibrary; import uk.co.real_logic.artio.library.LibraryConfiguration; import org.junit.Test; +import uk.co.real_logic.artio.messages.SessionState; +import uk.co.real_logic.artio.session.Session; import java.util.List; @@ -204,7 +207,7 @@ public void shouldAllowReconnectingInitiatorsToReconnect() } @Test(timeout = TEST_TIMEOUT_IN_MS) - public void shouldAcquireSessionsWithLoggingSwitchedOff() + public void shouldInitiatingLibraryDisconnectSessionOnLibraryTimeout() { // Equivalent invariant tested in Engine mode in NoLoggingGatewayToGatewaySystemTest launch(false, false); @@ -216,6 +219,21 @@ public void shouldAcquireSessionsWithLoggingSwitchedOff() testSystem.remove(initiatingLibrary); awaitLibraryDisconnect(initiatingEngine, testSystem); - acceptingMessagesCanBeExchanged(); + assertEventuallyTrue("Accepting library did not recognize disconnected session", () -> { + testSystem.poll(); + final List sessions = acceptingLibrary.sessions(); + assertEquals(1, sessions.size()); + final Session session = sessions.get(0); + assertEquals(SessionState.DISCONNECTED, session.state()); + }); + + assertEventuallyTrue("Initiating Engine did not disconnect session", () -> { + final Reply> libraryInfoReply = initiatingEngine.libraries(); + assertTrue(libraryInfoReply.hasCompleted()); + final List libraryInfo = libraryInfoReply.resultIfPresent(); + assertEquals(1, libraryInfo.size()); + final LibraryInfo libInfo = libraryInfo.get(0); + assertEquals(0, libInfo.sessions().size()); + }); } } From 82c4e70e7534f540aec4595ffb2b2828b17195d1 Mon Sep 17 00:00:00 2001 From: marc-adaptive Date: Thu, 2 Jan 2025 18:12:21 -0500 Subject: [PATCH 2/2] checkstyle --- .../system_tests/SoleLibrarySystemTest.java | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java index b80a09386f..c1108249f2 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java @@ -219,21 +219,27 @@ public void shouldInitiatingLibraryDisconnectSessionOnLibraryTimeout() testSystem.remove(initiatingLibrary); awaitLibraryDisconnect(initiatingEngine, testSystem); - assertEventuallyTrue("Accepting library did not recognize disconnected session", () -> { - testSystem.poll(); - final List sessions = acceptingLibrary.sessions(); - assertEquals(1, sessions.size()); - final Session session = sessions.get(0); - assertEquals(SessionState.DISCONNECTED, session.state()); - }); - - assertEventuallyTrue("Initiating Engine did not disconnect session", () -> { - final Reply> libraryInfoReply = initiatingEngine.libraries(); - assertTrue(libraryInfoReply.hasCompleted()); - final List libraryInfo = libraryInfoReply.resultIfPresent(); - assertEquals(1, libraryInfo.size()); - final LibraryInfo libInfo = libraryInfo.get(0); - assertEquals(0, libInfo.sessions().size()); - }); + assertEventuallyTrue("Accepting library did not recognize disconnected session", + () -> + { + testSystem.poll(); + final List sessions = acceptingLibrary.sessions(); + assertEquals(1, sessions.size()); + final Session session = sessions.get(0); + assertEquals(SessionState.DISCONNECTED, session.state()); + } + ); + + assertEventuallyTrue("Initiating Engine did not disconnect session", + () -> + { + final Reply> libraryInfoReply = initiatingEngine.libraries(); + assertTrue(libraryInfoReply.hasCompleted()); + final List libraryInfo = libraryInfoReply.resultIfPresent(); + assertEquals(1, libraryInfo.size()); + final LibraryInfo libInfo = libraryInfo.get(0); + assertEquals(0, libInfo.sessions().size()); + } + ); } }