Skip to content

Commit

Permalink
feat(adapter/amqp): emit disconnect event to internal event bus when …
Browse files Browse the repository at this point in the history
…connection closed
  • Loading branch information
CheerlessCloud committed Oct 4, 2018
1 parent 3269f18 commit 6e4d6e0
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions src/AMQPAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ import pEvent from 'p-event';
import EError from 'eerror';
import AMQPMessage, { type IMessage } from './AMQPMessage';

type AMQPAdapterEventBusPossibleEvent = 'lock' | 'unlock' | 'bufferOverflow' | 'endHandle';
type AMQPAdapterEventBusPossibleEvent =
| 'lock'
| 'unlock'
| 'bufferOverflow'
| 'endHandle'
| 'disconnect';
type AMQPAdapterEventBus = {
...EventEmitter,
on(AMQPAdapterEventBusPossibleEvent, (reason: string) => mixed): void,
Expand Down Expand Up @@ -79,11 +84,6 @@ class AMQPAdapter {

_mountEventHandlers() {
// @todo check correctness of using on/once for this events
this._connection.on('close', () => {
this._state = 'disconnected';
this._eventBus.emit('lock', 'connectionClosed');
});

this._connection.on('blocked', () => {
this._state = 'blocked';
this._eventBus.emit('lock', 'blocked');
Expand All @@ -104,8 +104,25 @@ class AMQPAdapter {
this._eventBus.emit('unlock', 'drain');
});

this._connection.on('close', () => {
const lastState = this._state;
this._state = 'disconnected';

if (['connecting', 'connected', 'blocked'].includes(lastState)) {
this._eventBus.emit('disconnect', 'connectionClosed');
}

this._eventBus.emit('lock', 'connectionClosed');
});

this._channel.on('close', () => {
const lastState = this._state;
this._state = 'disconnected';

if (['connecting', 'connected', 'blocked'].includes(lastState)) {
this._eventBus.emit('disconnect', 'channelClosed');
}

this._eventBus.emit('lock', 'channelClosed');
});

Expand Down

0 comments on commit 6e4d6e0

Please sign in to comment.