Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Inbound peer eviction method - Closes #3721 #3824

Merged
merged 16 commits into from
Jun 26, 2019
7 changes: 7 additions & 0 deletions elements/lisk-p2p/src/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ const BASE_10_RADIX = 10;
const DEFAULT_MAX_OUTBOUND_CONNECTIONS = 20;
const DEFAULT_MAX_INBOUND_CONNECTIONS = 100;
const DEFAULT_OUTBOUND_SHUFFLE_INTERVAL = 300000;
const DEFAULT_EVICTION_PROTECTION = 0.58;
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved

const selectRandomPeerSample = (
peerList: ReadonlyArray<P2PPeerInfo>,
Expand Down Expand Up @@ -334,6 +335,7 @@ export class P2P extends EventEmitter {
// Re-emit the error to allow it to bubble up the class hierarchy.
this.emit(EVENT_INBOUND_SOCKET_ERROR, error);
};

this._peerPool = new PeerPool({
connectTimeout: config.connectTimeout,
ackTimeout: config.ackTimeout,
Expand Down Expand Up @@ -362,6 +364,11 @@ export class P2P extends EventEmitter {
outboundShuffleInterval: config.outboundShuffleInterval
? config.outboundShuffleInterval
: DEFAULT_OUTBOUND_SHUFFLE_INTERVAL,
evictionProtectionEnabled:
config.evictionProtectionEnabled === false ? false : true,
diego-G marked this conversation as resolved.
Show resolved Hide resolved
evictionProtectionRatio: config.evictionProtectionRatio
? config.evictionProtectionRatio
: DEFAULT_EVICTION_PROTECTION,
});

this._bindHandlersToPeerPool(this._peerPool);
Expand Down
19 changes: 18 additions & 1 deletion elements/lisk-p2p/src/p2p_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,23 @@ export class P2PRequest {
data: unknown,
peerId: string,
rate: number,
productivity: {
// tslint:disable-next-line: readonly-keyword
requestCounter: number;
// tslint:disable-next-line: readonly-keyword
responseCounter: number;
// tslint:disable-next-line: readonly-keyword
responseRate: number;
// tslint:disable-next-line: readonly-keyword
lastResponded: number;
},
respondCallback: (responseError?: Error, responseData?: unknown) => void,
) {
this._procedure = procedure;
this._data = data;
this._peerId = peerId;
this._rate = rate;
productivity.requestCounter += 1;
this._respondCallback = (
responseError?: Error,
responsePacket?: P2PResponsePacket,
Expand All @@ -48,10 +59,16 @@ export class P2PRequest {
);
}
this._wasResponseSent = true;
// We assume peer performed useful work and update peer response rate
if (!responseError && responsePacket) {
productivity.lastResponded = Date.now();
productivity.responseCounter += 1;
productivity.responseRate =
productivity.responseCounter / productivity.requestCounter;
}
respondCallback(responseError, responsePacket);
};
this._wasResponseSent = false;
this._peerId = peerId;
}

public get procedure(): string {
Expand Down
2 changes: 2 additions & 0 deletions elements/lisk-p2p/src/p2p_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ export interface P2PConfig {
readonly peerBanTime?: number;
readonly sendPeerLimit?: number;
readonly outboundShuffleInterval?: number;
readonly evictionProtectionEnabled?: boolean;
readonly evictionProtectionRatio?: number;
}

// Network info exposed by the P2P library.
Expand Down
39 changes: 35 additions & 4 deletions elements/lisk-p2p/src/peer/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ export const DEFAULT_CONNECT_TIMEOUT = 2000;
export const DEFAULT_ACK_TIMEOUT = 2000;
export const DEFAULT_REPUTATION_SCORE = 100;
export const DEFAULT_RATE_INTERVAL = 1000;
export const DEFAULT_PRODUCTIVITY_RESET_INTERVAL = 2000;
export const DEFAULT_PRODUCTIVITY = {
requestCounter: 0,
responseCounter: 0,
responseRate: 0,
lastResponded: 0,
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
};

export enum ConnectionState {
CONNECTING = 'connecting',
Expand Down Expand Up @@ -128,10 +135,19 @@ export class Peer extends EventEmitter {
protected readonly _ipAddress: string;
protected readonly _wsPort: number;
private readonly _height: number;
private _reputation: number;
public reputation: number;
public latency: number;
public connectTime: number;
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
public productivity: {
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
requestCounter: number;
responseCounter: number;
responseRate: number;
lastResponded: number;
};
private _callCounter: Map<string, number>;
private readonly _counterResetInterval: NodeJS.Timer;
protected _peerInfo: P2PPeerInfo;
private readonly _productivityResetInterval: NodeJS.Timer;
protected readonly _peerConfig: PeerConfig;
protected _nodeInfo: P2PNodeInfo | undefined;
protected readonly _handleRawRPC: (
Expand All @@ -158,11 +174,23 @@ export class Peer extends EventEmitter {
this._wsPort = peerInfo.wsPort;
this._id = constructPeerId(this._ipAddress, this._wsPort);
this._height = peerInfo.height ? (peerInfo.height as number) : 0;
this._reputation = DEFAULT_REPUTATION_SCORE;
this.reputation = DEFAULT_REPUTATION_SCORE;
this.latency = 0;
this.connectTime = Date.now();
this._callCounter = new Map();
this._counterResetInterval = setInterval(() => {
this._callCounter = new Map();
}, DEFAULT_RATE_INTERVAL);
this._productivityResetInterval = setInterval(() => {
// If peer has not recently responded, reset productivity to 0
if (
this.productivity.lastResponded <
Date.now() - DEFAULT_PRODUCTIVITY_RESET_INTERVAL
) {
this.productivity = DEFAULT_PRODUCTIVITY;
}
}, DEFAULT_PRODUCTIVITY_RESET_INTERVAL);
this.productivity = DEFAULT_PRODUCTIVITY;

// This needs to be an arrow function so that it can be used as a listener.
this._handleRawRPC = (
Expand All @@ -184,11 +212,13 @@ export class Peer extends EventEmitter {
}

const rate = this._getPeerRate(packet as P2PRequestPacket);

const request = new P2PRequest(
rawRequest.procedure,
rawRequest.data,
this._id,
rate,
this.productivity,
respond,
);

Expand Down Expand Up @@ -277,8 +307,8 @@ export class Peer extends EventEmitter {
}

public applyPenalty(penalty: number): void {
this._reputation -= penalty;
if (this._reputation <= 0) {
this.reputation -= penalty;
if (this.reputation <= 0) {
this._banPeer();
}
}
Expand Down Expand Up @@ -324,6 +354,7 @@ export class Peer extends EventEmitter {

public disconnect(code: number = 1000, reason?: string): void {
clearInterval(this._counterResetInterval);
clearInterval(this._productivityResetInterval);
if (this._socket) {
this._socket.destroy(code, reason);
}
Expand Down
17 changes: 15 additions & 2 deletions elements/lisk-p2p/src/peer/inbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
* Removal or modification of this copyright notice is prohibited.
*
*/

import {
Peer,
PeerConfig,
Expand All @@ -27,14 +26,16 @@ import { SCServerSocket } from 'socketcluster-server';

export const EVENT_CLOSE_INBOUND = 'closeInbound';
export const EVENT_INBOUND_SOCKET_ERROR = 'inboundSocketError';

const DEFAULT_PING_INTERVAL = 500;
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
export class InboundPeer extends Peer {
protected _socket: SCServerSocketUpdated;
protected readonly _handleInboundSocketError: (error: Error) => void;
protected readonly _handleInboundSocketClose: (
code: number,
reason: string,
) => void;
protected readonly _handlePong: (pingTime: number) => void;
private readonly _pingIntervalId: NodeJS.Timer | undefined;

public constructor(
peerInfo: P2PDiscoveredPeerInfo,
Expand All @@ -46,13 +47,23 @@ export class InboundPeer extends Peer {
this.emit(EVENT_INBOUND_SOCKET_ERROR, error);
};
this._handleInboundSocketClose = (code, reason) => {
if (this._pingIntervalId) {
clearInterval(this._pingIntervalId);
}
this.emit(EVENT_CLOSE_INBOUND, {
peerInfo,
code,
reason,
});
};
this._handlePong = (responseTime: number) => {
const latency = Date.now() - responseTime;
this.latency = latency;
};
this._socket = peerSocket;
this._pingIntervalId = setInterval(() => {
this._socket.emit('ping', Date.now());
}, DEFAULT_PING_INTERVAL);
this._bindHandlersToInboundSocket(this._socket);
}

Expand Down Expand Up @@ -86,6 +97,7 @@ export class InboundPeer extends Peer {
'postTransactions',
this._handleRawLegacyMessagePostTransactions,
);
inboundSocket.on('pong', this._handlePong);
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
}

// All event handlers for the inbound socket should be unbound in this method.
Expand All @@ -106,5 +118,6 @@ export class InboundPeer extends Peer {
'postTransactions',
this._handleRawLegacyMessagePostTransactions,
);
inboundSocket.off('pong', this._handlePong);
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
}
}
8 changes: 8 additions & 0 deletions elements/lisk-p2p/src/peer/outbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ export class OutboundPeer extends Peer {
});
});

outboundSocket.on('ping', () => {
this.emit('pong', Date.now());
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
});

// Bind RPC and remote event handlers
outboundSocket.on(REMOTE_EVENT_RPC_REQUEST, this._handleRawRPC);
outboundSocket.on(REMOTE_EVENT_MESSAGE, this._handleRawMessage);
Expand Down Expand Up @@ -202,6 +206,7 @@ export class OutboundPeer extends Peer {
'postTransactions',
this._handleRawLegacyMessagePostTransactions,
);
outboundSocket.off('ping');
}
}

Expand Down Expand Up @@ -248,6 +253,9 @@ export const connectAndRequest = async (
// Bind an error handler immediately after creating the socket; otherwise errors may crash the process
// tslint:disable-next-line no-empty
outboundSocket.on('error', () => {});
outboundSocket.on('ping' as any, () => {
outboundSocket.emit('pong', Date.now());
});

// tslint:disable-next-line no-let
let disconnectStatusCode: number;
Expand Down
69 changes: 67 additions & 2 deletions elements/lisk-p2p/src/peer_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ interface PeerPoolConfig {
readonly maxOutboundConnections: number;
readonly maxInboundConnections: number;
readonly outboundShuffleInterval?: number;
readonly evictionProtectionEnabled?: boolean;
readonly evictionProtectionRatio?: number;
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved
}

export const MAX_PEER_LIST_BATCH_SIZE = 100;
Expand Down Expand Up @@ -343,7 +345,11 @@ export class PeerPool extends EventEmitter {
): Peer {
const inboundPeers = this.getPeers(InboundPeer);
if (inboundPeers.length >= this._maxInboundConnections) {
this.removePeer(shuffle(inboundPeers)[0].id);
if (!this._peerPoolConfig.evictionProtectionEnabled) {
this.removePeer(shuffle(inboundPeers)[0].id);
} else {
this._evictPeer(InboundPeer);
}
}

const peerConfig = {
Expand Down Expand Up @@ -490,16 +496,75 @@ export class PeerPool extends EventEmitter {
})();
}

private _selectPeersForEviction(peers: Peer[]): Peer[] {
const PEER_PROTECTION_PERCENTAGE = 0.08;
mitsuaki-u marked this conversation as resolved.
Show resolved Hide resolved

// Cannot manipulate without physically moving nodes closer to the target.
const LATENCY_PERCENTAGE =
(this._peerPoolConfig.evictionProtectionRatio as number) *
PEER_PROTECTION_PERCENTAGE;
const PROXIMAL_PEER_COUNT = Math.ceil(
(this._peerPoolConfig.evictionProtectionRatio as number) *
LATENCY_PERCENTAGE *
peers.length,
);
const filteredPeersByLatency = peers
.sort((a, b) => (a.latency > b.latency ? 1 : -1))
.slice(PROXIMAL_PEER_COUNT, peers.length);

if (filteredPeersByLatency.length <= 1) {
return filteredPeersByLatency;
}

// Cannot manipulate this metric without performing useful work.
const RESPONSIVENESS_PERCENTAGE =
(this._peerPoolConfig.evictionProtectionRatio as number) *
PEER_PROTECTION_PERCENTAGE;
const RESPONSIVE_PEER_COUNT = Math.ceil(
(this._peerPoolConfig.evictionProtectionRatio as number) *
RESPONSIVENESS_PERCENTAGE *
filteredPeersByLatency.length,
);
const filteredPeersByResponsiveness = filteredPeersByLatency
.sort((a, b) =>
a.productivity.responseRate > b.productivity.responseRate ? -1 : 1,
)
.slice(RESPONSIVE_PEER_COUNT, filteredPeersByLatency.length);

if (filteredPeersByResponsiveness.length <= 1) {
return filteredPeersByResponsiveness;
}

// Protect remaining half of peers by longevity, precludes attacks that start later.
const LONGEVITY_PERCENTAGE = 0.5;
const STEADY_PEER_COUNT =
filteredPeersByResponsiveness.length * LONGEVITY_PERCENTAGE;
const filteredPeersByConnectTime = filteredPeersByResponsiveness
.sort((a, b) => (a.connectTime > b.connectTime ? 1 : -1))
.slice(
Math.ceil(STEADY_PEER_COUNT),
filteredPeersByResponsiveness.length,
);

return filteredPeersByConnectTime;
}

private _evictPeer(kind: typeof InboundPeer | typeof OutboundPeer): void {
const peers = this.getPeers(kind);
if (peers.length === 0) {
if (peers.length < 1) {
return;
}

if (kind === OutboundPeer) {
const peerIdToRemove = constructPeerIdFromPeerInfo(shuffle(peers)[0]);
this.removePeer(peerIdToRemove);
}

if (kind === InboundPeer) {
const evictionCandidates = this._selectPeersForEviction([...peers]);
const peerToEvict = shuffle(evictionCandidates)[0];
this.removePeer(peerToEvict.id);
}
}

private _bindHandlersToPeer(peer: Peer): void {
Expand Down
Loading