Skip to content

Commit

Permalink
Added support for custom RSocketError
Browse files Browse the repository at this point in the history
  • Loading branch information
palamccc committed Feb 17, 2024
1 parent cdfa824 commit 4e23a1f
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions packages/rsocket-core/src/RSocketMachine.js
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
this._sendStreamComplete(streamId);
},
onError: error => {
this._sendStreamError(streamId, error.message);
this._sendStreamError(streamId, error);
},
//Subscriber methods
onNext: payload => {
Expand Down Expand Up @@ -677,7 +677,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
if (this._isRequest(frame.type)) {
const leaseError = this._useLeaseOrError(this._responderLeaseHandler);
if (leaseError) {
this._sendStreamError(streamId, leaseError);
this._sendStreamError(streamId, new Error(leaseError));
return;
}
}
Expand Down Expand Up @@ -758,7 +758,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
onComplete: payload => {
this._sendStreamPayload(streamId, payload, true);
},
onError: error => this._sendStreamError(streamId, error.message),
onError: error => this._sendStreamError(streamId, error),
onSubscribe: cancel => {
const subscription = {
cancel,
Expand All @@ -773,7 +773,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
const payload = this._deserializePayload(frame);
this._requestHandler.requestStream(payload).subscribe({
onComplete: () => this._sendStreamComplete(streamId),
onError: error => this._sendStreamError(streamId, error.message),
onError: error => this._sendStreamError(streamId, error),
onNext: payload => this._sendStreamPayload(streamId, payload),
onSubscribe: subscription => {
this._subscriptions.set(streamId, subscription);
Expand Down Expand Up @@ -835,7 +835,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {

this._requestHandler.requestChannel(framesToPayloads).subscribe({
onComplete: () => this._sendStreamComplete(streamId),
onError: error => this._sendStreamError(streamId, error.message),
onError: error => this._sendStreamError(streamId, error),
onNext: payload => this._sendStreamPayload(streamId, payload),
onSubscribe: subscription => {
this._subscriptions.set(streamId, subscription);
Expand Down Expand Up @@ -864,16 +864,16 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
});
}

_sendStreamError(streamId: number, errorMessage: string): void {
_sendStreamError(streamId: number, err: Error): void {
this._subscriptions.delete(streamId);
this._connection.sendOne({
code: ERROR_CODES.APPLICATION_ERROR,
code: err instanceof RSocketError ? err.errorCode : ERROR_CODES.APPLICATION_ERROR,
flags: 0,
message: errorMessage,
message: err.message,
streamId,
type: FRAME_TYPES.ERROR,
});
const error = new Error(`terminated from the requester: ${errorMessage}`);
const error = new Error(`terminated from the requester: ${err.message}`);
this._handleStreamError(streamId, error);
}

Expand Down Expand Up @@ -943,3 +943,11 @@ function deserializeMetadataPushPayload<D, M>(
metadata: serializers.metadata.deserialize(frame.metadata),
};
}

export class RSocketError extends Error {
+errorCode: number;
constructor(errorCode: number, message: string) {
super(message);
this.errorCode = errorCode;
}
}

0 comments on commit 4e23a1f

Please sign in to comment.