Skip to content

Commit

Permalink
fix(providers): add provider for generic producer
Browse files Browse the repository at this point in the history
GH-0
  • Loading branch information
akshatdubeysf committed Dec 27, 2024
1 parent cfe60bf commit 5076645
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export class StartConsumer implements IConsumer<TestStream, Events.start> {

A Producer is a loopback service for producing message for a particular topic, you can inject a producer using the `@producer(TOPIC_NAME)` decorator.
Note: The topic name passed to decorator must be first configured in the Component configuration's topic property -
If you want to produce a raw message without any event type, you can use the `@genericProducer(TOPIC_NAME)` decorator, note that in this case, the topic name must be passed in the genericTopics property of the component configuration.

#### Example

Expand Down
13 changes: 13 additions & 0 deletions src/__tests__/acceptance/application.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
setupConsumerApplication,
setupProducerApplication,
} from './test-helper';
import {GenericProducerService} from './fixtures/producer/generic-producer.service';

describe('end-to-end', () => {
let consumerApp: Application;
Expand Down Expand Up @@ -93,6 +94,18 @@ describe('end-to-end', () => {
);
});

it('should produce from a generic producer without events for a single topic', async () => {
const producerService = producerApp.getSync<GenericProducerService>(
`services.GenericProducerService`,
);
const message = 'message';
await producerService.produceMessage(message);
sinon.assert.called(genericHandler);
expect(genericHandler.getCalls()[0].args[0]).to.be.deepEqual({
data: message,
});
});

it('should consume from a generic consumer without events for a single topic', async () => {
const producerInstance = producerApp.getSync<Producer<TestStream>>(
producerKey(Topics.Generic),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import {genericProducer} from '../../../../decorators/generic-producer.decorator';
import {GenericProducer} from '../../../../types';
import {GenericStream} from '../stream';
import {Topics} from '../topics.enum';

export class GenericProducerService {
constructor(
@genericProducer(Topics.Generic)
private producer: GenericProducer<GenericStream>,
) {}

async produceMessage(message: string): Promise<void> {
await this.producer.send([
{
data: message,
},
]);
}
}
3 changes: 3 additions & 0 deletions src/__tests__/acceptance/fixtures/producer/producer-app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {KafkaClientComponent} from '../../../../component';
import {KafkaClientBindings} from '../../../../keys';
import {KafkaClientStub} from '../../../stubs';
import {Topics} from '../topics.enum';
import {GenericProducerService} from './generic-producer.service';

export class ProducerApp extends BootMixin(
ServiceMixin(RepositoryMixin(RestApplication)),
Expand All @@ -16,11 +17,13 @@ export class ProducerApp extends BootMixin(

this.configure(KafkaClientBindings.Component).to({
topics: Object.values(Topics) as string[],
genericTopics: [Topics.Generic],
});
this.bind<KafkaClientStub>(KafkaClientBindings.KafkaClient).to(
options.client,
);
this.component(KafkaClientComponent);
this.service(GenericProducerService);

this.projectRoot = __dirname;
// Customize @loopback/boot Booter Conventions here
Expand Down
2 changes: 1 addition & 1 deletion src/__tests__/acceptance/fixtures/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ export interface TestStream extends IStreamDefinition {
export interface GenericStream extends IStreamDefinition {
topic: Topics.Generic;
messages: {
[Events.close]: {};
data: string;
};
}
24 changes: 22 additions & 2 deletions src/component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import {
} from '@loopback/core';
import {LoggerExtensionComponent} from '@sourceloop/core';
import {Kafka} from 'kafkajs';
import {KafkaClientBindings, producerKey} from './keys';
import {genericProducerKey, KafkaClientBindings, producerKey} from './keys';
import {KafkaObserver} from './observers';
import {KafkaProducerFactoryProvider} from './providers';
import {
GenericKafkaProducerFactoryProvider,
KafkaProducerFactoryProvider,
} from './providers';
import {KafkaConsumerService} from './services/kafka-consumer.service';
import {KafkaClientOptions} from './types';

Expand All @@ -39,6 +42,11 @@ export class KafkaClientComponent implements Component {
.toProvider(KafkaProducerFactoryProvider)
.inScope(BindingScope.SINGLETON);

app
.bind(KafkaClientBindings.GenericProducerFactor)
.toProvider(GenericKafkaProducerFactoryProvider)
.inScope(BindingScope.SINGLETON);

app.service(KafkaConsumerService);

if (configuration?.topics) {
Expand All @@ -50,6 +58,18 @@ export class KafkaClientComponent implements Component {
.inScope(BindingScope.SINGLETON);
});
}

if (configuration?.genericTopics) {
const genericProducerFactory = app.getSync(
KafkaClientBindings.GenericProducerFactor,
);
configuration.genericTopics.forEach(topic => {
app
.bind(genericProducerKey(topic))
.to(genericProducerFactory(topic))
.inScope(BindingScope.SINGLETON);
});
}
if (configuration?.initObservers) {
app.lifeCycleObserver(KafkaObserver);
}
Expand Down
6 changes: 6 additions & 0 deletions src/decorators/generic-producer.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {inject} from '@loopback/core';
import {genericProducerKey} from '../keys';

export function genericProducer(topic: string) {
return inject(genericProducerKey(topic));
}
1 change: 1 addition & 0 deletions src/decorators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from './producer.decorator';
export * from './handler.decorator';
export * from './consumer.decorator';
export * from './generic-consumer.decorator';
export * from './generic-producer.decorator';
10 changes: 10 additions & 0 deletions src/keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import {KafkaClientComponent} from './component';
import {KafkaConsumerService} from './services/kafka-consumer.service';
import {
ConsumerConfig,
GenericProducer,
GenericProducerFactoryType,
IStreamDefinition,
Producer,
ProducerFactoryType,
Expand All @@ -30,6 +32,9 @@ export namespace KafkaClientBindings {
export const ProducerFactory = BindingKey.create<
ProducerFactoryType<IStreamDefinition>
>(`${KafkaNamespace}.ProducerFactory`);
export const GenericProducerFactor = BindingKey.create<
GenericProducerFactoryType<IStreamDefinition>
>(`${KafkaNamespace}.GenericProducerFactory`);
export const LifeCycleGroup = `${KafkaNamespace}.KAFKA_OBSERVER_GROUP`;
}

Expand All @@ -38,6 +43,11 @@ export const producerKey = (topic: string) =>
`${KafkaNamespace}.producer.${topic}`,
);

export const genericProducerKey = (topic: string) =>
BindingKey.create<GenericProducer<IStreamDefinition>>(
`${KafkaNamespace}.generic.producer.${topic}`,
);

export const eventHandlerKey = <
Stream extends IStreamDefinition,
K extends keyof Stream['messages'],
Expand Down
49 changes: 49 additions & 0 deletions src/providers/generic-kafka-producer-factory.provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import {inject, Provider} from '@loopback/core';
import {ILogger, LOGGER} from '@sourceloop/core';
import {CompressionTypes, Kafka, ProducerConfig} from 'kafkajs';
import {KafkaErrorKeys} from '../error-keys';
import {GenericProducerFactoryType, IStreamDefinition} from '../types';
import {KafkaClientBindings} from '../keys';

/* The class `GenericKafkaProducerFactoryProvider` is a TypeScript class that provides a factory for creating
Kafka producers to send messages to specified topics without events. */
export class GenericKafkaProducerFactoryProvider<T extends IStreamDefinition>
implements Provider<GenericProducerFactoryType<T>>
{
constructor(
@inject(KafkaClientBindings.KafkaClient)
private client: Kafka,
@inject(LOGGER.LOGGER_INJECT) private readonly logger: ILogger,
@inject(KafkaClientBindings.ProducerConfiguration, {optional: true})
private configuration?: ProducerConfig,
) {}

value(): GenericProducerFactoryType<T> {
return (topic: string) => {
return {
send: async (payload: T['messages'][], key?: string): Promise<void> => {
const producer = this.client.producer(this.configuration);

try {
await producer.connect();
await producer.send({
topic: topic,
compression: CompressionTypes.GZIP,
messages: payload.map(message => ({
key,
value: JSON.stringify(message),
})),
});
await producer.disconnect();
} catch (e) {
this.logger.error(
`${KafkaErrorKeys.PublishFailed}: ${JSON.stringify(e)}`,
);
await producer.disconnect();
throw e;
}
},
};
};
}
}
1 change: 1 addition & 0 deletions src/providers/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './kafka-producer-factory.provider';
export * from './generic-kafka-producer-factory.provider';
9 changes: 9 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export interface KafkaClientOptions {
connection: KafkaConfig;
topics?: string[];
initObservers?: boolean;
genericTopics?: string[];
}

export type ConsumerConfig = {
Expand Down Expand Up @@ -81,10 +82,18 @@ export interface Producer<Stream extends IStreamDefinition> {
): Promise<void>;
}

export interface GenericProducer<Stream extends IStreamDefinition> {
send(payload: Stream['messages'][], key?: string): Promise<void>;
}

export type ProducerFactoryType<Stream extends IStreamDefinition> = (
topic: Stream['topic'],
) => Producer<Stream>;

export type GenericProducerFactoryType<Stream extends IStreamDefinition> = (
topic: Stream['topic'],
) => GenericProducer<Stream>;

export type StreamHandler<
Stream extends IStreamDefinition,
K extends EventsInStream<Stream>,
Expand Down

0 comments on commit 5076645

Please sign in to comment.