diff --git a/package.json b/package.json index 0f1c8fd..daf3972 100644 --- a/package.json +++ b/package.json @@ -127,10 +127,10 @@ }, "scripts": { "lint": "aegir lint", - "dep-check": "aegir dep-check", + "dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js", "build": "tsc", "pretest": "npm run build", - "test": "aegir test -f ./dist/test", + "test": "aegir test -f \"./dist/test/**/*.js\"", "test:chrome": "npm run test -- -t browser --cov", "test:chrome-webworker": "npm run test -- -t webworker", "test:firefox": "npm run test -- -t browser -- --browser firefox", @@ -140,24 +140,24 @@ "release": "semantic-release" }, "dependencies": { - "@libp2p/logger": "^1.0.3", - "@libp2p/tracked-map": "^1.0.1", + "@libp2p/logger": "^1.1.2", + "@libp2p/tracked-map": "^1.0.4", "abortable-iterator": "^4.0.2", "any-signal": "^3.0.0", "err-code": "^3.0.1", "it-pipe": "^2.0.3", "it-pushable": "^2.0.1", "it-stream-types": "^1.0.4", - "uint8arraylist": "^1.2.0", + "uint8arraylist": "^1.4.0", "varint": "^6.0.0" }, "devDependencies": { - "@libp2p/interface-compliance-tests": "^1.0.7", - "@libp2p/interfaces": "^1.1.1", + "@libp2p/interface-compliance-tests": "^1.1.16", + "@libp2p/interfaces": "^1.3.14", "@types/varint": "^6.0.0", "aegir": "^36.1.3", - "cborg": "^1.2.1", - "iso-random-stream": "^2.0.0", + "cborg": "^1.8.1", + "iso-random-stream": "^2.0.2", "it-all": "^1.0.6", "it-drain": "^1.0.5", "it-foreach": "^0.1.1", diff --git a/src/index.ts b/src/index.ts index 8fe53b6..8e2659b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,257 +1,15 @@ -import { pipe } from 'it-pipe' -import { Pushable, pushableV } from 'it-pushable' -import { abortableSource } from 'abortable-iterator' -import { encode } from './encode.js' -import { decode } from './decode.js' -import { restrictSize } from './restrict-size.js' -import { MessageTypes, MessageTypeNames, Message } from './message-types.js' -import { createStream } from './stream.js' -import { toString as uint8ArrayToString } from 'uint8arrays' -import { trackedMap } from '@libp2p/tracked-map' -import { logger } from '@libp2p/logger' -import type { Sink } from 'it-stream-types' -import type { Muxer, MuxerOptions } from '@libp2p/interfaces/stream-muxer' -import type { Stream } from '@libp2p/interfaces/connection' -import type { ComponentMetricsTracker } from '@libp2p/interfaces/metrics' -import each from 'it-foreach' +import type { Components } from '@libp2p/interfaces/components' +import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer' +import { MplexStreamMuxer } from './mplex.js' -const log = logger('libp2p:mplex') - -function printMessage (msg: Message) { - const output: any = { - ...msg, - type: `${MessageTypeNames[msg.type]} (${msg.type})` - } - - if (msg.type === MessageTypes.NEW_STREAM) { - output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice()) - } - - if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { - output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16') - } - - return output -} - -export interface MplexStream extends Stream { - source: Pushable -} - -export interface MplexOptions extends MuxerOptions { +export interface MplexInit extends StreamMuxerInit { maxMsgSize?: number - metrics?: ComponentMetricsTracker } -export class Mplex implements Muxer { - static multicodec = '/mplex/6.7.0' - - public sink: Sink - public source: AsyncIterable - - private _streamId: number - private readonly _streams: { initiators: Map, receivers: Map } - private readonly _options: MplexOptions - private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void } - - constructor (options?: MplexOptions) { - options = options ?? {} - - this._streamId = 0 - this._streams = { - /** - * Stream to ids map - */ - initiators: trackedMap({ metrics: options.metrics, component: 'mplex', metric: 'initiatorStreams' }), - /** - * Stream to ids map - */ - receivers: trackedMap({ metrics: options.metrics, component: 'mplex', metric: 'receiverStreams' }) - } - this._options = options - - /** - * An iterable sink - */ - this.sink = this._createSink() - - /** - * An iterable source - */ - const source = this._createSource() - this._source = source - this.source = source - } - - /** - * Returns a Map of streams and their ids - */ - get streams () { - // Inbound and Outbound streams may have the same ids, so we need to make those unique - const streams: Stream[] = [] - this._streams.initiators.forEach(stream => { - streams.push(stream) - }) - this._streams.receivers.forEach(stream => { - streams.push(stream) - }) - return streams - } - - /** - * Initiate a new stream with the given name. If no name is - * provided, the id of the stream will be used. - */ - newStream (name?: string): Stream { - const id = this._streamId++ - name = name == null ? id.toString() : name.toString() - const registry = this._streams.initiators - return this._newStream({ id, name, type: 'initiator', registry }) - } - - /** - * Called whenever an inbound stream is created - */ - _newReceiverStream (options: { id: number, name: string }) { - const { id, name } = options - const registry = this._streams.receivers - return this._newStream({ id, name, type: 'receiver', registry }) - } - - _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map }) { - const { id, name, type, registry } = options - - log('new %s stream %s %s', type, id, name) - - if (registry.has(id)) { - throw new Error(`${type} stream ${id} already exists!`) - } - - const send = (msg: Message) => { - if (log.enabled) { - log('%s stream %s send', type, id, printMessage(msg)) - } - - if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { - msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice() - } - - this._source.push(msg) - } - - const onEnd = () => { - log('%s stream %s %s ended', type, id, name) - registry.delete(id) - - if (this._options.onStreamEnd != null) { - this._options.onStreamEnd(stream) - } - } - - const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize }) - registry.set(id, stream) - return stream - } - - /** - * Creates a sink with an abortable source. Incoming messages will - * also have their size restricted. All messages will be varint decoded. - */ - _createSink () { - const sink: Sink = async source => { - if (this._options.signal != null) { - source = abortableSource(source, this._options.signal) - } - - try { - await pipe( - source, - source => each(source, (buf) => { - // console.info('incoming', uint8ArrayToString(buf, 'base64')) - }), - decode, - restrictSize(this._options.maxMsgSize), - async source => { - for await (const msg of source) { - this._handleIncoming(msg) - } - } - ) - - this._source.end() - } catch (err: any) { - log('error in sink', err) - this._source.end(err) // End the source with an error - } - } - - return sink - } - - /** - * Creates a source that restricts outgoing message sizes - * and varint encodes them - */ - _createSource () { - const onEnd = (err?: Error) => { - const { initiators, receivers } = this._streams - // Abort all the things! - for (const s of initiators.values()) { - s.abort(err) - } - for (const s of receivers.values()) { - s.abort(err) - } - } - const source = pushableV({ onEnd }) - - return Object.assign(encode(source), { - push: source.push, - end: source.end, - return: source.return - }) - } - - _handleIncoming (message: Message) { - const { id, type } = message - - if (log.enabled) { - log('incoming message', printMessage(message)) - } - - // Create a new stream? - if (message.type === MessageTypes.NEW_STREAM) { - const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) }) - - if (this._options.onIncomingStream != null) { - this._options.onIncomingStream(stream) - } - - return - } - - const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers - const stream = list.get(id) - - if (stream == null) { - return log('missing stream %s', id) - } +export class Mplex implements StreamMuxerFactory { + public protocol = '/mplex/6.7.0' - switch (type) { - case MessageTypes.MESSAGE_INITIATOR: - case MessageTypes.MESSAGE_RECEIVER: - stream.source.push(message.data.slice()) - break - case MessageTypes.CLOSE_INITIATOR: - case MessageTypes.CLOSE_RECEIVER: - stream.close() - break - case MessageTypes.RESET_INITIATOR: - case MessageTypes.RESET_RECEIVER: - stream.reset() - break - default: - log('unknown message type %s', type) - } + createStreamMuxer (components: Components, init?: MplexInit) { + return new MplexStreamMuxer(components, init) } } diff --git a/src/mplex.ts b/src/mplex.ts new file mode 100644 index 0000000..d037fd7 --- /dev/null +++ b/src/mplex.ts @@ -0,0 +1,256 @@ +import { pipe } from 'it-pipe' +import { Pushable, pushableV } from 'it-pushable' +import { abortableSource } from 'abortable-iterator' +import { encode } from './encode.js' +import { decode } from './decode.js' +import { restrictSize } from './restrict-size.js' +import { MessageTypes, MessageTypeNames, Message } from './message-types.js' +import { createStream } from './stream.js' +import { toString as uint8ArrayToString } from 'uint8arrays' +import { trackedMap } from '@libp2p/tracked-map' +import { logger } from '@libp2p/logger' +import type { Components } from '@libp2p/interfaces/components' +import type { Sink } from 'it-stream-types' +import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer' +import type { Stream } from '@libp2p/interfaces/connection' + +const log = logger('libp2p:mplex') + +function printMessage (msg: Message) { + const output: any = { + ...msg, + type: `${MessageTypeNames[msg.type]} (${msg.type})` + } + + if (msg.type === MessageTypes.NEW_STREAM) { + output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice()) + } + + if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { + output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16') + } + + return output +} + +export interface MplexStream extends Stream { + source: Pushable +} + +export interface MplexInit extends StreamMuxerInit { + maxMsgSize?: number +} + +export class MplexStreamMuxer implements StreamMuxer { + public protocol = '/mplex/6.7.0' + + public sink: Sink + public source: AsyncIterable + + private _streamId: number + private readonly _streams: { initiators: Map, receivers: Map } + private readonly _init: MplexInit + private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void } + + constructor (components: Components, init?: MplexInit) { + init = init ?? {} + + this._streamId = 0 + this._streams = { + /** + * Stream to ids map + */ + initiators: trackedMap({ metrics: components.getMetrics(), component: 'mplex', metric: 'initiatorStreams' }), + /** + * Stream to ids map + */ + receivers: trackedMap({ metrics: components.getMetrics(), component: 'mplex', metric: 'receiverStreams' }) + } + this._init = init + + /** + * An iterable sink + */ + this.sink = this._createSink() + + /** + * An iterable source + */ + const source = this._createSource() + this._source = source + this.source = source + } + + init (components: Components) { + + } + + /** + * Returns a Map of streams and their ids + */ + get streams () { + // Inbound and Outbound streams may have the same ids, so we need to make those unique + const streams: Stream[] = [] + this._streams.initiators.forEach(stream => { + streams.push(stream) + }) + this._streams.receivers.forEach(stream => { + streams.push(stream) + }) + return streams + } + + /** + * Initiate a new stream with the given name. If no name is + * provided, the id of the stream will be used. + */ + newStream (name?: string): Stream { + const id = this._streamId++ + name = name == null ? id.toString() : name.toString() + const registry = this._streams.initiators + return this._newStream({ id, name, type: 'initiator', registry }) + } + + /** + * Called whenever an inbound stream is created + */ + _newReceiverStream (options: { id: number, name: string }) { + const { id, name } = options + const registry = this._streams.receivers + return this._newStream({ id, name, type: 'receiver', registry }) + } + + _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map }) { + const { id, name, type, registry } = options + + log('new %s stream %s %s', type, id, name) + + if (registry.has(id)) { + throw new Error(`${type} stream ${id} already exists!`) + } + + const send = (msg: Message) => { + if (log.enabled) { + log.trace('%s stream %s send', type, id, printMessage(msg)) + } + + if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) { + msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice() + } + + this._source.push(msg) + } + + const onEnd = () => { + log('%s stream %s %s ended', type, id, name) + registry.delete(id) + + if (this._init.onStreamEnd != null) { + this._init.onStreamEnd(stream) + } + } + + const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._init.maxMsgSize }) + registry.set(id, stream) + return stream + } + + /** + * Creates a sink with an abortable source. Incoming messages will + * also have their size restricted. All messages will be varint decoded. + */ + _createSink () { + const sink: Sink = async source => { + if (this._init.signal != null) { + source = abortableSource(source, this._init.signal) + } + + try { + await pipe( + source, + decode, + restrictSize(this._init.maxMsgSize), + async source => { + for await (const msg of source) { + this._handleIncoming(msg) + } + } + ) + + this._source.end() + } catch (err: any) { + log('error in sink', err) + this._source.end(err) // End the source with an error + } + } + + return sink + } + + /** + * Creates a source that restricts outgoing message sizes + * and varint encodes them + */ + _createSource () { + const onEnd = (err?: Error) => { + const { initiators, receivers } = this._streams + // Abort all the things! + for (const s of initiators.values()) { + s.abort(err) + } + for (const s of receivers.values()) { + s.abort(err) + } + } + const source = pushableV({ onEnd }) + + return Object.assign(encode(source), { + push: source.push, + end: source.end, + return: source.return + }) + } + + _handleIncoming (message: Message) { + const { id, type } = message + + if (log.enabled) { + log.trace('incoming message', printMessage(message)) + } + + // Create a new stream? + if (message.type === MessageTypes.NEW_STREAM) { + const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) }) + + if (this._init.onIncomingStream != null) { + this._init.onIncomingStream(stream) + } + + return + } + + const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers + const stream = list.get(id) + + if (stream == null) { + return log('missing stream %s', id) + } + + switch (type) { + case MessageTypes.MESSAGE_INITIATOR: + case MessageTypes.MESSAGE_RECEIVER: + stream.source.push(message.data.slice()) + break + case MessageTypes.CLOSE_INITIATOR: + case MessageTypes.CLOSE_RECEIVER: + stream.close() + break + case MessageTypes.RESET_INITIATOR: + case MessageTypes.RESET_RECEIVER: + stream.reset() + break + default: + log('unknown message type %s', type) + } + } +} diff --git a/src/stream.ts b/src/stream.ts index dcaa506..ea0a371 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -10,7 +10,7 @@ import { logger } from '@libp2p/logger' import type { Message } from './message-types.js' import type { Timeline } from '@libp2p/interfaces/connection' import type { Source } from 'it-stream-types' -import type { MplexStream } from './index.js' +import type { MplexStream } from './mplex.js' const log = logger('libp2p:mplex:stream') @@ -49,7 +49,7 @@ export function createStream (options: Options): MplexStream { } sourceEnded = true - log('%s stream %s source end', type, streamName, err) + log.trace('%s stream %s source end', type, streamName, err) if (err != null && endErr == null) { endErr = err @@ -70,7 +70,7 @@ export function createStream (options: Options): MplexStream { } sinkEnded = true - log('%s stream %s sink end - err: %o', type, streamName, err) + log.trace('%s stream %s sink end - err: %o', type, streamName, err) if (err != null && endErr == null) { endErr = err @@ -92,7 +92,7 @@ export function createStream (options: Options): MplexStream { }, // Close for reading and writing (local error) abort: (err?: Error) => { - log('%s stream %s abort', type, streamName, err) + log.trace('%s stream %s abort', type, streamName, err) // End the source with the passed error stream.source.end(err) abortController.abort() @@ -148,13 +148,13 @@ export function createStream (options: Options): MplexStream { // Send no more data if this stream was remotely reset if (err.code === ERR_MPLEX_STREAM_RESET) { - log('%s stream %s reset', type, name) + log.trace('%s stream %s reset', type, name) } else { - log('%s stream %s error', type, name, err) + log.trace('%s stream %s error', type, name, err) try { send({ id, type: Types.RESET }) } catch (err) { - log('%s stream %s error sending reset', type, name, err) + log.trace('%s stream %s error sending reset', type, name, err) } } @@ -166,7 +166,7 @@ export function createStream (options: Options): MplexStream { try { send({ id, type: Types.CLOSE }) } catch (err) { - log('%s stream %s error sending close', type, name, err) + log.trace('%s stream %s error sending close', type, name, err) } onSinkEnd() diff --git a/test/compliance.spec.ts b/test/compliance.spec.ts index 49955f2..ce6224a 100644 --- a/test/compliance.spec.ts +++ b/test/compliance.spec.ts @@ -5,8 +5,8 @@ import { Mplex } from '../src/index.js' describe('compliance', () => { tests({ - async setup (options) { - return new Mplex(options) + async setup () { + return new Mplex() }, async teardown () {} }) diff --git a/test/stream.spec.ts b/test/stream.spec.ts index b86e673..03e781e 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -14,7 +14,7 @@ import { MessageTypes, MessageTypeNames } from '../src/message-types.js' import { fromString as uint8ArrayFromString } from 'uint8arrays' import { messageWithBytes } from './fixtures/utils.js' import type { Message } from '../src/message-types.js' -import type { MplexStream } from '../src/index.js' +import type { MplexStream } from '../src/mplex.js' function randomInput (min = 1, max = 100) { return Array.from(Array(randomInt(min, max)), () => randomBytes(randomInt(1, 128)))