diff --git a/packages/bus/src/dispatchers/RMQEventBus.spec.ts b/packages/bus/src/dispatchers/RMQEventBus.spec.ts index 7a452277..8f67d9ae 100644 --- a/packages/bus/src/dispatchers/RMQEventBus.spec.ts +++ b/packages/bus/src/dispatchers/RMQEventBus.spec.ts @@ -7,6 +7,7 @@ import { Command, Event, EventHandler, + EventHandlerNotFound, Logger, NoResponse, RetryStrategy @@ -615,7 +616,7 @@ describe('RMQEventBus', () => { ).once(); }); - it('should throw an error if no active subscriptions', async () => { + it('should log an error if no active subscriptions', async () => { // arrange const payload = { foo: 'bar' }; const message = { @@ -630,11 +631,13 @@ describe('RMQEventBus', () => { } } as ConsumeMessage; - // act / assert + // act + await processMessage(message); + // assert verify(spiedHandler.handle(anything())).never(); - await expect(processMessage(message)).rejects.toThrow( - 'Event handler not found' - ); + verify( + mockedLogger.error(anyString(), anyOfClass(EventHandlerNotFound)) + ).once(); }); it('should skip a redelivered event', async () => { diff --git a/packages/bus/src/dispatchers/RMQEventBus.ts b/packages/bus/src/dispatchers/RMQEventBus.ts index 602611aa..b5f23967 100644 --- a/packages/bus/src/dispatchers/RMQEventBus.ts +++ b/packages/bus/src/dispatchers/RMQEventBus.ts @@ -49,8 +49,6 @@ export class RMQEventBus implements EventBus { string, EventHandler[] >(); - private readonly consumerTags: string[] = []; - private readonly REPLY_QUEUE_NAME = 'amq.rabbitmq.reply-to'; constructor( @@ -141,17 +139,12 @@ export class RMQEventBus implements EventBus { public async destroy(): Promise { try { if (this.channel) { - await Promise.all( - this.consumerTags.map(consumerTag => - this.channel?.cancel(consumerTag) - ) - ); + await this.channel.cancelAll(); await this.channel.close(); } delete this.channel; - this.consumerTags.splice(0, this.consumerTags.length); this.subject.removeAllListeners(); } catch (e) { this.logger.error('Cannot terminate event bus gracefully'); @@ -272,27 +265,34 @@ export class RMQEventBus implements EventBus { } private async startReplyQueueConsume(channel: Channel): Promise { - const { consumerTag } = await channel.consume( + await channel.consume( this.REPLY_QUEUE_NAME, (msg: ConsumeMessage | null) => (msg ? this.processReply(msg) : void 0), { noAck: true } ); - - this.consumerTags.push(consumerTag); } private async startBasicConsume(channel: Channel): Promise { - const { consumerTag } = await channel.consume( + await channel.consume( this.options.clientQueue, - (msg: ConsumeMessage | null) => (msg ? this.processMessage(msg) : void 0), + async (msg: ConsumeMessage | null) => { + try { + if (msg) { + await this.processMessage(msg); + } + } catch (e) { + this.logger.error( + 'Error while processing a message due to error occurred: ', + e + ); + } + }, { noAck: true } ); - - this.consumerTags.push(consumerTag); } private async bindExchangesToQueue(channel: Channel): Promise { @@ -371,12 +371,12 @@ export class RMQEventBus implements EventBus { }); } } catch (e) { - this.logger.debug( - 'Error while processing a message (%s) due to error occurred: %s. Event: %j', + this.logger.error( + 'Error occurred while precessing a message (%s)', event.correlationId, - e.message, - event + e ); + this.logger.debug('Failed message (%s): %j', event.correlationId, event); } }