Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: update interfaces (#162)
Browse files Browse the repository at this point in the history
Updates to latest code from libp2p/js-libp2p-interfaces#180
  • Loading branch information
achingbrain committed Mar 17, 2022
1 parent 3fc4505 commit ab9079c
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 270 deletions.
18 changes: 9 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@
},
"scripts": {
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js",
"build": "tsc",
"pretest": "npm run build",
"test": "aegir test -f ./dist/test",
"test": "aegir test -f \"./dist/test/**/*.js\"",
"test:chrome": "npm run test -- -t browser --cov",
"test:chrome-webworker": "npm run test -- -t webworker",
"test:firefox": "npm run test -- -t browser -- --browser firefox",
Expand All @@ -140,24 +140,24 @@
"release": "semantic-release"
},
"dependencies": {
"@libp2p/logger": "^1.0.3",
"@libp2p/tracked-map": "^1.0.1",
"@libp2p/logger": "^1.1.2",
"@libp2p/tracked-map": "^1.0.4",
"abortable-iterator": "^4.0.2",
"any-signal": "^3.0.0",
"err-code": "^3.0.1",
"it-pipe": "^2.0.3",
"it-pushable": "^2.0.1",
"it-stream-types": "^1.0.4",
"uint8arraylist": "^1.2.0",
"uint8arraylist": "^1.4.0",
"varint": "^6.0.0"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.0.7",
"@libp2p/interfaces": "^1.1.1",
"@libp2p/interface-compliance-tests": "^1.1.16",
"@libp2p/interfaces": "^1.3.14",
"@types/varint": "^6.0.0",
"aegir": "^36.1.3",
"cborg": "^1.2.1",
"iso-random-stream": "^2.0.0",
"cborg": "^1.8.1",
"iso-random-stream": "^2.0.2",
"it-all": "^1.0.6",
"it-drain": "^1.0.5",
"it-foreach": "^0.1.1",
Expand Down
258 changes: 8 additions & 250 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,257 +1,15 @@
import { pipe } from 'it-pipe'
import { Pushable, pushableV } from 'it-pushable'
import { abortableSource } from 'abortable-iterator'
import { encode } from './encode.js'
import { decode } from './decode.js'
import { restrictSize } from './restrict-size.js'
import { MessageTypes, MessageTypeNames, Message } from './message-types.js'
import { createStream } from './stream.js'
import { toString as uint8ArrayToString } from 'uint8arrays'
import { trackedMap } from '@libp2p/tracked-map'
import { logger } from '@libp2p/logger'
import type { Sink } from 'it-stream-types'
import type { Muxer, MuxerOptions } from '@libp2p/interfaces/stream-muxer'
import type { Stream } from '@libp2p/interfaces/connection'
import type { ComponentMetricsTracker } from '@libp2p/interfaces/metrics'
import each from 'it-foreach'
import type { Components } from '@libp2p/interfaces/components'
import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'
import { MplexStreamMuxer } from './mplex.js'

const log = logger('libp2p:mplex')

function printMessage (msg: Message) {
const output: any = {
...msg,
type: `${MessageTypeNames[msg.type]} (${msg.type})`
}

if (msg.type === MessageTypes.NEW_STREAM) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice())
}

if (msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
output.data = uint8ArrayToString(msg.data instanceof Uint8Array ? msg.data : msg.data.slice(), 'base16')
}

return output
}

export interface MplexStream extends Stream {
source: Pushable<Uint8Array>
}

export interface MplexOptions extends MuxerOptions {
export interface MplexInit extends StreamMuxerInit {
maxMsgSize?: number
metrics?: ComponentMetricsTracker
}

export class Mplex implements Muxer {
static multicodec = '/mplex/6.7.0'

public sink: Sink<Uint8Array>
public source: AsyncIterable<Uint8Array>

private _streamId: number
private readonly _streams: { initiators: Map<number, MplexStream>, receivers: Map<number, MplexStream> }
private readonly _options: MplexOptions
private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void }

constructor (options?: MplexOptions) {
options = options ?? {}

this._streamId = 0
this._streams = {
/**
* Stream to ids map
*/
initiators: trackedMap<number, MplexStream>({ metrics: options.metrics, component: 'mplex', metric: 'initiatorStreams' }),
/**
* Stream to ids map
*/
receivers: trackedMap<number, MplexStream>({ metrics: options.metrics, component: 'mplex', metric: 'receiverStreams' })
}
this._options = options

/**
* An iterable sink
*/
this.sink = this._createSink()

/**
* An iterable source
*/
const source = this._createSource()
this._source = source
this.source = source
}

/**
* Returns a Map of streams and their ids
*/
get streams () {
// Inbound and Outbound streams may have the same ids, so we need to make those unique
const streams: Stream[] = []
this._streams.initiators.forEach(stream => {
streams.push(stream)
})
this._streams.receivers.forEach(stream => {
streams.push(stream)
})
return streams
}

/**
* Initiate a new stream with the given name. If no name is
* provided, the id of the stream will be used.
*/
newStream (name?: string): Stream {
const id = this._streamId++
name = name == null ? id.toString() : name.toString()
const registry = this._streams.initiators
return this._newStream({ id, name, type: 'initiator', registry })
}

/**
* Called whenever an inbound stream is created
*/
_newReceiverStream (options: { id: number, name: string }) {
const { id, name } = options
const registry = this._streams.receivers
return this._newStream({ id, name, type: 'receiver', registry })
}

_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) {
const { id, name, type, registry } = options

log('new %s stream %s %s', type, id, name)

if (registry.has(id)) {
throw new Error(`${type} stream ${id} already exists!`)
}

const send = (msg: Message) => {
if (log.enabled) {
log('%s stream %s send', type, id, printMessage(msg))
}

if (msg.type === MessageTypes.NEW_STREAM || msg.type === MessageTypes.MESSAGE_INITIATOR || msg.type === MessageTypes.MESSAGE_RECEIVER) {
msg.data = msg.data instanceof Uint8Array ? msg.data : msg.data.slice()
}

this._source.push(msg)
}

const onEnd = () => {
log('%s stream %s %s ended', type, id, name)
registry.delete(id)

if (this._options.onStreamEnd != null) {
this._options.onStreamEnd(stream)
}
}

const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize })
registry.set(id, stream)
return stream
}

/**
* Creates a sink with an abortable source. Incoming messages will
* also have their size restricted. All messages will be varint decoded.
*/
_createSink () {
const sink: Sink<Uint8Array> = async source => {
if (this._options.signal != null) {
source = abortableSource(source, this._options.signal)
}

try {
await pipe(
source,
source => each(source, (buf) => {
// console.info('incoming', uint8ArrayToString(buf, 'base64'))
}),
decode,
restrictSize(this._options.maxMsgSize),
async source => {
for await (const msg of source) {
this._handleIncoming(msg)
}
}
)

this._source.end()
} catch (err: any) {
log('error in sink', err)
this._source.end(err) // End the source with an error
}
}

return sink
}

/**
* Creates a source that restricts outgoing message sizes
* and varint encodes them
*/
_createSource () {
const onEnd = (err?: Error) => {
const { initiators, receivers } = this._streams
// Abort all the things!
for (const s of initiators.values()) {
s.abort(err)
}
for (const s of receivers.values()) {
s.abort(err)
}
}
const source = pushableV<Message>({ onEnd })

return Object.assign(encode(source), {
push: source.push,
end: source.end,
return: source.return
})
}

_handleIncoming (message: Message) {
const { id, type } = message

if (log.enabled) {
log('incoming message', printMessage(message))
}

// Create a new stream?
if (message.type === MessageTypes.NEW_STREAM) {
const stream = this._newReceiverStream({ id, name: uint8ArrayToString(message.data instanceof Uint8Array ? message.data : message.data.slice()) })

if (this._options.onIncomingStream != null) {
this._options.onIncomingStream(stream)
}

return
}

const list = (type & 1) === 1 ? this._streams.initiators : this._streams.receivers
const stream = list.get(id)

if (stream == null) {
return log('missing stream %s', id)
}
export class Mplex implements StreamMuxerFactory {
public protocol = '/mplex/6.7.0'

switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
stream.source.push(message.data.slice())
break
case MessageTypes.CLOSE_INITIATOR:
case MessageTypes.CLOSE_RECEIVER:
stream.close()
break
case MessageTypes.RESET_INITIATOR:
case MessageTypes.RESET_RECEIVER:
stream.reset()
break
default:
log('unknown message type %s', type)
}
createStreamMuxer (components: Components, init?: MplexInit) {
return new MplexStreamMuxer(components, init)
}
}
Loading

0 comments on commit ab9079c

Please sign in to comment.