Skip to content

Commit

Permalink
feat(microservices): add type for rmq connection options
Browse files Browse the repository at this point in the history
change socketOptions.connectionOptions type from any to copied amqp-manager's AmqpConnectionOptions

Closes #13071
  • Loading branch information
Deniks committed Jan 31, 2024
1 parent 95e8b94 commit 4008cc2
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
6 changes: 3 additions & 3 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ChannelWrapper = any;
type ConsumeMessage = any;
type AmqpConnectionManager = any;

let rqmPackage: any = {};
let rmqPackage: any = {};

const REPLY_QUEUE = 'amq.rabbitmq.reply-to';

Expand Down Expand Up @@ -81,7 +81,7 @@ export class ClientRMQ extends ClientProxy {
this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT;

loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
rqmPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
rmqPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
require('amqp-connection-manager'),
);

Expand Down Expand Up @@ -133,7 +133,7 @@ export class ClientRMQ extends ClientProxy {

public createClient(): AmqpConnectionManager {
const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
return rqmPackage.connect(this.urls, {
return rmqPackage.connect(this.urls, {
connectionOptions: socketOptions,
});
}
Expand Down
25 changes: 24 additions & 1 deletion packages/microservices/external/rmq-url.interface.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { ConnectionOptions } from 'tls';
import { TcpSocketConnectOpts } from 'net';

/**
* @publicApi
*/
Expand All @@ -18,14 +21,34 @@ interface ClientProperties {
[key: string]: any;
}

type AmqpConnectionOptions = (ConnectionOptions | TcpSocketConnectOpts) & {
noDelay?: boolean;
timeout?: number;
keepAlive?: boolean;
keepAliveDelay?: number;
clientProperties?: any;
credentials?:
| {
mechanism: string;
username: string;
password: string;
response: () => Buffer;
}
| {
mechanism: string;
response: () => Buffer;
}
| undefined;
};

/**
* @publicApi
*/
export interface AmqpConnectionManagerSocketOptions {
reconnectTimeInSeconds?: number;
heartbeatIntervalInSeconds?: number;
findServers?: () => string | string[];
connectionOptions?: any;
connectionOptions?: AmqpConnectionOptions;
clientProperties?: ClientProperties;
[key: string]: any;
}
Expand Down
6 changes: 3 additions & 3 deletions packages/microservices/server/server-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
import { RmqRecordSerializer } from '../serializers/rmq-record.serializer';
import { Server } from './server';

let rqmPackage: any = {};
let rmqPackage: any = {};

const INFINITE_CONNECTION_ATTEMPTS = -1;

Expand Down Expand Up @@ -67,7 +67,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT;

this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
rqmPackage = this.loadPackage(
rmqPackage = this.loadPackage(
'amqp-connection-manager',
ServerRMQ.name,
() => require('amqp-connection-manager'),
Expand Down Expand Up @@ -136,7 +136,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {

public createClient<T = any>(): T {
const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
return rqmPackage.connect(this.urls, {
return rmqPackage.connect(this.urls, {
connectionOptions: socketOptions,
heartbeatIntervalInSeconds: socketOptions?.heartbeatIntervalInSeconds,
reconnectTimeInSeconds: socketOptions?.reconnectTimeInSeconds,
Expand Down

0 comments on commit 4008cc2

Please sign in to comment.