Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close the Stream when a Subscription Operation is Completed #1067

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ballerina-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def stripBallerinaExtensionVersion(String extVersion) {
}
}

task updateTomlVerions {
task updateTomlFiles {
doLast {
def newBallerinaToml = ballerinaTomlFilePlaceHolder.text.replace("@toml.version@", tomlVersion)
ballerinaTomlFile.text = newBallerinaToml
Expand Down Expand Up @@ -105,7 +105,7 @@ task initializeVariables {

task ballerinaTest {
dependsOn(":${packageName}-${packageOrg}:build")
dependsOn(updateTomlVerions)
dependsOn(updateTomlFiles)
dependsOn(initializeVariables)
finalizedBy(commitTomlFiles)

Expand Down
2 changes: 1 addition & 1 deletion ballerina-tests/tests/36_subscriptions.bal
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ isolated function testSubscriptionWithInvalidPayload() returns error? {

check initiateConnectionInitMessage(wsClient);
check validateConnectionInitMessage(wsClient);

json invalidPayload = {"type": WS_START};
check wsClient->writeMessage(invalidPayload);
json|error response = wsClient->readMessage();
Expand Down
2 changes: 1 addition & 1 deletion ballerina/websocket_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ isolated service class WsService {
private final Engine engine;
private final readonly & __Schema schema;
private final Context context;
private map<()> activeConnections;
private final map<()> activeConnections;
private final readonly & map<string> customHeaders;
private boolean initiatedConnection;

Expand Down
10 changes: 10 additions & 0 deletions ballerina/websocket_utils.bal
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// specific language governing permissions and limitations
// under the License.

import ballerina/log;
import ballerina/websocket;
import graphql.parser;

Expand All @@ -39,6 +40,7 @@ isolated function executeOperation(Engine engine, Context context, readonly & __
} else {
closeConnection(caller);
}
closeStream(sourceStream);
} else {
check sendWebSocketResponse(caller, customHeaders, WS_ERROR, sourceStream, connectionId);
if !customHeaders.hasKey(WS_SUB_PROTOCOL) {
Expand Down Expand Up @@ -115,3 +117,11 @@ isolated function validateSubProtocol(websocket:Caller caller, readonly & map<st
}
return;
}

isolated function closeStream(stream<any, error?> sourceStream) {
error? result = sourceStream.close();
if result is error {
error err = error("Failed to close stream", result);
log:printError(err.message(), stackTrace = err.stackTrace());
}
}
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [[#3062] Fix Allow to Attach a Service with Subscription to a HTTP2 Based Listener](https://github.com/ballerina-platform/ballerina-standard-library/issues/3601)
- [[#3628] Fix Compilation Failure when Other Annotations are Present](https://github.com/ballerina-platform/ballerina-standard-library/issues/3628)
- [[#3646] Fix Returning Incorrect Validation Errors for Input Object Fields with Default Values](https://github.com/ballerina-platform/ballerina-standard-library/issues/3646)
- [[#3661] Fix Stream not Closing After the Completion of the Subscription Operation](https://github.com/ballerina-platform/ballerina-standard-library/issues/3661)

### Changed
- [[#3062] Improve Compilation Error Messages To Be More Specific](https://github.com/ballerina-platform/ballerina-standard-library/issues/3062)
Expand Down