diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index 05844b4889c..81410caedbb 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -44,7 +44,7 @@ type ChannelWrapper = any; type ConsumeMessage = any; type AmqpConnectionManager = any; -let rqmPackage: any = {}; +let rmqPackage: any = {}; const REPLY_QUEUE = 'amq.rabbitmq.reply-to'; @@ -81,7 +81,7 @@ export class ClientRMQ extends ClientProxy { this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT; loadPackage('amqplib', ClientRMQ.name, () => require('amqplib')); - rqmPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () => + rmqPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () => require('amqp-connection-manager'), ); @@ -133,7 +133,7 @@ export class ClientRMQ extends ClientProxy { public createClient(): AmqpConnectionManager { const socketOptions = this.getOptionsProp(this.options, 'socketOptions'); - return rqmPackage.connect(this.urls, { + return rmqPackage.connect(this.urls, { connectionOptions: socketOptions, }); } diff --git a/packages/microservices/external/rmq-url.interface.ts b/packages/microservices/external/rmq-url.interface.ts index d7abd16046b..4d812350f84 100644 --- a/packages/microservices/external/rmq-url.interface.ts +++ b/packages/microservices/external/rmq-url.interface.ts @@ -1,3 +1,6 @@ +import { ConnectionOptions } from 'tls'; +import { TcpSocketConnectOpts } from 'net'; + /** * @publicApi */ @@ -18,6 +21,26 @@ interface ClientProperties { [key: string]: any; } +type AmqpConnectionOptions = (ConnectionOptions | TcpSocketConnectOpts) & { + noDelay?: boolean; + timeout?: number; + keepAlive?: boolean; + keepAliveDelay?: number; + clientProperties?: any; + credentials?: + | { + mechanism: string; + username: string; + password: string; + response: () => Buffer; + } + | { + mechanism: string; + response: () => Buffer; + } + | undefined; +}; + /** * @publicApi */ @@ -25,7 +48,7 @@ export interface AmqpConnectionManagerSocketOptions { reconnectTimeInSeconds?: number; heartbeatIntervalInSeconds?: number; findServers?: () => string | string[]; - connectionOptions?: any; + connectionOptions?: AmqpConnectionOptions; clientProperties?: ClientProperties; [key: string]: any; } diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index 3b2f52a3c61..3d2c178f108 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -31,7 +31,7 @@ import { import { RmqRecordSerializer } from '../serializers/rmq-record.serializer'; import { Server } from './server'; -let rqmPackage: any = {}; +let rmqPackage: any = {}; const INFINITE_CONNECTION_ATTEMPTS = -1; @@ -67,7 +67,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy { this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT; this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib')); - rqmPackage = this.loadPackage( + rmqPackage = this.loadPackage( 'amqp-connection-manager', ServerRMQ.name, () => require('amqp-connection-manager'), @@ -136,7 +136,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy { public createClient(): T { const socketOptions = this.getOptionsProp(this.options, 'socketOptions'); - return rqmPackage.connect(this.urls, { + return rmqPackage.connect(this.urls, { connectionOptions: socketOptions, heartbeatIntervalInSeconds: socketOptions?.heartbeatIntervalInSeconds, reconnectTimeInSeconds: socketOptions?.reconnectTimeInSeconds,