Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Inbound peer eviction method - Closes #3721 #3824

Merged
merged 16 commits into from
Jun 26, 2019
Merged
17 changes: 17 additions & 0 deletions elements/lisk-p2p/src/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ const BASE_10_RADIX = 10;
const DEFAULT_MAX_OUTBOUND_CONNECTIONS = 20;
const DEFAULT_MAX_INBOUND_CONNECTIONS = 100;
const DEFAULT_OUTBOUND_SHUFFLE_INTERVAL = 300000;
const DEFAULT_PEER_PROTECTION_FOR_LATENCY = 0.068;
const DEFAULT_PEER_PROTECTION_FOR_USEFULNESS = 0.068;
const DEFAULT_PEER_PROTECTION_FOR_LONGEVITY = 0.5;

const selectRandomPeerSample = (
peerList: ReadonlyArray<P2PPeerInfo>,
Expand Down Expand Up @@ -178,6 +181,7 @@ export class P2P extends EventEmitter {
private readonly _handleInboundSocketError: (error: Error) => void;
private readonly _peerHandshakeCheck: P2PCheckPeerCompatibility;

// tslint:disable-next-line: cyclomatic-complexity
diego-G marked this conversation as resolved.
Show resolved Hide resolved
public constructor(config: P2PConfig) {
super();
this._sanitizedPeerLists = sanitizePeerLists(
Expand Down Expand Up @@ -362,6 +366,7 @@ export class P2P extends EventEmitter {
// Re-emit the error to allow it to bubble up the class hierarchy.
this.emit(EVENT_INBOUND_SOCKET_ERROR, error);
};

this._peerPool = new PeerPool({
connectTimeout: config.connectTimeout,
ackTimeout: config.ackTimeout,
Expand Down Expand Up @@ -390,6 +395,18 @@ export class P2P extends EventEmitter {
outboundShuffleInterval: config.outboundShuffleInterval
? config.outboundShuffleInterval
: DEFAULT_OUTBOUND_SHUFFLE_INTERVAL,
latencyProtectionRatio:
typeof config.latencyProtectionRatio === 'number'
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
? config.latencyProtectionRatio
: DEFAULT_PEER_PROTECTION_FOR_LATENCY,
productivityProtectionRatio:
typeof config.productivityProtectionRatio === 'number'
? config.productivityProtectionRatio
: DEFAULT_PEER_PROTECTION_FOR_USEFULNESS,
longevityProtectionRatio:
typeof config.longevityProtectionRatio === 'number'
? config.longevityProtectionRatio
: DEFAULT_PEER_PROTECTION_FOR_LONGEVITY,
});

this._bindHandlersToPeerPool(this._peerPool);
Expand Down
40 changes: 31 additions & 9 deletions elements/lisk-p2p/src/p2p_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@
import { RPCResponseAlreadySentError } from './errors';
import { P2PResponsePacket } from './p2p_types';

interface RequestOptions {
readonly procedure: string;
readonly data: unknown;
readonly id: string;
readonly rate: number;
productivity: {
shuse2 marked this conversation as resolved.
Show resolved Hide resolved
// tslint:disable-next-line: readonly-keyword
requestCounter: number;
// tslint:disable-next-line: readonly-keyword
responseCounter: number;
// tslint:disable-next-line: readonly-keyword
responseRate: number;
// tslint:disable-next-line: readonly-keyword
lastResponded: number;
};
}

export class P2PRequest {
private readonly _procedure: string;
private readonly _data: unknown;
Expand All @@ -28,16 +45,14 @@ export class P2PRequest {
private readonly _rate: number;

public constructor(
procedure: string,
data: unknown,
peerId: string,
rate: number,
options: RequestOptions,
respondCallback: (responseError?: Error, responseData?: unknown) => void,
) {
this._procedure = procedure;
this._data = data;
this._peerId = peerId;
this._rate = rate;
this._procedure = options.procedure;
this._data = options.data;
this._peerId = options.id;
this._rate = options.rate;
options.productivity.requestCounter += 1;
this._respondCallback = (
responseError?: Error,
responsePacket?: P2PResponsePacket,
Expand All @@ -48,10 +63,17 @@ export class P2PRequest {
);
}
this._wasResponseSent = true;
// We assume peer performed useful work and update peer response rate
if (!responseError && responsePacket) {
options.productivity.lastResponded = Date.now();
options.productivity.responseCounter += 1;
options.productivity.responseRate =
options.productivity.responseCounter /
options.productivity.requestCounter;
}
respondCallback(responseError, responsePacket);
};
this._wasResponseSent = false;
this._peerId = peerId;
}

public get procedure(): string {
Expand Down
3 changes: 3 additions & 0 deletions elements/lisk-p2p/src/p2p_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ export interface P2PConfig {
readonly peerBanTime?: number;
readonly sendPeerLimit?: number;
readonly outboundShuffleInterval?: number;
readonly latencyProtectionRatio?: number;
readonly productivityProtectionRatio?: number;
readonly longevityProtectionRatio?: number;
readonly hostIp?: string;
}

Expand Down
70 changes: 65 additions & 5 deletions elements/lisk-p2p/src/peer/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ export interface ClientOptionsUpdated {
readonly connectTimeout?: number;
}

export interface Productivity {
readonly requestCounter: number;
readonly responseCounter: number;
readonly responseRate: number;
readonly lastResponded: number;
}

export type SCServerSocketUpdated = {
destroy(code?: number, data?: string | object): void;
on(event: string | unknown, listener: (packet?: unknown) => void): void;
Expand Down Expand Up @@ -91,6 +98,13 @@ export const DEFAULT_CONNECT_TIMEOUT = 2000;
export const DEFAULT_ACK_TIMEOUT = 2000;
export const DEFAULT_REPUTATION_SCORE = 100;
export const DEFAULT_RATE_INTERVAL = 1000;
export const DEFAULT_PRODUCTIVITY_RESET_INTERVAL = 20000;
export const DEFAULT_PRODUCTIVITY = {
requestCounter: 0,
responseCounter: 0,
responseRate: 0,
lastResponded: 0,
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
};

export enum ConnectionState {
CONNECTING = 'connecting',
Expand Down Expand Up @@ -128,10 +142,19 @@ export class Peer extends EventEmitter {
protected readonly _ipAddress: string;
protected readonly _wsPort: number;
private readonly _height: number;
private _reputation: number;
protected _reputation: number;
protected _latency: number;
protected _connectTime: number;
protected _productivity: {
requestCounter: number;
responseCounter: number;
responseRate: number;
lastResponded: number;
};
private _callCounter: Map<string, number>;
private readonly _counterResetInterval: NodeJS.Timer;
protected _peerInfo: P2PPeerInfo;
private readonly _productivityResetInterval: NodeJS.Timer;
protected readonly _peerConfig: PeerConfig;
protected _nodeInfo: P2PNodeInfo | undefined;
protected readonly _handleRawRPC: (
Expand Down Expand Up @@ -159,10 +182,22 @@ export class Peer extends EventEmitter {
this._id = constructPeerId(this._ipAddress, this._wsPort);
this._height = peerInfo.height ? (peerInfo.height as number) : 0;
this._reputation = DEFAULT_REPUTATION_SCORE;
this._latency = 0;
this._connectTime = Date.now();
this._callCounter = new Map();
this._counterResetInterval = setInterval(() => {
this._callCounter = new Map();
}, DEFAULT_RATE_INTERVAL);
this._productivityResetInterval = setInterval(() => {
// If peer has not recently responded, reset productivity to 0
if (
this._productivity.lastResponded <
Date.now() - DEFAULT_PRODUCTIVITY_RESET_INTERVAL
) {
this._productivity = { ...DEFAULT_PRODUCTIVITY };
}
}, DEFAULT_PRODUCTIVITY_RESET_INTERVAL);
this._productivity = { ...DEFAULT_PRODUCTIVITY };

// This needs to be an arrow function so that it can be used as a listener.
this._handleRawRPC = (
Expand All @@ -184,11 +219,15 @@ export class Peer extends EventEmitter {
}

const rate = this._getPeerRate(packet as P2PRequestPacket);

const request = new P2PRequest(
rawRequest.procedure,
rawRequest.data,
this._id,
rate,
{
procedure: rawRequest.procedure,
data: rawRequest.data,
id: this._id,
rate,
productivity: this._productivity,
},
respond,
);

Expand Down Expand Up @@ -263,6 +302,26 @@ export class Peer extends EventEmitter {
return this._ipAddress;
}

public get reputation(): number {
return this._reputation;
}

public get latency(): number {
return this._latency;
}

public get connectTime(): number {
return this._connectTime;
}

public get responseRate(): number {
return this._productivity.responseRate;
}

public get productivity(): Productivity {
return { ...this._productivity };
}

public updatePeerInfo(newPeerInfo: P2PDiscoveredPeerInfo): void {
// The ipAddress and wsPort properties cannot be updated after the initial discovery.
this._peerInfo = {
Expand Down Expand Up @@ -324,6 +383,7 @@ export class Peer extends EventEmitter {

public disconnect(code: number = 1000, reason?: string): void {
clearInterval(this._counterResetInterval);
clearInterval(this._productivityResetInterval);
if (this._socket) {
this._socket.destroy(code, reason);
}
Expand Down
22 changes: 21 additions & 1 deletion elements/lisk-p2p/src/peer/inbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
* Removal or modification of this copyright notice is prohibited.
*
*/

import {
Peer,
PeerConfig,
Expand All @@ -27,6 +26,14 @@ import { SCServerSocket } from 'socketcluster-server';

export const EVENT_CLOSE_INBOUND = 'closeInbound';
export const EVENT_INBOUND_SOCKET_ERROR = 'inboundSocketError';
export const EVENT_PING = 'ping';
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved

const DEFAULT_PING_INTERVAL_MAX = 60000;
const DEFAULT_PING_INTERVAL_MIN = 20000;

const getRandomPingDelay = () =>
Math.random() * (DEFAULT_PING_INTERVAL_MAX - DEFAULT_PING_INTERVAL_MIN) +
DEFAULT_PING_INTERVAL_MIN;

export class InboundPeer extends Peer {
protected _socket: SCServerSocketUpdated;
Expand All @@ -35,6 +42,8 @@ export class InboundPeer extends Peer {
code: number,
reason: string,
) => void;
private readonly _sendPing: () => void;
private _pingTimeoutId: NodeJS.Timer;

public constructor(
peerInfo: P2PDiscoveredPeerInfo,
Expand All @@ -46,12 +55,23 @@ export class InboundPeer extends Peer {
this.emit(EVENT_INBOUND_SOCKET_ERROR, error);
};
this._handleInboundSocketClose = (code, reason) => {
if (this._pingTimeoutId) {
clearTimeout(this._pingTimeoutId);
}
this.emit(EVENT_CLOSE_INBOUND, {
peerInfo,
code,
reason,
});
};
this._sendPing = () => {
const pingStart = Date.now();
this._socket.emit(EVENT_PING, undefined, (_: Error, __: unknown) => {
this._latency = Date.now() - pingStart;
this._pingTimeoutId = setTimeout(this._sendPing, getRandomPingDelay());
});
};
this._pingTimeoutId = setTimeout(this._sendPing, getRandomPingDelay());
this._socket = peerSocket;
this._bindHandlersToInboundSocket(this._socket);
}
Expand Down
19 changes: 18 additions & 1 deletion elements/lisk-p2p/src/peer/outbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import {
REMOTE_RPC_GET_NODE_INFO,
} from './base';

import { EVENT_PING } from './inbound';

import {
P2PDiscoveredPeerInfo,
P2PMessagePacket,
Expand All @@ -52,6 +54,7 @@ export const EVENT_CONNECT_OUTBOUND = 'connectOutbound';
export const EVENT_CONNECT_ABORT_OUTBOUND = 'connectAbortOutbound';
export const EVENT_CLOSE_OUTBOUND = 'closeOutbound';
export const EVENT_OUTBOUND_SOCKET_ERROR = 'outboundSocketError';
export const RESPONSE_PONG = 'pong';

export interface PeerInfoAndOutboundConnection {
readonly peerInfo: P2PDiscoveredPeerInfo;
Expand Down Expand Up @@ -165,6 +168,13 @@ export class OutboundPeer extends Peer {
});
});

outboundSocket.on(
EVENT_PING,
(_: undefined, res: (_: undefined, data: string) => void) => {
res(undefined, RESPONSE_PONG);
},
);

// Bind RPC and remote event handlers
outboundSocket.on(REMOTE_EVENT_RPC_REQUEST, this._handleRawRPC);
outboundSocket.on(REMOTE_EVENT_MESSAGE, this._handleRawMessage);
Expand Down Expand Up @@ -202,6 +212,7 @@ export class OutboundPeer extends Peer {
'postTransactions',
this._handleRawLegacyMessagePostTransactions,
);
outboundSocket.off(EVENT_PING);
}
}

Expand Down Expand Up @@ -248,6 +259,12 @@ export const connectAndRequest = async (
// Bind an error handler immediately after creating the socket; otherwise errors may crash the process
// tslint:disable-next-line no-empty
outboundSocket.on('error', () => {});
outboundSocket.on(
EVENT_PING,
(_: undefined, res: (_: undefined, data: string) => void) => {
res(undefined, RESPONSE_PONG);
},
);

// tslint:disable-next-line no-let
let disconnectStatusCode: number;
Expand Down Expand Up @@ -321,7 +338,7 @@ export const connectAndFetchPeerInfo = async (
ip: basicPeerInfo.ipAddress,
wsPort: basicPeerInfo.wsPort,
};

return validatePeerInfo(rawPeerInfo);
} catch (error) {
throw new FetchPeerStatusError(
Expand Down
Loading