Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GraphQL Subscriptions Incorrectly Closing the Connection When Stream Returns an Error #1014

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions ballerina-tests/tests/36_subscriptions.bal
Original file line number Diff line number Diff line change
Expand Up @@ -584,3 +584,20 @@ isolated function testInvalidSubProtocolInSubscriptions() returns error? {
test:assertEquals((<error>message).message(), errorMsg);
}
}

@test:Config {
groups: ["subscriptions", "runtime_errors"]
}
isolated function testErrorsInStreams() returns error? {
string url = "ws://localhost:9099/subscriptions";
websocket:Client wsClient = check new(url);
string document = "subscription { evenNumber }";
json payload = {query: document};
check wsClient->writeMessage(payload);
json expectedPayload = {data: {evenNumber: 2}};
check validateWebSocketResponse(wsClient, expectedPayload);
expectedPayload = check getJsonContentFromFile("errors_in_streams.json");
check validateWebSocketResponse(wsClient, expectedPayload);
expectedPayload = {data: {evenNumber: 6}};
check validateWebSocketResponse(wsClient, expectedPayload);
}
60 changes: 36 additions & 24 deletions ballerina-tests/tests/object_types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -59,38 +59,38 @@ public distinct isolated service class Lift {
self.lift = lift.cloneReadOnly();
}

isolated resource function get id () returns string {
isolated resource function get id() returns string {
return self.lift.id;
}

isolated resource function get name () returns string {
isolated resource function get name() returns string {
return self.lift.name;
}

isolated resource function get status () returns string {
isolated resource function get status() returns string {
return self.lift.status;
}

isolated resource function get capacity () returns int {
isolated resource function get capacity() returns int {
return self.lift.capacity;
}

isolated resource function get night () returns boolean {
isolated resource function get night() returns boolean {
return self.lift.night;
}

isolated resource function get elevationgain () returns int {
isolated resource function get elevationgain() returns int {
return self.lift.elevationgain;
}

isolated resource function get trailAccess () returns Trail[] {
isolated resource function get trailAccess() returns Trail[] {
LiftRecord[] lifts = [self.lift];
EdgeRecord[] edges = from var edge in edgeTable
join var lift in lifts on edge.liftId equals lift.id
select edge;
join var lift in lifts on edge.liftId equals lift.id
select edge;
TrailRecord[] trails = from var trail in trailTable
join var edge in edges on trail.id equals edge.trailId
select trail;
join var edge in edges on trail.id equals edge.trailId
select trail;
return trails.map(trailRecord => new Trail(trailRecord));
}
}
Expand All @@ -102,42 +102,42 @@ public distinct isolated service class Trail {
self.trail = trail.cloneReadOnly();
}

isolated resource function get id () returns string {
isolated resource function get id() returns string {
return self.trail.id;
}

isolated resource function get name () returns string {
isolated resource function get name() returns string {
return self.trail.name;
}

isolated resource function get status () returns string {
isolated resource function get status() returns string {
return self.trail.status;
}

isolated resource function get difficulty () returns string? {
isolated resource function get difficulty() returns string? {
return self.trail.difficulty;
}

isolated resource function get groomed () returns boolean {
isolated resource function get groomed() returns boolean {
return self.trail.groomed;
}

isolated resource function get trees () returns boolean {
isolated resource function get trees() returns boolean {
return self.trail.trees;
}

isolated resource function get night () returns boolean {
isolated resource function get night() returns boolean {
return self.trail.night;
}

isolated resource function get accessByLifts () returns Lift[] {
isolated resource function get accessByLifts() returns Lift[] {
TrailRecord[] trails = [self.trail];
EdgeRecord[] edges = from var edge in edgeTable
join var trail in trails on edge.trailId equals trail.id
select edge;
join var trail in trails on edge.trailId equals trail.id
select edge;
LiftRecord[] lifts = from var lift in liftTable
join var edge in edges on lift.id equals edge.liftId
select lift;
join var edge in edges on lift.id equals edge.liftId
select lift;
return lifts.map(liftRecord => new Lift(liftRecord));
}
}
Expand Down Expand Up @@ -238,7 +238,7 @@ public isolated distinct service class AnimalClass {
string call = "";
int i = 0;
while i < count {
call += string`${sound} `;
call += string `${sound} `;
i += 1;
}
return call;
Expand Down Expand Up @@ -276,3 +276,15 @@ public isolated service class Vehicle {
}
}
}

class EvenNumberGenerator {
private int i = 0;

public isolated function next() returns record {|int value;|}|error {
self.i += 2;
if self.i == 4 {
return error("Runtime exception");
}
return {value: self.i};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"errors": [
{
"message": "Runtime exception",
"locations": [
{
"line": 1,
"column": 16
}
],
"path": [
"evenNumber"
]
}
],
"data": null
}
12 changes: 8 additions & 4 deletions ballerina-tests/tests/test_services.bal
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,11 @@ service /subscriptions on subscriptionListener {
TeacherService t = new TeacherService(0, "Walter White", "Chemistry");
return [s, t].toStream();
}

isolated resource function subscribe evenNumber() returns stream<int, error?> {
EvenNumberGenerator evenNumberGenerator = new;
return new (evenNumberGenerator);
}
}

# GraphQL service with documentation.
Expand All @@ -1326,7 +1331,7 @@ service /documentation on basicListener {
# + name - The name of the person
# + return - The personalized greeting message
isolated resource function get greeting(string name) returns string {
return string`Hello ${name}`;
return string `Hello ${name}`;
}

# Returns a predefined instrument.
Expand Down Expand Up @@ -1706,7 +1711,7 @@ service /invalid_interceptor2 on basicListener {
}
service /invalid_interceptor3 on basicListener {
isolated resource function get person() returns Person {
return {
return {
name: "Albus Percival Wulfric Brian Dumbledore",
age: 80,
address: {number: "101", street: "Mould-on-the-Wold", city: "London"}
Expand Down Expand Up @@ -1746,7 +1751,7 @@ service /intercept_errors2 on basicListener {
}
service /intercept_errors3 on basicListener {
isolated resource function get person() returns Person {
return {
return {
name: "Albus Percival Wulfric Brian Dumbledore",
age: 80,
address: {number: "101", street: "Mould-on-the-Wold", city: "London"}
Expand Down Expand Up @@ -1841,7 +1846,6 @@ service /maps on basicListener {
}
}


@graphql:ServiceConfig {
introspection: false
}
Expand Down
2 changes: 1 addition & 1 deletion ballerina/engine.bal
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ isolated class Engine {
}
}

isolated function getResult(parser:OperationNode operationNode, Context context, any result = ())
isolated function getResult(parser:OperationNode operationNode, Context context, any|error result = ())
returns OutputObject {
DefaultDirectiveProcessorVisitor defaultDirectiveProcessor = new (self.schema);
DuplicateFieldRemoverVisitor duplicateFieldRemover = new;
Expand Down
6 changes: 3 additions & 3 deletions ballerina/executor_visitor.bal
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ class ExecutorVisitor {
private Data data;
private ErrorDetail[] errors;
private Context context;
private any result;
private any|error result;

isolated function init(Engine engine, __Schema schema, Context context, any result = ()) {
isolated function init(Engine engine, __Schema schema, Context context, any|error result = ()) {
self.engine = engine;
self.schema = schema;
self.context = context;
Expand Down Expand Up @@ -107,7 +107,7 @@ class ExecutorVisitor {
}

isolated function executeSubscription(parser:FieldNode fieldNode, parser:RootOperationType operationType,
any fieldValue) {
any|error fieldValue) {
(string|int)[] path = [fieldNode.getName()];
string operationTypeName = getOperationTypeNameFromOperationType(operationType);
__Type parentType = <__Type>getTypeFromTypeArray(self.schema.types, operationTypeName);
Expand Down
6 changes: 3 additions & 3 deletions ballerina/field.bal
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ public class Field {
private final parser:RootOperationType operationType;
private final parser:FieldNode internalNode;
private final service object {}? serviceObject;
private final any fieldValue;
private final any|error fieldValue;
private final __Type fieldType;
private (string|int)[] path;
private string[] resourcePath;

isolated function init(parser:FieldNode internalNode, __Type fieldType, service object {}? serviceObject = (),
(string|int)[] path = [], parser:RootOperationType operationType = parser:OPERATION_QUERY,
string[] resourcePath = [], any fieldValue = ()) {
string[] resourcePath = [], any|error fieldValue = ()) {
self.internalNode = internalNode;
self.serviceObject = serviceObject;
self.fieldType = fieldType;
Expand Down Expand Up @@ -75,7 +75,7 @@ public class Field {
return self.fieldType;
}

isolated function getFieldValue() returns any {
isolated function getFieldValue() returns any|error {
return self.fieldValue;
}
}
91 changes: 55 additions & 36 deletions ballerina/websocket_service.bal
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ isolated service class WsService {
private final readonly & map<string> customHeaders;
private boolean initiatedConnection;

isolated function init(Engine engine, __Schema & readonly schema, map<string> & readonly customHeaders,
isolated function init(Engine engine, __Schema & readonly schema, map<string> & readonly customHeaders,
Context context) {
self.engine = engine;
self.schema = schema;
Expand Down Expand Up @@ -71,50 +71,69 @@ isolated service class WsService {
string wsType = wsPayload is WSPayload ? <string>wsPayload.'type : DEFAULT_VALUE;
string connectionId = wsPayload is WSPayload && wsPayload?.id !is () ? <string>wsPayload?.id : DEFAULT_VALUE;

if wsType == WS_INIT {
lock {
if self.initiatedConnection {
closeConnection(caller, 4429, "Too many initialisation requests");
return;
}
check caller->writeMessage({"type": WS_ACK});
self.initiatedConnection = true;
}
} else if wsType == WS_SUBSCRIBE || wsType == WS_START || !self.customHeaders.hasKey(WS_SUB_PROTOCOL) {
lock {
if self.customHeaders.hasKey(WS_SUB_PROTOCOL) {
if !self.initiatedConnection {
closeConnection(caller, 4401, "Unauthorized");
if !self.customHeaders.hasKey(WS_SUB_PROTOCOL) {
return self.handleSubscriptionRequest(caller, connectionId, wsPayload);
}

match wsType {
WS_INIT => {
lock {
if self.initiatedConnection {
closeConnection(caller, 4429, "Too many initialisation requests");
return;
}
if self.activeConnections.indexOf(connectionId) !is () {
closeConnection(caller, 4409, string `Subscriber for ${connectionId} already exists`);
return;
check caller->writeMessage({"type": WS_ACK});
self.initiatedConnection = true;
}
}
WS_SUBSCRIBE|WS_START => {
lock {
if self.customHeaders.hasKey(WS_SUB_PROTOCOL) {
if !self.initiatedConnection {
closeConnection(caller, 4401, "Unauthorized");
return;
}
if self.activeConnections.indexOf(connectionId) !is () {
closeConnection(caller, 4409, string `Subscriber for ${connectionId} already exists`);
return;
}
self.activeConnections.push(connectionId);
}
self.activeConnections.push(connectionId);
}
return self.handleSubscriptionRequest(caller, connectionId, wsPayload);
}
parser:OperationNode|json node = validateSubscriptionPayload(wsPayload, self.engine);
if node is parser:OperationNode {
check executeOperation(self.engine, self.context, self.schema, self.customHeaders, caller,
connectionId, node);
} else {
check sendWebSocketResponse(caller, self.customHeaders, WS_ERROR, node, connectionId);
WS_STOP|WS_COMPLETE => {
lock {
_ = self.activeConnections.remove(<int>self.activeConnections.indexOf(connectionId));
self.initiatedConnection = false;
}
check sendWebSocketResponse(caller, self.customHeaders, WS_COMPLETE, null, connectionId);
closeConnection(caller);
}
} else if wsType == WS_STOP || wsType == WS_COMPLETE {
lock {
_ = self.activeConnections.remove(<int>self.activeConnections.indexOf(connectionId));
self.initiatedConnection = false;
WS_PING => {
check caller->writeMessage({"type": WS_PONG});
}
check sendWebSocketResponse(caller, self.customHeaders, WS_COMPLETE, null, connectionId);
closeConnection(caller);
} else if wsType == WS_PING {
check caller->writeMessage({"type": WS_PONG});
} else if wsType == WS_PONG {
check caller->writeMessage({"type": WS_PING});
WS_PONG => {
check caller->writeMessage({"type": WS_PING});
}
}
}

isolated function handleSubscriptionRequest(websocket:Caller caller, string connectionId, WSPayload|json wsPayload)
returns websocket:Error? {
parser:OperationNode|json node = validateSubscriptionPayload(wsPayload, self.engine);
if node is parser:OperationNode {
check executeOperation(self.engine, self.context, self.schema, self.customHeaders, caller, connectionId,
node);
} else {
// do nothing
check sendWebSocketResponse(caller, self.customHeaders, WS_ERROR, node, connectionId);
closeConnection(caller);
}
}

isolated function handleError(websocket:Caller caller, error err) returns websocket:Error? {
json payload = {errors: [{message: "Invalid format in WebSocket payload: " + err.message()}]};
check sendWebSocketResponse(caller, self.customHeaders, WS_ERROR, payload);
closeConnection(caller);
}
}
Loading