diff --git a/docs/ris-disconnections.md b/docs/ris-disconnections.md index 608c4620..d3380a0d 100644 --- a/docs/ris-disconnections.md +++ b/docs/ris-disconnections.md @@ -6,6 +6,7 @@ The following causes are possible: 1) **Network issues.** The machine where BGPalerter is running loses connectivity (maybe just for a few seconds). 2) **You are monitoring something that produces too many BGP updates** (e.g., your prefixes are not stable or constantly re-announced). In such cases you may be too slow in consuming the data and the server disconnects you to flush the buffer. +3) **Process termination.** This happens when BGPalerter was killed or crashed for some reason, this is not related to RIPE RIS. Anyway, unfortunately sometimes this happens without an explanation due to RIPE RIS instabilities. This has been reported to the RIPE RIS team. diff --git a/package-lock.json b/package-lock.json index 1c899865..e068305d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3831,6 +3831,11 @@ } } }, + "node-cleanup": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/node-cleanup/-/node-cleanup-2.1.2.tgz", + "integrity": "sha1-esGavSl+Caf3KnFUXZUbUX5N3iw=" + }, "node-environment-flags": { "version": "1.0.6", "resolved": "https://registry.npmjs.org/node-environment-flags/-/node-environment-flags-1.0.6.tgz", diff --git a/package.json b/package.json index 918b1f0c..dcc626e0 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,7 @@ "kafkajs": "^1.15.0", "md5": "^2.3.0", "moment": "^2.29.1", + "node-cleanup": "^2.1.2", "nodemailer": "^6.7.0", "path": "^0.12.7", "restify": "^8.6.0", diff --git a/src/connectors/connector.js b/src/connectors/connector.js index d9315cdc..321b9fe0 100644 --- a/src/connectors/connector.js +++ b/src/connectors/connector.js @@ -107,5 +107,4 @@ export default class Connector { disconnect = () => { throw new Error('The method disconnect MUST be implemented'); }; - } \ No newline at end of file diff --git a/src/connectors/connectorRIS.js b/src/connectors/connectorRIS.js index 3b8b296f..f1204b6e 100644 --- a/src/connectors/connectorRIS.js +++ b/src/connectors/connectorRIS.js @@ -46,23 +46,25 @@ export default class ConnectorRIS extends Connector { this.agent = env.agent; this.subscribed = {}; this.canaryBeacons = {}; + this.clientId = env.clientId; + this.instanceId = env.instanceId; this.url = brembo.build(this.params.url, { params: { client_version: env.version, - client: env.clientId, - instance: env.instanceId + client: this.clientId, + instance: this.instanceId } }); + if (this.environment !== "research") { // The canary feature may impact performance if you are planning to get all the possible updates of RIS this._startCanaryInterval = setInterval(this._startCanary, 60000); } }; - _openConnect = (resolve) => { + _openConnect = (resolve, data) => { resolve(true); - this._connect(`${this.name} connector connected`); - + this._connect(`${this.name} connector connected (instance:${this.instanceId} connection:${data.connection})`); if (this.subscription) { this.subscribe(this.subscription); } @@ -76,7 +78,7 @@ export default class ConnectorRIS extends Connector { this._message(messageObj); }; - _appendListeners = (resolve, reject) => { + _appendListeners = (resolve, reject) => { this.ws.on('message', this._messageToJson); this.ws.on('close', (error) => { @@ -87,8 +89,10 @@ export default class ConnectorRIS extends Connector { reject(); } }); - this.ws.on('error', this._error); - this.ws.on('open', this._openConnect.bind(null, resolve)); + this.ws.on('error', error => { + this._error(`${this.name} ${error.message} (instance:${this.instanceId} connection:${error.connection})`); + }); + this.ws.on('open', data => this._openConnect(resolve, data)); }; connect = () => @@ -314,7 +318,7 @@ export default class ConnectorRIS extends Connector { } this._timeoutFileChange = setTimeout(() => { this._onInputChange(input); - }, 2000); + }, 5000); }); }; diff --git a/src/utils/WebSocket.js b/src/utils/WebSocket.js index 9a7396b6..b02d5bfb 100644 --- a/src/utils/WebSocket.js +++ b/src/utils/WebSocket.js @@ -2,6 +2,7 @@ import _ws from "ws"; import PubSub from "../utils/pubSub"; import brembo from "brembo"; import { v4 as uuidv4 } from 'uuid'; +import nodeCleanup from "node-cleanup"; export default class WebSocket { constructor(host, options) { @@ -13,8 +14,16 @@ export default class WebSocket { this.alive = false; this.pingInterval = options.pingIntervalSeconds ? options.pingIntervalSeconds * 1000 : 40000; this.reconnectSeconds = options.reconnectSeconds ? options.reconnectSeconds * 1000 : 30000; - this.connectionDelay = 5000; + this.connectionDelay = 8000; + this.openConnectionTimeoutSeconds = 40000; this.lastPingReceived = null; + + nodeCleanup(() => { + if (this.ws) { + this.pubsub.publish("close", "process termination"); + this.disconnect(); + } + }); } _ping = () => { @@ -35,7 +44,7 @@ export default class WebSocket { const nPings = 6; if (this.ws) { if (this.lastPingReceived + (this.pingInterval * nPings) < new Date().getTime()) { - this.pubsub.publish("error", `The WebSocket client didn't receive ${nPings} pings. Disconnecting.`); + this._publishError(`The WebSocket client didn't receive ${nPings} pings. Disconnecting.`) this.disconnect(); this.connect(); } else { @@ -64,21 +73,24 @@ export default class WebSocket { }); this.ws = new _ws(url, this.options); + this.setOpenTimeout(true); this.ws.on('message', (data) => { this.pubsub.publish("message", data); }); - this.ws.on('close', (data) => { + this.ws.on('close', data => { this.alive = false; + this.setOpenTimeout(false); this.pubsub.publish("close", data); }); this.ws.on('pong', this._pingReceived); - this.ws.on('error', (data) => { - this.pubsub.publish("error", data); + this.ws.on('error', message => { + this._publishError(message, {connection: connectionId}); }); - this.ws.on('open', (data) => { + this.ws.on('open', () => { this.alive = true; - this.pubsub.publish("open", data); + this.setOpenTimeout(false); + this.pubsub.publish("open", { connection: connectionId }); }); this._startPing(); @@ -108,6 +120,24 @@ export default class WebSocket { this.connectionDelay = this.reconnectSeconds; }; + _publishError = (message, extra={}) => { + this.pubsub.publish("error", { type: "error", message, ...extra }); + }; + + setOpenTimeout = (setting) => { + if (setting) { + this.openConnectionTimeout = setTimeout(() => { + this._publishError("connection timed out"); + if (this.ws) { + this.disconnect(); + this.connect(); + } + }, this.openConnectionTimeoutSeconds); + } else { + clearTimeout(this.openConnectionTimeout); + } + }; + disconnect = () => { try { this.ws.removeAllListeners("message");