Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement EventBus for RabbitMQ #5

Closed
RomanReznichenko opened this issue Mar 22, 2022 · 0 comments · Fixed by #6
Closed

Implement EventBus for RabbitMQ #5

RomanReznichenko opened this issue Mar 22, 2022 · 0 comments · Fixed by #6
Assignees
Labels
Type: enhancement New feature or request.

Comments

@RomanReznichenko
Copy link
Contributor

RomanReznichenko commented Mar 22, 2022

Provide an implementation based on RabbitMQ. Most communications between the service should be performed using the RPC and the Pub/Sub pattern.

Below you can find a simplified implementation of an event dispatcher for RabbitMQ:

export class RMQEventBus implements EventBus {
  // ...
  
  public publish<T>(event: Event<T>): Promise<void> {
    const { type, payload, correlationId, createdAt } = event;

    if (!this.channel) {
      throw new Error('You should call `init()` to proceed working with distpacher.');
    }
    
    await this.channel.publish(
      this.options.exchange, // exchange name
      type,                  // routing key
      Buffer.from(JSON.stringify(payload)),
      {
        type,
        correlationId,
        mandatory: true,
        persistent: true
        timestamp: createdAt.getTime()
      }
    );
  }
}

It can be together with RetryStrategy to ensure robust and reliable API:

export class RMQEventBus implements EventBus {
  // ...
  private readonly retryStrategy!: RetryStrategy;
  
  public publish<T>(event: Event<T>): Promise<void> {
    // ...
    
    await this.retryStrategy.acquire(() => channel.publish(
      // props
    ));
  }
}

We should also provide a simplified implementation of a command dispatcher:

export class RMQEventBus implements EventBus {
  private readonly subject = new EventEmitter({ captureRejections: true });
  private readonly REPLY_QUEUE_NAME = 'amq.rabbitmq.reply-to';
  // ...
  
  constructor(/* options */) {
    this.subject.setMaxListeners(Infinity);
  }
  
  public execute<T, R>(command: Command<T>): Promise<R | undefined> { 
    const { type, sendTo, payload, correlationId, createdAt, ttl = 5000, expectReply = true } = command;
  
    if (!this.channel) {
      throw new Error('You should call `init()` to proceed working with dispatcher.');
    }
  
    const waiter = expectReply ? 
      Promise.race([
        once(this.subject, correlationId) as Promise<[R]>,
        new Promise<never>((_, reject) =>
          setTimeout(reject, ttl, new NoResponse(ttl)).unref()
        )
      ]) : 
      Promise.resolve([]);
  
    try {
      await this.channel.sendToQueue(
        sendTo,
        Buffer.from(JSON.stringify(payload)),
        {
          type,
          correlationId,
          timestamp: createdAt.getTime(),
          replyTo: this.REPLY_QUEUE_NAME
        }
      );
  
      const [response]: [R] = await waiter;
  
      return response;
    } finally {
      this.subject.removeAllListeners(correlationId);
    }
  }
  
  private async processReply(message: ConsumeMessage): Promise<void> {
    const event: ParsedConsumeMessage | undefined =
      this.parseConsumeMessage(message);

    if (event?.correlationId) {
      this.subject.emit(event.correlationId, event.payload);
    }
  }
}

Each event type has a related channel to get events from RabbitMQ. You can then have as many event handlers per channel and event type as needed.

The register method should accept a type of EventHandler, adding that event handler to the list of handlers that each event can have. If the bus is not subscribed to the event, it creates a new channel with a routing key using an event type so it can receive events when that event is published from any other service.

@derevnjuk derevnjuk changed the title Provide EventBus based on RabitMQ Implement EventBus for RabbitMQ Mar 22, 2022
@derevnjuk derevnjuk added the Type: enhancement New feature or request. label Mar 22, 2022
derevnjuk pushed a commit that referenced this issue Apr 10, 2022
derevnjuk added a commit that referenced this issue Apr 13, 2022
derevnjuk added a commit that referenced this issue Apr 13, 2022
)

relates-to #5
Co-authored-by: Viachaslau <pmstss@gmail.com>
derevnjuk added a commit that referenced this issue Apr 13, 2022
derevnjuk added a commit that referenced this issue Apr 13, 2022
derevnjuk added a commit that referenced this issue Apr 14, 2022
pmstss pushed a commit that referenced this issue Apr 14, 2022
derevnjuk added a commit that referenced this issue Apr 14, 2022
derevnjuk added a commit that referenced this issue Apr 14, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Type: enhancement New feature or request.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants