Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added init parameter #19

Merged
merged 2 commits into from
Feb 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ The `options` property is specific to the chosen transporter. The <strong>GCloud
<td><code>noAck</code></td>
<td>If <code>false</code>, manual acknowledgment mode enabled</td>
</tr>
<tr>
<td><code>init</code></td>
<td>If <code>false</code>, topics and subscriptions will not be created, only validated</td>
</tr>
<tr>
<td><code>client</code></td>
<td>Additional client options (read more <a href="https://googleapis.dev/nodejs/pubsub/latest/global.html#ClientConfig" rel="nofollow" target="_blank">here</a>)</td>
Expand Down
9 changes: 8 additions & 1 deletion lib/gc-pubsub.client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ describe('GCPubSubClient', () => {

beforeEach(() => {
sandbox = sinon.createSandbox();
client = new GCPubSubClient({});
client = new GCPubSubClient({
replyTopic: 'replyTopic',
replySubscription: 'replySubcription',
});

subscriptionMock = {
create: sandbox.stub().resolves(),
Expand All @@ -25,6 +28,7 @@ describe('GCPubSubClient', () => {
create: sandbox.stub().resolves(),
flush: sandbox.stub().callsFake((callback) => callback()),
publishMessage: sandbox.stub().resolves(),
exists: sandbox.stub().resolves([true]),
subscription: sandbox.stub().returns(subscriptionMock),
};

Expand Down Expand Up @@ -53,6 +57,9 @@ describe('GCPubSubClient', () => {
it('should call "client.topic" once', async () => {
expect(pubsub.topic.called).to.be.true;
});
it('should call "topic.exists" once', async () => {
expect(topicMock.exists.called).to.be.true;
});
it('should call "topic.create" once', async () => {
expect(topicMock.create.called).to.be.true;
});
Expand Down
75 changes: 51 additions & 24 deletions lib/gc-pubsub.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import { ERROR_EVENT, MESSAGE_EVENT } from '@nestjs/microservices/constants';
import {
ALREADY_EXISTS,
GC_PUBSUB_DEFAULT_CLIENT_CONFIG,
GC_PUBSUB_DEFAULT_INIT,
GC_PUBSUB_DEFAULT_NO_ACK,
GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG,
GC_PUBSUB_DEFAULT_REPLY_SUBSCRIPTION,
GC_PUBSUB_DEFAULT_REPLY_TOPIC,
GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG,
GC_PUBSUB_DEFAULT_TOPIC,
} from './gc-pubsub.constants';
Expand All @@ -34,15 +33,16 @@ export class GCPubSubClient extends ClientProxy {

protected readonly topicName: string;
protected readonly publisherConfig: PublishOptions;
protected readonly replyTopicName: string;
protected readonly replyTopicName?: string;
protected readonly replySubscriptionName?: string;
protected readonly clientConfig: ClientConfig;
protected readonly replySubscriptionName: string;
protected readonly subscriberConfig: SubscriberOptions;
protected readonly noAck: boolean;

protected client: PubSub | null = null;
protected replySubscription: Subscription | null = null;
protected topic: Topic | null = null;
protected init: boolean;

constructor(protected readonly options: GCPubSubOptions) {
super();
Expand All @@ -57,13 +57,12 @@ export class GCPubSubClient extends ClientProxy {
this.publisherConfig =
this.options.publisher || GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG;

this.replyTopicName =
this.options.replyTopic || GC_PUBSUB_DEFAULT_REPLY_TOPIC;
this.replyTopicName = this.options.replyTopic;

this.replySubscriptionName =
this.options.replySubscription || GC_PUBSUB_DEFAULT_REPLY_SUBSCRIPTION;
this.replySubscriptionName = this.options.replySubscription;

this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK;
this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT;

this.initializeSerializer(options);
this.initializeDeserializer(options);
Expand All @@ -87,27 +86,55 @@ export class GCPubSubClient extends ClientProxy {

this.topic = this.client.topic(this.topicName, this.publisherConfig);

const replyTopic = this.client.topic(this.replyTopicName);
const [topicExists] = await this.topic.exists();

await this.createIfNotExists(replyTopic.create.bind(replyTopic));
if (!topicExists) {
const message = `PubSub client is not connected: topic ${this.topicName} does not exist`;
this.logger.error(message);
throw new Error(message);
}

if (this.replyTopicName && this.replySubscriptionName) {
const replyTopic = this.client.topic(this.replyTopicName);

this.replySubscription = replyTopic.subscription(
this.replySubscriptionName,
this.subscriberConfig,
);
if (this.init) {
await this.createIfNotExists(replyTopic.create.bind(replyTopic));
} else {
const [exists] = await replyTopic.exists();
if (!exists) {
const message = `PubSub client is not connected: topic ${this.replyTopicName} does not exist`;
this.logger.error(message);
throw new Error(message);
}
}

await this.createIfNotExists(
this.replySubscription.create.bind(this.replySubscription),
);
this.replySubscription = replyTopic.subscription(
this.replySubscriptionName,
this.subscriberConfig,
);

this.replySubscription
.on(MESSAGE_EVENT, async (message: Message) => {
await this.handleResponse(message.data);
if (this.noAck) {
message.ack();
if (this.init) {
await this.createIfNotExists(
this.replySubscription.create.bind(this.replySubscription),
);
} else {
const [exists] = await this.replySubscription.exists();
if (!exists) {
const message = `PubSub client is not connected: subscription ${this.replySubscription} does not exist`;
this.logger.error(message);
throw new Error(message);
}
})
.on(ERROR_EVENT, (err: any) => this.logger.error(err));
}

this.replySubscription
.on(MESSAGE_EVENT, async (message: Message) => {
await this.handleResponse(message.data);
if (this.noAck) {
message.ack();
}
})
.on(ERROR_EVENT, (err: any) => this.logger.error(err));
}

return this.client;
}
Expand Down
4 changes: 1 addition & 3 deletions lib/gc-pubsub.constants.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
export const GC_PUBSUB_DEFAULT_TOPIC = 'default_topic';
export const GC_PUBSUB_DEFAULT_REPLY_TOPIC = 'default_reply_topic';
export const GC_PUBSUB_DEFAULT_SUBSCRIPTION = 'default_subscription';
export const GC_PUBSUB_DEFAULT_REPLY_SUBSCRIPTION =
'default_reply_subscription';
export const GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG = {};
export const GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG = {};
export const GC_PUBSUB_DEFAULT_CLIENT_CONFIG = {};
export const GC_PUBSUB_DEFAULT_NO_ACK = true;
export const GC_PUBSUB_DEFAULT_INIT = true;
export const ALREADY_EXISTS = 6;
1 change: 1 addition & 0 deletions lib/gc-pubsub.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface GCPubSubOptions {
subscription?: string;
replySubscription?: string;
noAck?: boolean;
init?: boolean
publisher?: PublishOptions;
subscriber?: SubscriberOptions;
serializer?: Serializer;
Expand Down
31 changes: 25 additions & 6 deletions lib/gc-pubsub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
Message,
PubSub,
Subscription,
Topic,
} from '@google-cloud/pubsub';
import { PublishOptions } from '@google-cloud/pubsub/build/src/publisher';
import { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber';
Expand All @@ -26,6 +25,7 @@ import { GCPubSubOptions } from './gc-pubsub.interface';
import {
ALREADY_EXISTS,
GC_PUBSUB_DEFAULT_CLIENT_CONFIG,
GC_PUBSUB_DEFAULT_INIT,
GC_PUBSUB_DEFAULT_NO_ACK,
GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG,
GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG,
Expand All @@ -45,9 +45,9 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
protected readonly subscriberConfig: SubscriberOptions;
protected readonly noAck: boolean;
protected readonly replyTopics: Set<string>;
protected readonly init: boolean;

protected client: PubSub | null = null;
protected readonly topics: Map<string, Topic> = new Map();
protected subscription: Subscription | null = null;

constructor(protected readonly options: GCPubSubOptions) {
Expand All @@ -67,6 +67,7 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
this.options.publisher || GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG;

this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK;
this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT;

this.replyTopics = new Set();

Expand All @@ -78,16 +79,34 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
this.client = this.createClient();
const topic = this.client.topic(this.topicName);

await this.createIfNotExists(topic.create.bind(topic));
if (this.init) {
await this.createIfNotExists(topic.create.bind(topic));
} else {
const [exists] = await topic.exists();
if (!exists) {
const message = `PubSub server is not started: topic ${this.topicName} does not exist`;
this.logger.error(message);
throw new Error(message);
}
}

this.subscription = topic.subscription(
this.subscriptionName,
this.subscriberConfig,
);

await this.createIfNotExists(
this.subscription.create.bind(this.subscription),
);
if (this.init) {
await this.createIfNotExists(
this.subscription.create.bind(this.subscription),
);
} else {
const [exists] = await this.subscription.exists();
if (!exists) {
const message = `PubSub server is not started: subscription ${this.subscriptionName} does not exist`;
this.logger.error(message);
throw new Error(message);
}
}

this.subscription
.on(MESSAGE_EVENT, async (message: Message) => {
Expand Down
1 change: 1 addition & 0 deletions tests/src/gc-pubsub-broadcast.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class GCPubSubBroadcastController implements OnApplicationShutdown {
constructor() {
this.client = new GCPubSubClient({
topic: 'broadcast',
replyTopic: 'default_reply_topic',
replySubscription: 'broadcast_reply_subscription',
client: {
apiEndpoint: 'localhost:8681',
Expand Down
2 changes: 2 additions & 0 deletions tests/src/gc-pubsub.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export class GCPubSubController implements OnApplicationShutdown {
apiEndpoint: 'localhost:8681',
projectId: 'microservice',
},
replyTopic: 'default_reply_topic',
replySubscription: 'default_reply_subscription',
});
}

Expand Down