diff --git a/src/consumer.ts b/src/consumer.ts index 1743d011..36119fcb 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -434,10 +434,11 @@ export class Consumer extends TypedEventEmitter { return result instanceof Object ? result : message; } catch (err) { - err.message = - err instanceof TimeoutError - ? `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.` - : `Unexpected message handler failure: ${err.message}`; + if (err instanceof TimeoutError) { + err.message = `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`; + } else if (err instanceof Error) { + err.message = `Unexpected message handler failure: ${err.message}`; + } throw err; } finally { if (handleMessageTimeoutId) { @@ -456,7 +457,9 @@ export class Consumer extends TypedEventEmitter { return result instanceof Object ? result : messages; } catch (err) { - err.message = `Unexpected message handler failure: ${err.message}`; + if (err instanceof Error) { + err.message = `Unexpected message handler failure: ${err.message}`; + } throw err; } } diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index 6c81869e..ae732318 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -269,6 +269,37 @@ describe('Consumer', () => { ); }); + it('handles non-standard exceptions thrown by the handler function', async () => { + class CustomError { + private _message: string; + + constructor(message) { + this._message = message; + } + + get message() { + return this._message; + } + } + + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessage: () => { + throw new CustomError('unexpected parsing error'); + }, + sqs, + authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT + }); + + consumer.start(); + const err: any = await pEvent(consumer, 'processing_error'); + consumer.stop(); + + assert.ok(err); + assert.equal(err.message, 'unexpected parsing error'); + }); + it('fires an error event when an error occurs deleting a message', async () => { const deleteErr = new Error('Delete error'); @@ -742,6 +773,63 @@ describe('Consumer', () => { sandbox.assert.callCount(handleMessageBatch, 1); }); + it('handles unexpected exceptions thrown by the handler batch function', async () => { + consumer = new Consumer({ + queueUrl: QUEUE_URL, + messageAttributeNames: ['attribute-1', 'attribute-2'], + region: REGION, + handleMessageBatch: () => { + throw new Error('unexpected parsing error'); + }, + batchSize: 2, + sqs, + authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT + }); + + consumer.start(); + const err: any = await pEvent(consumer, 'error'); + consumer.stop(); + + assert.ok(err); + assert.equal( + err.message, + 'Unexpected message handler failure: unexpected parsing error' + ); + }); + + it('handles non-standard exceptions thrown by the handler batch function', async () => { + class CustomError { + private _message: string; + + constructor(message) { + this._message = message; + } + + get message() { + return this._message; + } + } + + consumer = new Consumer({ + queueUrl: QUEUE_URL, + messageAttributeNames: ['attribute-1', 'attribute-2'], + region: REGION, + handleMessageBatch: () => { + throw new CustomError('unexpected parsing error'); + }, + batchSize: 2, + sqs, + authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT + }); + + consumer.start(); + const err: any = await pEvent(consumer, 'error'); + consumer.stop(); + + assert.ok(err); + assert.equal(err.message, 'unexpected parsing error'); + }); + it('prefers handleMessagesBatch over handleMessage when both are set', async () => { consumer = new Consumer({ queueUrl: QUEUE_URL,