diff --git a/ballerina-tests/build.gradle b/ballerina-tests/build.gradle index 201b14de8..653bea731 100644 --- a/ballerina-tests/build.gradle +++ b/ballerina-tests/build.gradle @@ -48,7 +48,7 @@ def stripBallerinaExtensionVersion(String extVersion) { } } -task updateTomlVerions { +task updateTomlFiles { doLast { def newBallerinaToml = ballerinaTomlFilePlaceHolder.text.replace("@toml.version@", tomlVersion) ballerinaTomlFile.text = newBallerinaToml @@ -105,7 +105,7 @@ task initializeVariables { task ballerinaTest { dependsOn(":${packageName}-${packageOrg}:build") - dependsOn(updateTomlVerions) + dependsOn(updateTomlFiles) dependsOn(initializeVariables) finalizedBy(commitTomlFiles) diff --git a/ballerina-tests/tests/36_subscriptions.bal b/ballerina-tests/tests/36_subscriptions.bal index 923358614..d49ecb8ea 100644 --- a/ballerina-tests/tests/36_subscriptions.bal +++ b/ballerina-tests/tests/36_subscriptions.bal @@ -626,7 +626,7 @@ isolated function testSubscriptionWithInvalidPayload() returns error? { check initiateConnectionInitMessage(wsClient); check validateConnectionInitMessage(wsClient); - + json invalidPayload = {"type": WS_START}; check wsClient->writeMessage(invalidPayload); json|error response = wsClient->readMessage(); diff --git a/ballerina/websocket_service.bal b/ballerina/websocket_service.bal index 6246f4329..7ee1265ef 100644 --- a/ballerina/websocket_service.bal +++ b/ballerina/websocket_service.bal @@ -24,7 +24,7 @@ isolated service class WsService { private final Engine engine; private final readonly & __Schema schema; private final Context context; - private map<()> activeConnections; + private final map<()> activeConnections; private final readonly & map customHeaders; private boolean initiatedConnection; diff --git a/ballerina/websocket_utils.bal b/ballerina/websocket_utils.bal index 997f52c59..1ce20badd 100644 --- a/ballerina/websocket_utils.bal +++ b/ballerina/websocket_utils.bal @@ -14,6 +14,7 @@ // specific language governing permissions and limitations // under the License. +import ballerina/log; import ballerina/websocket; import graphql.parser; @@ -39,6 +40,7 @@ isolated function executeOperation(Engine engine, Context context, readonly & __ } else { closeConnection(caller); } + closeStream(sourceStream); } else { check sendWebSocketResponse(caller, customHeaders, WS_ERROR, sourceStream, connectionId); if !customHeaders.hasKey(WS_SUB_PROTOCOL) { @@ -115,3 +117,11 @@ isolated function validateSubProtocol(websocket:Caller caller, readonly & map sourceStream) { + error? result = sourceStream.close(); + if result is error { + error err = error("Failed to close stream", result); + log:printError(err.message(), stackTrace = err.stackTrace()); + } +} diff --git a/changelog.md b/changelog.md index af41e0c9b..3ff63309b 100644 --- a/changelog.md +++ b/changelog.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [[#3062] Fix Allow to Attach a Service with Subscription to a HTTP2 Based Listener](https://github.com/ballerina-platform/ballerina-standard-library/issues/3601) - [[#3628] Fix Compilation Failure when Other Annotations are Present](https://github.com/ballerina-platform/ballerina-standard-library/issues/3628) - [[#3646] Fix Returning Incorrect Validation Errors for Input Object Fields with Default Values](https://github.com/ballerina-platform/ballerina-standard-library/issues/3646) +- [[#3661] Fix Stream not Closing After the Completion of the Subscription Operation](https://github.com/ballerina-platform/ballerina-standard-library/issues/3661) ### Changed - [[#3062] Improve Compilation Error Messages To Be More Specific](https://github.com/ballerina-platform/ballerina-standard-library/issues/3062)