From ce538589347eacaa743bfb071a43485518c84902 Mon Sep 17 00:00:00 2001 From: Pavel Fediukovich Date: Sat, 11 Feb 2023 14:13:27 +0300 Subject: [PATCH] feat: added init parameter and validation of topics --- lib/gc-pubsub.client.spec.ts | 9 ++- lib/gc-pubsub.client.ts | 75 ++++++++++++++------- lib/gc-pubsub.constants.ts | 4 +- lib/gc-pubsub.interface.ts | 1 + lib/gc-pubsub.server.ts | 31 +++++++-- tests/src/gc-pubsub-broadcast.controller.ts | 1 + tests/src/gc-pubsub.controller.ts | 2 + 7 files changed, 89 insertions(+), 34 deletions(-) diff --git a/lib/gc-pubsub.client.spec.ts b/lib/gc-pubsub.client.spec.ts index 47a58d0..d75b246 100644 --- a/lib/gc-pubsub.client.spec.ts +++ b/lib/gc-pubsub.client.spec.ts @@ -13,7 +13,10 @@ describe('GCPubSubClient', () => { beforeEach(() => { sandbox = sinon.createSandbox(); - client = new GCPubSubClient({}); + client = new GCPubSubClient({ + replyTopic: 'replyTopic', + replySubscription: 'replySubcription', + }); subscriptionMock = { create: sandbox.stub().resolves(), @@ -25,6 +28,7 @@ describe('GCPubSubClient', () => { create: sandbox.stub().resolves(), flush: sandbox.stub().callsFake((callback) => callback()), publishMessage: sandbox.stub().resolves(), + exists: sandbox.stub().resolves([true]), subscription: sandbox.stub().returns(subscriptionMock), }; @@ -53,6 +57,9 @@ describe('GCPubSubClient', () => { it('should call "client.topic" once', async () => { expect(pubsub.topic.called).to.be.true; }); + it('should call "topic.exists" once', async () => { + expect(topicMock.exists.called).to.be.true; + }); it('should call "topic.create" once', async () => { expect(topicMock.create.called).to.be.true; }); diff --git a/lib/gc-pubsub.client.ts b/lib/gc-pubsub.client.ts index a2320c4..31a98a6 100644 --- a/lib/gc-pubsub.client.ts +++ b/lib/gc-pubsub.client.ts @@ -19,10 +19,9 @@ import { ERROR_EVENT, MESSAGE_EVENT } from '@nestjs/microservices/constants'; import { ALREADY_EXISTS, GC_PUBSUB_DEFAULT_CLIENT_CONFIG, + GC_PUBSUB_DEFAULT_INIT, GC_PUBSUB_DEFAULT_NO_ACK, GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG, - GC_PUBSUB_DEFAULT_REPLY_SUBSCRIPTION, - GC_PUBSUB_DEFAULT_REPLY_TOPIC, GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG, GC_PUBSUB_DEFAULT_TOPIC, } from './gc-pubsub.constants'; @@ -34,15 +33,16 @@ export class GCPubSubClient extends ClientProxy { protected readonly topicName: string; protected readonly publisherConfig: PublishOptions; - protected readonly replyTopicName: string; + protected readonly replyTopicName?: string; + protected readonly replySubscriptionName?: string; protected readonly clientConfig: ClientConfig; - protected readonly replySubscriptionName: string; protected readonly subscriberConfig: SubscriberOptions; protected readonly noAck: boolean; protected client: PubSub | null = null; protected replySubscription: Subscription | null = null; protected topic: Topic | null = null; + protected init: boolean; constructor(protected readonly options: GCPubSubOptions) { super(); @@ -57,13 +57,12 @@ export class GCPubSubClient extends ClientProxy { this.publisherConfig = this.options.publisher || GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG; - this.replyTopicName = - this.options.replyTopic || GC_PUBSUB_DEFAULT_REPLY_TOPIC; + this.replyTopicName = this.options.replyTopic; - this.replySubscriptionName = - this.options.replySubscription || GC_PUBSUB_DEFAULT_REPLY_SUBSCRIPTION; + this.replySubscriptionName = this.options.replySubscription; this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK; + this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT; this.initializeSerializer(options); this.initializeDeserializer(options); @@ -87,27 +86,55 @@ export class GCPubSubClient extends ClientProxy { this.topic = this.client.topic(this.topicName, this.publisherConfig); - const replyTopic = this.client.topic(this.replyTopicName); + const [topicExists] = await this.topic.exists(); - await this.createIfNotExists(replyTopic.create.bind(replyTopic)); + if (!topicExists) { + const message = `PubSub client is not connected: topic ${this.topicName} does not exist`; + this.logger.error(message); + throw new Error(message); + } + + if (this.replyTopicName && this.replySubscriptionName) { + const replyTopic = this.client.topic(this.replyTopicName); - this.replySubscription = replyTopic.subscription( - this.replySubscriptionName, - this.subscriberConfig, - ); + if (this.init) { + await this.createIfNotExists(replyTopic.create.bind(replyTopic)); + } else { + const [exists] = await replyTopic.exists(); + if (!exists) { + const message = `PubSub client is not connected: topic ${this.replyTopicName} does not exist`; + this.logger.error(message); + throw new Error(message); + } + } - await this.createIfNotExists( - this.replySubscription.create.bind(this.replySubscription), - ); + this.replySubscription = replyTopic.subscription( + this.replySubscriptionName, + this.subscriberConfig, + ); - this.replySubscription - .on(MESSAGE_EVENT, async (message: Message) => { - await this.handleResponse(message.data); - if (this.noAck) { - message.ack(); + if (this.init) { + await this.createIfNotExists( + this.replySubscription.create.bind(this.replySubscription), + ); + } else { + const [exists] = await this.replySubscription.exists(); + if (!exists) { + const message = `PubSub client is not connected: subscription ${this.replySubscription} does not exist`; + this.logger.error(message); + throw new Error(message); } - }) - .on(ERROR_EVENT, (err: any) => this.logger.error(err)); + } + + this.replySubscription + .on(MESSAGE_EVENT, async (message: Message) => { + await this.handleResponse(message.data); + if (this.noAck) { + message.ack(); + } + }) + .on(ERROR_EVENT, (err: any) => this.logger.error(err)); + } return this.client; } diff --git a/lib/gc-pubsub.constants.ts b/lib/gc-pubsub.constants.ts index 138666e..80a96a9 100644 --- a/lib/gc-pubsub.constants.ts +++ b/lib/gc-pubsub.constants.ts @@ -1,10 +1,8 @@ export const GC_PUBSUB_DEFAULT_TOPIC = 'default_topic'; -export const GC_PUBSUB_DEFAULT_REPLY_TOPIC = 'default_reply_topic'; export const GC_PUBSUB_DEFAULT_SUBSCRIPTION = 'default_subscription'; -export const GC_PUBSUB_DEFAULT_REPLY_SUBSCRIPTION = - 'default_reply_subscription'; export const GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG = {}; export const GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG = {}; export const GC_PUBSUB_DEFAULT_CLIENT_CONFIG = {}; export const GC_PUBSUB_DEFAULT_NO_ACK = true; +export const GC_PUBSUB_DEFAULT_INIT = true; export const ALREADY_EXISTS = 6; diff --git a/lib/gc-pubsub.interface.ts b/lib/gc-pubsub.interface.ts index 7cade2c..597d5af 100644 --- a/lib/gc-pubsub.interface.ts +++ b/lib/gc-pubsub.interface.ts @@ -10,6 +10,7 @@ export interface GCPubSubOptions { subscription?: string; replySubscription?: string; noAck?: boolean; + init?: boolean publisher?: PublishOptions; subscriber?: SubscriberOptions; serializer?: Serializer; diff --git a/lib/gc-pubsub.server.ts b/lib/gc-pubsub.server.ts index d2083a4..2a9eff7 100644 --- a/lib/gc-pubsub.server.ts +++ b/lib/gc-pubsub.server.ts @@ -3,7 +3,6 @@ import { Message, PubSub, Subscription, - Topic, } from '@google-cloud/pubsub'; import { PublishOptions } from '@google-cloud/pubsub/build/src/publisher'; import { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber'; @@ -26,6 +25,7 @@ import { GCPubSubOptions } from './gc-pubsub.interface'; import { ALREADY_EXISTS, GC_PUBSUB_DEFAULT_CLIENT_CONFIG, + GC_PUBSUB_DEFAULT_INIT, GC_PUBSUB_DEFAULT_NO_ACK, GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG, GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG, @@ -45,9 +45,9 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy { protected readonly subscriberConfig: SubscriberOptions; protected readonly noAck: boolean; protected readonly replyTopics: Set; + protected readonly init: boolean; protected client: PubSub | null = null; - protected readonly topics: Map = new Map(); protected subscription: Subscription | null = null; constructor(protected readonly options: GCPubSubOptions) { @@ -67,6 +67,7 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy { this.options.publisher || GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG; this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK; + this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT; this.replyTopics = new Set(); @@ -78,16 +79,34 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy { this.client = this.createClient(); const topic = this.client.topic(this.topicName); - await this.createIfNotExists(topic.create.bind(topic)); + if (this.init) { + await this.createIfNotExists(topic.create.bind(topic)); + } else { + const [exists] = await topic.exists(); + if (!exists) { + const message = `PubSub server is not started: topic ${this.topicName} does not exist`; + this.logger.error(message); + throw new Error(message); + } + } this.subscription = topic.subscription( this.subscriptionName, this.subscriberConfig, ); - await this.createIfNotExists( - this.subscription.create.bind(this.subscription), - ); + if (this.init) { + await this.createIfNotExists( + this.subscription.create.bind(this.subscription), + ); + } else { + const [exists] = await this.subscription.exists(); + if (!exists) { + const message = `PubSub server is not started: subscription ${this.subscriptionName} does not exist`; + this.logger.error(message); + throw new Error(message); + } + } this.subscription .on(MESSAGE_EVENT, async (message: Message) => { diff --git a/tests/src/gc-pubsub-broadcast.controller.ts b/tests/src/gc-pubsub-broadcast.controller.ts index 177be55..998e601 100644 --- a/tests/src/gc-pubsub-broadcast.controller.ts +++ b/tests/src/gc-pubsub-broadcast.controller.ts @@ -11,6 +11,7 @@ export class GCPubSubBroadcastController implements OnApplicationShutdown { constructor() { this.client = new GCPubSubClient({ topic: 'broadcast', + replyTopic: 'default_reply_topic', replySubscription: 'broadcast_reply_subscription', client: { apiEndpoint: 'localhost:8681', diff --git a/tests/src/gc-pubsub.controller.ts b/tests/src/gc-pubsub.controller.ts index bb6d07d..a493427 100644 --- a/tests/src/gc-pubsub.controller.ts +++ b/tests/src/gc-pubsub.controller.ts @@ -27,6 +27,8 @@ export class GCPubSubController implements OnApplicationShutdown { apiEndpoint: 'localhost:8681', projectId: 'microservice', }, + replyTopic: 'default_reply_topic', + replySubscription: 'default_reply_subscription', }); }