Emit a message to Kafka and wait until NestJS has processed it


How it works

createAwaitableEmit creates helper utilities for end-to-end tests:

  • emitMessage - function that emits a message to Kafka
  • AwaitableEmitInterceptor - NestJS global interceptor (required for emitMessage to work)
  • dispose - function to run at the test teardown stage


  1. Create the helper objects
     const { emitMessage, AwaitableEmitInterceptor, dispose } = createAwaitableEmit(options);
  2. Add the interceptor to the NestJS app
     app.useGlobalInterceptors(new AwaitableEmitInterceptor());
  3. Use emitMessage in a test
  4. Run dispose in the 'after all' stage
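
The flow in these steps can be pictured with a small in-process sketch. This is not the package's implementation: PendingMessages and emitAndWait are hypothetical names, the Kafka round trip is simulated with a timer, and resolving even when the handler fails is an assumption made here for simplicity. It only illustrates the idea of pairing each emitted message with a promise that an interceptor-like hook settles once the handler has finished.

```typescript
// Hypothetical sketch: none of these names come from the package.
type Outcome = 'handled' | 'timeout';

class PendingMessages {
  private readonly resolvers = new Map<string, () => void>();
  private nextId = 0;

  // Registers a message and returns its id plus a promise that settles when
  // the message is marked as handled or when waitMs elapses, whichever is first.
  track(waitMs: number): { id: string; done: Promise<Outcome> } {
    const id = String(this.nextId++);
    const done = new Promise<Outcome>((resolve) => {
      const timer = setTimeout(() => {
        this.resolvers.delete(id);
        resolve('timeout');
      }, waitMs);
      this.resolvers.set(id, () => {
        clearTimeout(timer);
        this.resolvers.delete(id);
        resolve('handled');
      });
    });
    return { id, done };
  }

  // Called by the interceptor-like hook once a handler has finished.
  markHandled(id: string): void {
    this.resolvers.get(id)?.();
  }
}

// Stand-in for "emit to Kafka, let the controller handle it, report back".
// In a real setup the id would have to travel with the message; here the
// delivery is simulated in-process with a zero-delay timer.
function emitAndWait(
  registry: PendingMessages,
  handler: (payload: unknown) => Promise<void>,
  payload: unknown,
  waitMs = 5000,
): Promise<Outcome> {
  const { id, done } = registry.track(waitMs);
  setTimeout(() => {
    // Assumption: completion is reported whether the handler succeeds or fails.
    void handler(payload)
      .catch(() => undefined)
      .then(() => registry.markHandled(id));
  }, 0);
  return done;
}
```

Awaiting emitAndWait(registry, handler, payload) in a test resolves with 'handled' once the handler has finished, or with 'timeout' if waitMs elapses first, so assertions placed after the await see the handler's side effects.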


Options

  • getKafkaClient: () => ClientKafka - factory function that returns the Kafka client instance
  • wait?: number - maximum wait time in milliseconds; if the controller has not finished handling the message within this time, the promise is resolved anyway
  • brokers?: string[] - Kafka brokers (optional); when set, the Kafka admin client is used under the hood to wait until no consumer group has any lag (this may or may not be useful for tests running in parallel)
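
For the brokers option, the "no lag" condition can be sketched as follows. This is an illustration under assumptions, not the package's code: PartitionOffset, totalLag and waitForNoLag are hypothetical names, offsets are modelled as strings the way Kafka admin clients commonly report them, and a missing or negative committed offset is treated as "nothing consumed yet". How the offsets are actually fetched from the brokers is left out.

```typescript
// Hypothetical shape for one partition's offset, as a string.
interface PartitionOffset {
  partition: number;
  offset: string;
}

// Lag of one consumer group on one topic: the sum over partitions of
// (log end offset - committed offset), never below zero per partition.
function totalLag(
  endOffsets: PartitionOffset[],
  committed: PartitionOffset[],
): number {
  const committedByPartition = new Map<number, number>();
  for (const { partition, offset } of committed) {
    committedByPartition.set(partition, Number(offset));
  }

  let lag = 0;
  for (const { partition, offset } of endOffsets) {
    // Assumption: a missing or negative committed offset means nothing
    // has been consumed from this partition yet.
    const current = Math.max(committedByPartition.get(partition) ?? 0, 0);
    lag += Math.max(Number(offset) - current, 0);
  }
  return lag;
}

// Polls readLag until it reports zero or the timeout elapses.
// Returns true when the lag reached zero in time, false otherwise.
async function waitForNoLag(
  readLag: () => Promise<number>,
  timeoutMs: number,
  intervalMs = 100,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    if ((await readLag()) === 0) return true;
    if (Date.now() >= deadline) return false;
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

In a setup like this, readLag would query the brokers and sum totalLag over every consumer group and topic of interest. Because that total counts every message in flight, tests running in parallel against the same brokers can keep each other waiting, which is one reason the option may not suit parallel runs.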


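Example

Controller:

import { setTimeout } from 'node:timers/promises'; // promise-based setTimeout

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @EventPattern('user-created')
  async handleEntityCreated(@Payload() payload: object) {
    console.log('user created', payload);
    await setTimeout(2000); // Imitate long running process
  }
}

End-to-end test:

import { INestMicroservice, Provider } from '@nestjs/common';
import { ClientKafka, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';

describe('AppController (e2e)', () => {
  let app: INestMicroservice;
  let clientKafka: ClientKafka;
  let service: AppService;

  // clientKafka is assigned in the 'before' hook; the factory reads it lazily
  const { emitMessage, AwaitableEmitInterceptor, dispose } =
    createAwaitableEmit({
      getKafkaClient: () => clientKafka,
    });

  // Before all
  before(async () => {
    // Kafka client used by the test to emit messages
    const providers: Provider[] = [
      {
        provide: 'KAFKA_CLIENT',
        useFactory: () => {
          return ClientProxyFactory.create({
            transport: Transport.KAFKA,
            options: {
              client: {
                clientId: 'KAFKA_CLIENT',
                brokers: [''],
              },
            },
          });
        },
      },
    ];
    const testingModule = await Test.createTestingModule({
      imports: [AppModule],
      providers,
    }).compile();

    app = testingModule.createNestMicroservice({
      transport: Transport.KAFKA,
      options: {
        client: {
          clientId: 'KAFKA_CLIENT',
          brokers: [''],
        },
      },
    });

    // Required for emitMessage to know when a handler has finished
    app.useGlobalInterceptors(new AwaitableEmitInterceptor());

    await app.init();
    await app.listen();

    clientKafka = app.get<ClientKafka>('KAFKA_CLIENT');
    service = app.get(AppService);
  });

  // After all
  after(async () => {
    await dispose();
    await clientKafka?.close();
    await app?.close();
  });

  it('smoke', () => {
    // ...
  });

  it('test emit message', async () => {
    // Resolves once the controller has handled the message (or the wait time has elapsed)
    await emitMessage('user-created', {
      value: { name: 'Bob' },
    });
    expect({ name: 'Bob' });
  });
});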