Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

0.3.0 #5

Merged
merged 1 commit into from
Mar 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
interface-stream-muxer
=====================

[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)

> A test suite and interface you can use to implement a stream muxer. "A one stop shop for all your muxing needs"

Expand All @@ -13,8 +14,8 @@ The API is presented with both Node.js and Go primitives, however, there is not

# Modules that implement the interface

- [Node.js spdy-stream-muxer](https://github.com/diasdavid/node-spdy-stream-muxer) - stream-muxer abstraction on top of [spdy-transport](https://github.com/indutny/spdy-transport)
- [Node.js multiplex-stream-muxer](https://github.com/diasdavid/node-multiplex-stream-muxer) - stream-muxer abstraction on top of [multiplex](https://github.com/maxogden/multiplex)
- [JavaScript libp2p-spdy](https://github.com/diasdavid/js-libp2p-spdy)
- [JavaScript libp2p-multiplex](https://github.com/diasdavid/js-libp2p-multiplex)
- [Go spdy, muxado, yamux and multiplex](https://github.com/jbenet/go-stream-muxer)

Send a PR to add a new one if you happen to find or write one.
Expand All @@ -34,20 +35,18 @@ Install interface-stream-muxer as one of the dependencies of your project and as
```
var tape = require('tape')
var tests = require('interface-stream-muxer/tests')
var YourStreamMuxer = require('../src')
var yourStreamMuxer = require('../src')

var common = {
setup: function (t, cb) {
cb(null, YourStreamMuxer)
cb(null, yourStreamMuxer)
},
teardown: function (t, cb) {
cb()
}
}

var megaTest = false // a really really intensive test case

tests(tape, common, megaTest)
tests(tape, common)
```

## Go
Expand All @@ -60,22 +59,22 @@ A valid (read: that follows this abstraction) stream muxer, must implement the f

### Attach muxer to a transport

- `Node.js` conn = muxer.attach(transport, isListener)
- `Go` conn, err := muxer.Attach(transport, isListener)
- `Node.js` muxedConn = muxer(transport, isListener)
- `Go` muxedConn, err := muxer.Attach(transport, isListener)

This method attaches our stream muxer to the desired transport (UDP, TCP) and returns/callbacks with the `err, conn`(error, connection).

If `err` is passed, no operation should be made in `conn`.

`isListener` is a bool that tells the side of the socket we are, `isListener = true` for listener/server and `isListener = false` for dialer/client side.

`conn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests.
`muxedConn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests.

### Dial(open/create) a new stream


- `Node.js` stream = conn.dialStream([function (err, stream)])
- `Go` stream, err := conn.DialStream()
- `Node.js` stream = muxedConn.newStream([function (err, stream)])
- `Go` stream, err := muxedConn.newStream()

This method negotiates and opens a new stream with the other endpoint.

Expand All @@ -87,8 +86,8 @@ In the Node.js case, if no callback is passed, stream will emit an 'ready' event

### Listen(wait/accept) a new incoming stream

- `Node.js` conn.on('stream', function (stream))
- `Go` stream := conn.Accept()
- `Node.js` muxedConn.on('stream', function (stream) {})
- `Go` stream := muxedConn.Accept()

Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side.

Expand Down
Binary file added tests/.DS_Store
Binary file not shown.
115 changes: 46 additions & 69 deletions tests/base-test.js
Original file line number Diff line number Diff line change
@@ -1,121 +1,105 @@
var streamPair = require('stream-pair')

module.exports.all = function (test, common) {

test('Open a stream from the dealer', function (t) {
common.setup(test, function (err, Muxer) {
test('Open a stream from the dialer', function (t) {
common.setup(test, function (err, muxer) {
t.plan(4)
t.ifError(err, 'Should not throw')

var pair = streamPair.create()
var dialer = new Muxer()
var listener = new Muxer()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)

var connDialer = dialer.attach(pair, false)
var connListener = listener.attach(pair.other, true)
listener.on('stream', (stream) => {
t.pass('got stream')
})

connDialer.dialStream(function (err, stream) {
dialer.newStream((err, stream) => {
t.ifError(err, 'Should not throw')
t.pass('dialed stream')
})

connListener.on('stream', function (stream) {
t.pass('got stream')
})
})
})

test('Open a stream from the listener', function (t) {
common.setup(test, function (err, Muxer) {
common.setup(test, function (err, muxer) {
t.plan(4)
t.ifError(err, 'Should not throw')

var pair = streamPair.create()
var dialer = new Muxer()
var listener = new Muxer()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)

var connDialer = dialer.attach(pair, false)
var connListener = listener.attach(pair.other, true)
dialer.on('stream', (stream) => {
t.pass('got stream')
})

connListener.dialStream(function (err, stream) {
listener.newStream((err, stream) => {
t.ifError(err, 'Should not throw')
t.pass('dialed stream')
})

connDialer.on('stream', function (stream) {
t.pass('got stream')
})
})
})

test('Open a stream on both sides', function (t) {
common.setup(test, function (err, Muxer) {
common.setup(test, function (err, muxer) {
t.plan(7)
t.ifError(err, 'Should not throw')

var pair = streamPair.create()
var dialer = new Muxer()
var listener = new Muxer()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)

var connDialer = dialer.attach(pair, false)
var connListener = listener.attach(pair.other, true)
dialer.on('stream', (stream) => {
t.pass('got stream')
})

connDialer.dialStream(function (err, stream) {
listener.newStream((err, stream) => {
t.ifError(err, 'Should not throw')
t.pass('dialed stream from dialer')
t.pass('dialed stream')
})

connListener.on('stream', function (stream) {
t.pass('listener got stream')
listener.on('stream', (stream) => {
t.pass('got stream')
})

connListener.dialStream(function (err, stream) {
dialer.newStream((err, stream) => {
t.ifError(err, 'Should not throw')
t.pass('dialed stream from listener')
})

connDialer.on('stream', function (stream) {
t.pass('dialer got stream')
t.pass('dialed stream')
})
})
})

test('Open a stream on one side, write, open a stream in the other side', function (t) {
common.setup(test, function (err, Muxer) {
common.setup(test, function (err, muxer) {
t.plan(9)
t.ifError(err, 'Should not throw')

var pair = streamPair.create()
var dialer = new Muxer()
var listener = new Muxer()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)

var connDialer = dialer.attach(pair, false)
var connListener = listener.attach(pair.other, true)

connDialer.dialStream(function (err, stream) {
dialer.newStream(function (err, stream) {
t.ifError(err, 'Should not throw')
t.pass('dialed stream from dialer')

stream.write('hey')
})

connListener.on('stream', function (stream) {
listener.on('stream', function (stream) {
t.pass('listener got stream')

stream.on('data', function (chunk) {
t.equal(chunk.toString(), 'hey')
})

connListener.dialStream(function (err, stream) {
listener.newStream(function (err, stream) {
t.ifError(err, 'Should not throw')
t.pass('dialed stream from listener')

stream.write('hello')
})

})

connDialer.on('stream', function (stream) {
dialer.on('stream', function (stream) {
t.pass('dialer got stream')

stream.on('data', function (chunk) {
Expand All @@ -126,18 +110,15 @@ module.exports.all = function (test, common) {
})

test('Open a stream using the net.connect pattern', function (t) {
common.setup(test, function (err, Muxer) {
t.plan(3)
common.setup(test, function (err, muxer) {
t.plan(2)
t.ifError(err, 'Should not throw')

var pair = streamPair.create()
var dialer = new Muxer()
var listener = new Muxer()
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)

var connDialer = dialer.attach(pair, false)
var connListener = listener.attach(pair.other, true)

var stream = connListener.dialStream()
var stream = dialer.newStream()

stream.on('ready', function () {
t.pass('dialed stream')
Expand All @@ -147,25 +128,22 @@ module.exports.all = function (test, common) {
t.ifError(err, 'Should not throw')
})

connDialer.on('stream', function (stream) {
listener.on('stream', function (stream) {
t.pass('got stream')
})
})
})

test('Buffer writes Open a stream using the net.connect pattern', function (t) {
common.setup(test, function (err, Muxer) {
t.plan(4)
common.setup(test, function (err, muxer) {
t.plan(3)
t.ifError(err, 'Should not throw')

var pair = streamPair.create()
var dialer = new Muxer()
var listener = new Muxer()

var connDialer = dialer.attach(pair, false)
var connListener = listener.attach(pair.other, true)
var dialer = muxer(pair, false)
var listener = muxer(pair.other, true)

var stream = connListener.dialStream()
var stream = dialer.newStream()

stream.write('buffer this')

Expand All @@ -177,7 +155,7 @@ module.exports.all = function (test, common) {
t.ifError(err, 'Should not throw')
})

connDialer.on('stream', function (stream) {
listener.on('stream', function (stream) {
t.pass('got stream')

stream.on('data', function (chunk) {
Expand All @@ -186,5 +164,4 @@ module.exports.all = function (test, common) {
})
})
})

}
4 changes: 1 addition & 3 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,5 @@ module.exports = function (test, common, mega) {
test = timed(test)
require('./base-test.js').all(test, common)
require('./stress-test.js').all(test, common)
if (mega) {
require('./mega-stress-test.js').all(test, common)
}
// require('./mega-stress-test.js').all(test, common)
}
21 changes: 7 additions & 14 deletions tests/mega-stress-test.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
var streamPair = require('stream-pair')

module.exports.all = function (test, common) {

test('10000 messages of 10000 streams', function (t) {
common.setup(test, function (err, Muxer) {
common.setup(test, function (err, muxer) {
t.ifError(err, 'should not throw')
var pair = streamPair.create()

spawnGeneration(t, Muxer, pair, pair.other, 10000, 10000)
spawnGeneration(t, muxer, pair, pair.other, 10000, 10000)
})
})

}

function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) {
t.plan(1 + (5 * nStreams) + (nStreams * nMsg))

var msg = !size ? 'simple msg' : 'make the msg bigger'

var listenerMuxer = new Muxer()
var dialerMuxer = new Muxer()

var listenerConn = listenerMuxer.attach(listenerSocket, true)
var dialerConn = dialerMuxer.attach(dialerSocket, false)
var listener = muxer(listenerSocket, true)
var dialer = muxer(dialerSocket, false)

listenerConn.on('stream', function (stream) {
listener.on('stream', function (stream) {
t.pass('Incoming stream')

stream.on('data', function (chunk) {
Expand All @@ -35,11 +30,10 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
t.pass('Stream ended on Listener')
stream.end()
})

})

for (var i = 0; i < nStreams; i++) {
dialerConn.dialStream(function (err, stream) {
dialer.newStream(function (err, stream) {
t.ifError(err, 'Should not throw')
t.pass('Dialed stream')

Expand All @@ -58,5 +52,4 @@ function spawnGeneration (t, Muxer, dialerSocket, listenerSocket, nStreams, nMsg
stream.end()
})
}

}
Loading