From af76378d40825209f9133bed02b94fa74a0dfa2c Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 23 Nov 2022 18:18:04 +0000 Subject: [PATCH] fix: pipe muxer/connection input/output streams in parallel Instead of using `it-pipe` to tie the inputs and outputs of the muxer and underlying connection together, pipe them in parallel. When sending 105MB in 32b chunks: ``` testing 0.36.x sender 3276810 messages 1638409 invocations <-- how many mplex messages are sent in how many batches sender 1638412 bufs 68 b <-- how many buffers are passed to the tcp socket and their average size 105 MB in 32 B chunks in 9238ms ``` ``` testing 0.40.x-mplex sender 3276811 messages 3276808 invocations sender 3276811 bufs 34 b 105 MB in 32 B chunks in 15963ms ``` ``` testing 0.40.x-mplex sender 3276811 messages 1638408 invocations 1638411 bufs 68 b 105 MB in 32 B chunks in 8611ms ``` Fixes #1342 --- src/upgrader.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/upgrader.ts b/src/upgrader.ts index 090d44b60b..3a15f3ff29 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -1,7 +1,6 @@ import { logger } from '@libp2p/logger' import errCode from 'err-code' import * as mss from '@libp2p/multistream-select' -import { pipe } from 'it-pipe' import { codes } from './errors.js' import { createConnection } from '@libp2p/connection' import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events' @@ -466,7 +465,12 @@ export class DefaultUpgrader extends EventEmitter implements Upg } // Pipe all data through the muxer - pipe(upgradedConn, muxer, upgradedConn).catch(log.error) + void Promise.all([ + muxer.sink(upgradedConn.source), + upgradedConn.sink(muxer.source) + ]).catch(err => { + log.error(err) + }) } const _timeline = maConn.timeline