Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: close streams gracefully #458

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28,823 changes: 13,561 additions & 15,262 deletions package-lock.json

Large diffs are not rendered by default.

72 changes: 28 additions & 44 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
"version": "9.1.0",
"description": "A typescript implementation of gossipsub",
"files": [
"src",
"dist",
"src"
"!dist/test",
"!**/*.tsbuildinfo"
],
"type": "module",
"types": "dist/src/index.d.ts",
Expand Down Expand Up @@ -70,71 +72,53 @@
},
"homepage": "https://github.com/ChainSafe/js-libp2p-gossipsub#readme",
"dependencies": {
"@libp2p/crypto": "^1.0.3",
"@libp2p/interface-connection": "^5.0.1",
"@libp2p/interface-connection-manager": "^3.0.1",
"@libp2p/interface-keys": "^1.0.3",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/interface-peer-store": "^2.0.3",
"@libp2p/interface-pubsub": "^4.0.0",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interfaces": "^3.2.0",
"@libp2p/logger": "^2.0.0",
"@libp2p/peer-id": "^2.0.0",
"@libp2p/peer-record": "^5.0.0",
"@libp2p/pubsub": "^7.0.1",
"@libp2p/topology": "^4.0.0",
"@multiformats/multiaddr": "^12.0.0",
"@libp2p/crypto": "^2.0.0",
"@libp2p/interface": "^0.1.0",
"@libp2p/interface-internal": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"@libp2p/peer-id": "^3.0.0",
"@libp2p/pubsub": "^8.0.0",
"@multiformats/multiaddr": "^12.1.3",
"abortable-iterator": "^5.0.1",
"denque": "^1.5.0",
"denque": "^2.1.0",
"it-length-prefixed": "^9.0.1",
"it-pipe": "^3.0.1",
"it-pushable": "^3.1.0",
"multiformats": "^11.0.0",
"protobufjs": "^6.11.2",
"uint8arraylist": "^2.3.2",
"uint8arrays": "^4.0.2"
"it-pushable": "^3.2.0",
"multiformats": "^12.0.1",
"protobufjs": "^7.2.4",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.4"
},
"devDependencies": {
"@chainsafe/as-sha256": "^0.2.4",
"@dapplion/benchmark": "^0.2.2",
"@libp2p/floodsub": "^7.0.1",
"@libp2p/interface-libp2p": "^3.1.0",
"@libp2p/interface-mocks": "^12.0.1",
"@libp2p/interface-pubsub-compliance-tests": "^5.0.0",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/peer-store": "^8.1.2",
"@dapplion/benchmark": "^0.2.4",
"@libp2p/floodsub": "^8.0.0",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/peer-id-factory": "^3.0.0",
"@libp2p/peer-store": "^9.0.0",
"@types/node": "^17.0.21",
"@typescript-eslint/eslint-plugin": "^3.0.2",
"@typescript-eslint/parser": "^3.0.2",
"aegir": "^38.1.8",
"benchmark": "^2.1.4",
"aegir": "^40.0.1",
"datastore-core": "^9.1.1",
"delay": "^5.0.0",
"detect-node": "^2.1.0",
"delay": "^6.0.0",
"eslint": "^7.1.0",
"eslint-config-standard": "^14.1.1",
"eslint-plugin-import": "^2.20.2",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prettier": "^4.0.0",
"eslint-plugin-promise": "^4.2.1",
"eslint-plugin-standard": "^4.0.1",
"it-pair": "^2.0.2",
"lodash": "^4.17.15",
"mkdirp": "^1.0.4",
"os": "^0.1.1",
"mkdirp": "^3.0.1",
"p-defer": "^4.0.0",
"p-event": "^5.0.1",
"p-event": "^6.0.0",
"p-retry": "^5.1.2",
"p-times": "^4.0.0",
"p-wait-for": "^5.0.0",
"p-wait-for": "^5.0.2",
"prettier": "^2.0.5",
"promisify-es6": "^1.0.3",
"sinon": "^15.0.3",
"sinon": "^15.1.2",
"time-cache": "^0.3.0",
"ts-node": "^10.7.0",
"ts-sinon": "^2.0.2",
"util": "^0.12.3"
"ts-sinon": "^2.0.2"
},
"engines": {
"npm": ">=8.7.0"
Expand Down
87 changes: 42 additions & 45 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import { pipe } from 'it-pipe'
import type { Connection, Stream } from '@libp2p/interface-connection'
import type { Connection, Stream } from '@libp2p/interface/connection'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
import { Logger, logger } from '@libp2p/logger'
import { createTopology } from '@libp2p/topology'
import type { PeerId } from '@libp2p/interface-peer-id'
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
import { type Logger, logger } from '@libp2p/logger'
import type { PeerId } from '@libp2p/interface/peer-id'
import { CustomEvent, EventEmitter } from '@libp2p/interface/events'

import { MessageCache, MessageCacheRecord } from './message-cache.js'
import { RPC, IRPC } from './message/rpc.js'
import { MessageCache, type MessageCacheRecord } from './message-cache.js'
import { RPC, type IRPC } from './message/rpc.js'
import * as constants from './constants.js'
import { shuffle, messageIdToString } from './utils/index.js'
import {
PeerScore,
PeerScoreParams,
PeerScoreThresholds,
type PeerScoreParams,
type PeerScoreThresholds,
createPeerScoreParams,
createPeerScoreThresholds,
PeerScoreStatsDump
type PeerScoreStatsDump
} from './score/index.js'
import { IWantTracer } from './tracer.js'
import { SimpleTimeCache } from './utils/time-cache.js'
Expand All @@ -31,56 +30,54 @@
getMetrics,
IHaveIgnoreReason,
InclusionReason,
Metrics,
MetricsRegister,
type Metrics,
type MetricsRegister,
ScorePenalty,
TopicStrToLabel,
ToSendGroupCount
type TopicStrToLabel,
type ToSendGroupCount
} from './metrics.js'
import {
MsgIdFn,
PublishConfig,
TopicStr,
MsgIdStr,
type MsgIdFn,
type PublishConfig,
type TopicStr,
type MsgIdStr,
ValidateError,
PeerIdStr,
type PeerIdStr,
MessageStatus,
RejectReason,
RejectReasonObj,
FastMsgIdFn,
AddrInfo,
DataTransform,
type RejectReasonObj,
type FastMsgIdFn,
type AddrInfo,
type DataTransform,
rejectReasonFromAcceptance,
MsgIdToStrFn,
MessageId,
PublishOpts
type MsgIdToStrFn,
type MessageId,
type PublishOpts
} from './types.js'
import { buildRawMessage, validateToRawMessage } from './utils/buildRawMessage.js'
import { msgIdFnStrictNoSign, msgIdFnStrictSign } from './utils/msgIdFn.js'
import { computeAllPeersScoreWeights } from './score/scoreMetrics.js'
import { getPublishConfigFromPeerId } from './utils/publishConfig.js'
import type { GossipsubOptsSpec } from './config.js'
import {
import type {
Message,
PublishResult,
PubSub,
PubSubEvents,
PubSubInit,
StrictNoSign,
StrictSign,
SubscriptionChangeData,
TopicValidatorFn,
TopicValidatorResult
} from '@libp2p/interface-pubsub'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
TopicValidatorFn
} from '@libp2p/interface/pubsub'
import { StrictSign, StrictNoSign, TopicValidatorResult } from '@libp2p/interface/pubsub'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'
import { removeFirstNItemsFromSet, removeItemsFromSet } from './utils/set.js'
import { pushable } from 'it-pushable'
import { InboundStream, OutboundStream } from './stream.js'
import { Uint8ArrayList } from 'uint8arraylist'
import { decodeRpc, DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { Peer, PeerStore } from '@libp2p/interface-peer-store'
import { Multiaddr } from '@multiformats/multiaddr'
import type { Uint8ArrayList } from 'uint8arraylist'
import { decodeRpc, type DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { Peer, PeerStore } from '@libp2p/interface/peer-store'
import type { Multiaddr } from '@multiformats/multiaddr'
import { multiaddrToIPStr } from './utils/multiaddr.js'

type ConnectionDirection = 'inbound' | 'outbound'
Expand Down Expand Up @@ -589,10 +586,10 @@

// register protocol with topology
// Topology callbacks called on connection manager changes
const topology = createTopology({
const topology = {
onConnect: this.onPeerConnected.bind(this),
onDisconnect: this.onPeerDisconnected.bind(this)
})
}
const registrarTopologyIds = await Promise.all(
this.multicodecs.map((multicodec) => registrar.register(multicodec, topology))
)
Expand Down Expand Up @@ -698,7 +695,7 @@

const peerId = connection.remotePeer
// add peer to router
this.addPeer(peerId, connection.stat.direction, connection.remoteAddr)
this.addPeer(peerId, connection.direction, connection.remoteAddr)
// create inbound stream
this.createInboundStream(peerId, stream)
// attempt to create outbound stream
Expand All @@ -709,14 +706,14 @@
* Registrar notifies an established connection with pubsub protocol
*/
private onPeerConnected(peerId: PeerId, connection: Connection): void {
this.metrics?.newConnectionCount.inc({ status: connection.stat.status })
this.metrics?.newConnectionCount.inc({ status: connection.status })
// libp2p may emit a closed connection and never issue peer:disconnect event
// see https://github.com/ChainSafe/js-libp2p-gossipsub/issues/398
if (!this.isStarted() || connection.stat.status !== 'OPEN') {
if (!this.isStarted() || connection.status !== 'open') {
return
}

this.addPeer(peerId, connection.stat.direction, connection.remoteAddr)
this.addPeer(peerId, connection.direction, connection.remoteAddr)
this.outboundInflightQueue.push({ peerId, connection })
}

Expand Down Expand Up @@ -1733,7 +1730,7 @@
const connection = await this.components.connectionManager.openConnection(peerId)
for (const multicodec of this.multicodecs) {
for (const topology of this.components.registrar.getTopologies(multicodec)) {
topology.onConnect(peerId, connection)
topology.onConnect?.(peerId, connection)
}
}
}
Expand Down Expand Up @@ -2462,7 +2459,7 @@

try {
peerInfo = await this.components.peerStore.get(id)
} catch (err: any) {

Check warning on line 2462 in src/index.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
if (err.code !== 'ERR_NOT_FOUND') {
throw err
}
Expand Down
11 changes: 9 additions & 2 deletions src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { TopicValidatorResult } from '@libp2p/interface-pubsub'
import type { TopicValidatorResult } from '@libp2p/interface/pubsub'
import type { IRPC } from './message/rpc.js'
import type { PeerScoreThresholds } from './score/peer-score-thresholds.js'
import { MessageStatus, PeerIdStr, RejectReason, RejectReasonObj, TopicStr, ValidateError } from './types.js'
import {
MessageStatus,
type PeerIdStr,
RejectReason,
type RejectReasonObj,
type TopicStr,
ValidateError
} from './types.js'

/** Topic label as provided in `topicStrToLabel` */
export type TopicLabel = string
Expand Down
2 changes: 1 addition & 1 deletion src/score/peer-score-params.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ERR_INVALID_PEER_SCORE_PARAMS } from './constants.js'
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'

// This file defines PeerScoreParams and TopicScoreParams interfaces
// as well as constructors, default constructors, and validation functions
Expand Down
2 changes: 1 addition & 1 deletion src/score/peer-score-thresholds.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ERR_INVALID_PEER_SCORE_THRESHOLDS } from './constants.js'
import { CodeError } from '@libp2p/interfaces/errors'
import { CodeError } from '@libp2p/interface/errors'

// This file defines PeerScoreThresholds interface
// as well as a constructor, default constructor, and validation function
Expand Down
4 changes: 2 additions & 2 deletions src/score/peer-score.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { PeerScoreParams, validatePeerScoreParams } from './peer-score-params.js'
import { type PeerScoreParams, validatePeerScoreParams } from './peer-score-params.js'
import type { PeerStats, TopicStats } from './peer-stats.js'
import { computeScore } from './compute-score.js'
import { MessageDeliveries, DeliveryRecordStatus } from './message-deliveries.js'
import { logger } from '@libp2p/logger'
import { MsgIdStr, PeerIdStr, RejectReason, TopicStr, IPStr } from '../types.js'
import { type MsgIdStr, type PeerIdStr, RejectReason, type TopicStr, type IPStr } from '../types.js'
import type { Metrics, ScorePenalty } from '../metrics.js'
import { MapDef } from '../utils/set.js'

Expand Down
8 changes: 4 additions & 4 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Stream } from '@libp2p/interface-connection'
import type { Stream } from '@libp2p/interface/connection'
import { abortableSource } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import { pushable, Pushable } from 'it-pushable'
import { pushable, type Pushable } from 'it-pushable'
import { encode, decode } from 'it-length-prefixed'
import { Uint8ArrayList } from 'uint8arraylist'
import type { Uint8ArrayList } from 'uint8arraylist'

type OutboundStreamOpts = {
/** Max size in bytes for pushable buffer. If full, will throw on .push */
Expand Down Expand Up @@ -34,7 +34,7 @@ export class OutboundStream {

get protocol(): string {
// TODO remove this non-nullish assertion after https://github.com/libp2p/js-libp2p-interfaces/pull/265 is incorporated
return this.rawStream.stat.protocol!
return this.rawStream.protocol!
}

push(data: Uint8Array): void {
Expand Down
2 changes: 1 addition & 1 deletion src/tracer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MsgIdStr, MsgIdToStrFn, PeerIdStr, RejectReason } from './types.js'
import { type MsgIdStr, type MsgIdToStrFn, type PeerIdStr, RejectReason } from './types.js'
import type { Metrics } from './metrics.js'

/**
Expand Down
6 changes: 3 additions & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PrivateKey } from '@libp2p/interface-keys'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PrivateKey } from '@libp2p/interface/keys'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { RPC } from './message/rpc.js'
import { Message, TopicValidatorResult } from '@libp2p/interface-pubsub'
import { type Message, TopicValidatorResult } from '@libp2p/interface/pubsub'

export type MsgIdStr = string
export type PeerIdStr = string
Expand Down
8 changes: 4 additions & 4 deletions src/utils/buildRawMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { marshalPublicKey, unmarshalPublicKey } from '@libp2p/crypto/keys'
import { randomBytes } from '@libp2p/crypto'
import { peerIdFromBytes } from '@libp2p/peer-id'
import type { PublicKey } from '@libp2p/interface-keys'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PublicKey } from '@libp2p/interface/keys'
import type { PeerId } from '@libp2p/interface/peer-id'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { RPC } from '../message/rpc.js'
import { PublishConfig, PublishConfigType, TopicStr, ValidateError } from '../types.js'
import { StrictSign, StrictNoSign, Message } from '@libp2p/interface-pubsub'
import { type PublishConfig, PublishConfigType, type TopicStr, ValidateError } from '../types.js'
import { StrictSign, StrictNoSign, type Message } from '@libp2p/interface/pubsub'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'

export const SignPrefix = uint8ArrayFromString('libp2p-pubsub:')
Expand Down
2 changes: 1 addition & 1 deletion src/utils/msgIdFn.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { sha256 } from 'multiformats/hashes/sha2'
import type { Message } from '@libp2p/interface-pubsub'
import type { Message } from '@libp2p/interface/pubsub'
import { msgId } from '@libp2p/pubsub/utils'

/**
Expand Down
2 changes: 1 addition & 1 deletion src/utils/multiaddr.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Multiaddr } from '@multiformats/multiaddr'
import type { Multiaddr } from '@multiformats/multiaddr'
import { convertToString } from '@multiformats/multiaddr/convert'

// Protocols https://github.com/multiformats/multiaddr/blob/master/protocols.csv
Expand Down
6 changes: 3 additions & 3 deletions src/utils/publishConfig.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { unmarshalPrivateKey } from '@libp2p/crypto/keys'
import { StrictSign, StrictNoSign } from '@libp2p/interface-pubsub'
import type { PeerId } from '@libp2p/interface-peer-id'
import { PublishConfig, PublishConfigType } from '../types.js'
import { StrictSign, StrictNoSign } from '@libp2p/interface/pubsub'
import type { PeerId } from '@libp2p/interface/peer-id'
import { type PublishConfig, PublishConfigType } from '../types.js'

/**
* Prepare a PublishConfig object from a PeerId.
Expand Down
Loading
Loading