Skip to content

Commit

Permalink
feat: added init parameter and validation of topics
Browse files Browse the repository at this point in the history
  • Loading branch information
p-fedyukovich committed Feb 11, 2023
1 parent 20c7f4c commit ce53858
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 34 deletions.
9 changes: 8 additions & 1 deletion lib/gc-pubsub.client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ describe('GCPubSubClient', () => {

beforeEach(() => {
sandbox = sinon.createSandbox();
client = new GCPubSubClient({});
client = new GCPubSubClient({
replyTopic: 'replyTopic',
replySubscription: 'replySubcription',
});

subscriptionMock = {
create: sandbox.stub().resolves(),
Expand All @@ -25,6 +28,7 @@ describe('GCPubSubClient', () => {
create: sandbox.stub().resolves(),
flush: sandbox.stub().callsFake((callback) => callback()),
publishMessage: sandbox.stub().resolves(),
exists: sandbox.stub().resolves([true]),
subscription: sandbox.stub().returns(subscriptionMock),
};

Expand Down Expand Up @@ -53,6 +57,9 @@ describe('GCPubSubClient', () => {
it('should call "client.topic" once', async () => {
expect(pubsub.topic.called).to.be.true;
});
it('should call "topic.exists" once', async () => {
expect(topicMock.exists.called).to.be.true;
});
it('should call "topic.create" once', async () => {
expect(topicMock.create.called).to.be.true;
});
Expand Down
75 changes: 51 additions & 24 deletions lib/gc-pubsub.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import { ERROR_EVENT, MESSAGE_EVENT } from '@nestjs/microservices/constants';
import {
ALREADY_EXISTS,
GC_PUBSUB_DEFAULT_CLIENT_CONFIG,
GC_PUBSUB_DEFAULT_INIT,
GC_PUBSUB_DEFAULT_NO_ACK,
GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG,
GC_PUBSUB_DEFAULT_REPLY_SUBSCRIPTION,
GC_PUBSUB_DEFAULT_REPLY_TOPIC,
GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG,
GC_PUBSUB_DEFAULT_TOPIC,
} from './gc-pubsub.constants';
Expand All @@ -34,15 +33,16 @@ export class GCPubSubClient extends ClientProxy {

protected readonly topicName: string;
protected readonly publisherConfig: PublishOptions;
protected readonly replyTopicName: string;
protected readonly replyTopicName?: string;
protected readonly replySubscriptionName?: string;
protected readonly clientConfig: ClientConfig;
protected readonly replySubscriptionName: string;
protected readonly subscriberConfig: SubscriberOptions;
protected readonly noAck: boolean;

protected client: PubSub | null = null;
protected replySubscription: Subscription | null = null;
protected topic: Topic | null = null;
protected init: boolean;

constructor(protected readonly options: GCPubSubOptions) {
super();
Expand All @@ -57,13 +57,12 @@ export class GCPubSubClient extends ClientProxy {
this.publisherConfig =
this.options.publisher || GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG;

this.replyTopicName =
this.options.replyTopic || GC_PUBSUB_DEFAULT_REPLY_TOPIC;
this.replyTopicName = this.options.replyTopic;

this.replySubscriptionName =
this.options.replySubscription || GC_PUBSUB_DEFAULT_REPLY_SUBSCRIPTION;
this.replySubscriptionName = this.options.replySubscription;

this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK;
this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT;

this.initializeSerializer(options);
this.initializeDeserializer(options);
Expand All @@ -87,27 +86,55 @@ export class GCPubSubClient extends ClientProxy {

this.topic = this.client.topic(this.topicName, this.publisherConfig);

const replyTopic = this.client.topic(this.replyTopicName);
const [topicExists] = await this.topic.exists();

await this.createIfNotExists(replyTopic.create.bind(replyTopic));
if (!topicExists) {
const message = `PubSub client is not connected: topic ${this.topicName} does not exist`;
this.logger.error(message);
throw new Error(message);
}

if (this.replyTopicName && this.replySubscriptionName) {
const replyTopic = this.client.topic(this.replyTopicName);

this.replySubscription = replyTopic.subscription(
this.replySubscriptionName,
this.subscriberConfig,
);
if (this.init) {
await this.createIfNotExists(replyTopic.create.bind(replyTopic));
} else {
const [exists] = await replyTopic.exists();
if (!exists) {
const message = `PubSub client is not connected: topic ${this.replyTopicName} does not exist`;
this.logger.error(message);
throw new Error(message);
}
}

await this.createIfNotExists(
this.replySubscription.create.bind(this.replySubscription),
);
this.replySubscription = replyTopic.subscription(
this.replySubscriptionName,
this.subscriberConfig,
);

this.replySubscription
.on(MESSAGE_EVENT, async (message: Message) => {
await this.handleResponse(message.data);
if (this.noAck) {
message.ack();
if (this.init) {
await this.createIfNotExists(
this.replySubscription.create.bind(this.replySubscription),
);
} else {
const [exists] = await this.replySubscription.exists();
if (!exists) {
const message = `PubSub client is not connected: subscription ${this.replySubscription} does not exist`;
this.logger.error(message);
throw new Error(message);
}
})
.on(ERROR_EVENT, (err: any) => this.logger.error(err));
}

this.replySubscription
.on(MESSAGE_EVENT, async (message: Message) => {
await this.handleResponse(message.data);
if (this.noAck) {
message.ack();
}
})
.on(ERROR_EVENT, (err: any) => this.logger.error(err));
}

return this.client;
}
Expand Down
4 changes: 1 addition & 3 deletions lib/gc-pubsub.constants.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
export const GC_PUBSUB_DEFAULT_TOPIC = 'default_topic';
export const GC_PUBSUB_DEFAULT_REPLY_TOPIC = 'default_reply_topic';
export const GC_PUBSUB_DEFAULT_SUBSCRIPTION = 'default_subscription';
export const GC_PUBSUB_DEFAULT_REPLY_SUBSCRIPTION =
'default_reply_subscription';
export const GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG = {};
export const GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG = {};
export const GC_PUBSUB_DEFAULT_CLIENT_CONFIG = {};
export const GC_PUBSUB_DEFAULT_NO_ACK = true;
export const GC_PUBSUB_DEFAULT_INIT = true;
export const ALREADY_EXISTS = 6;
1 change: 1 addition & 0 deletions lib/gc-pubsub.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface GCPubSubOptions {
subscription?: string;
replySubscription?: string;
noAck?: boolean;
init?: boolean
publisher?: PublishOptions;
subscriber?: SubscriberOptions;
serializer?: Serializer;
Expand Down
31 changes: 25 additions & 6 deletions lib/gc-pubsub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
Message,
PubSub,
Subscription,
Topic,
} from '@google-cloud/pubsub';
import { PublishOptions } from '@google-cloud/pubsub/build/src/publisher';
import { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber';
Expand All @@ -26,6 +25,7 @@ import { GCPubSubOptions } from './gc-pubsub.interface';
import {
ALREADY_EXISTS,
GC_PUBSUB_DEFAULT_CLIENT_CONFIG,
GC_PUBSUB_DEFAULT_INIT,
GC_PUBSUB_DEFAULT_NO_ACK,
GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG,
GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG,
Expand All @@ -45,9 +45,9 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
protected readonly subscriberConfig: SubscriberOptions;
protected readonly noAck: boolean;
protected readonly replyTopics: Set<string>;
protected readonly init: boolean;

protected client: PubSub | null = null;
protected readonly topics: Map<string, Topic> = new Map();
protected subscription: Subscription | null = null;

constructor(protected readonly options: GCPubSubOptions) {
Expand All @@ -67,6 +67,7 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
this.options.publisher || GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG;

this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK;
this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT;

this.replyTopics = new Set();

Expand All @@ -78,16 +79,34 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
this.client = this.createClient();
const topic = this.client.topic(this.topicName);

await this.createIfNotExists(topic.create.bind(topic));
if (this.init) {
await this.createIfNotExists(topic.create.bind(topic));
} else {
const [exists] = await topic.exists();
if (!exists) {
const message = `PubSub server is not started: topic ${this.topicName} does not exist`;
this.logger.error(message);
throw new Error(message);
}
}

this.subscription = topic.subscription(
this.subscriptionName,
this.subscriberConfig,
);

await this.createIfNotExists(
this.subscription.create.bind(this.subscription),
);
if (this.init) {
await this.createIfNotExists(
this.subscription.create.bind(this.subscription),
);
} else {
const [exists] = await this.subscription.exists();
if (!exists) {
const message = `PubSub server is not started: subscription ${this.subscriptionName} does not exist`;
this.logger.error(message);
throw new Error(message);
}
}

this.subscription
.on(MESSAGE_EVENT, async (message: Message) => {
Expand Down
1 change: 1 addition & 0 deletions tests/src/gc-pubsub-broadcast.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class GCPubSubBroadcastController implements OnApplicationShutdown {
constructor() {
this.client = new GCPubSubClient({
topic: 'broadcast',
replyTopic: 'default_reply_topic',
replySubscription: 'broadcast_reply_subscription',
client: {
apiEndpoint: 'localhost:8681',
Expand Down
2 changes: 2 additions & 0 deletions tests/src/gc-pubsub.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export class GCPubSubController implements OnApplicationShutdown {
apiEndpoint: 'localhost:8681',
projectId: 'microservice',
},
replyTopic: 'default_reply_topic',
replySubscription: 'default_reply_subscription',
});
}

Expand Down

0 comments on commit ce53858

Please sign in to comment.