Skip to content
This repository has been archived by the owner on Feb 26, 2021. It is now read-only.

Commit

Permalink
feat(spdy): migration to pull-streams
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored and daviddias committed Sep 6, 2016
1 parent 830d170 commit fed8198
Show file tree
Hide file tree
Showing 14 changed files with 397 additions and 287 deletions.
24 changes: 11 additions & 13 deletions examples/dialer.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
'use strict'

const tcp = require('net')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const libp2pSPDY = require('../src')

const socket = tcp.connect(9999)
const muxer = libp2pSPDY(socket, false)
const muxer = libp2pSPDY.dial(toPull(socket))

muxer.on('stream', (stream) => {
console.log('-> got new muxed stream')
stream.on('data', (data) => {
console.log('do I ever get data?', data)
})
stream.pipe(stream)
pull(stream, pull.log, stream)
})

console.log('-> opening a stream from my side')
muxer.newStream((err, stream) => {
if (err) {
throw err
}

console.log('-> opened the stream')
stream.write('hey, how is it going. I am dialer')
stream.end()
const stream = muxer.newStream((err) => {
if (err) throw err
})

pull(
pull.values(['hey, how is it going. I am dialer']),
stream
)
28 changes: 17 additions & 11 deletions examples/listener.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
'use strict'

const tcp = require('net')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const libp2pSPDY = require('../src')

const listener = tcp.createServer((socket) => {
console.log('-> got connection')

const muxer = libp2pSPDY(socket, true)
const muxer = libp2pSPDY.listen(toPull(socket))

muxer.on('stream', (stream) => {
console.log('-> got new muxed stream')
stream.on('data', (data) => {
console.log('DO I GET DATA?', data)
})
stream.pipe(stream)
pull(
stream,
pull.through((data) => {
console.log('DO I GET DATA?', data)
}),
stream
)
})

console.log('-> opening a stream from my side')
muxer.newStream((err, stream) => {
if (err) {
throw err
}
const stream = muxer.newStream((err) => {
if (err) throw err
console.log('-> opened the stream')
stream.write('hey, how is it going')
stream.end()
})

pull(
pull.values(['hey, how is it going']),
stream
)
})

listener.listen(9999, () => {
Expand Down
8 changes: 4 additions & 4 deletions gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const gulp = require('gulp')
const WSlibp2p = require('libp2p-websockets')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')

const spdy = require('./src')

Expand All @@ -12,14 +13,13 @@ gulp.task('test:browser:before', (done) => {
const ws = new WSlibp2p()
const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws')
listener = ws.createListener((transportSocket) => {
const muxedConn = spdy(transportSocket, true)

const muxedConn = spdy.listen(transportSocket)
muxedConn.on('stream', (connRx) => {
const connTx = muxedConn.newStream()
connRx.pipe(connTx)
connTx.pipe(connRx)
pull(connRx, connTx, connRx)
})
})

listener.listen(mh, done)
})

Expand Down
23 changes: 14 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"main": "lib/index.js",
"jsnext:main": "src/index.js",
"scripts": {
"compliance": "node test/compliance.js | tap-spec",
"lint": "gulp lint",
"build": "gulp build",
"test": "gulp test",
Expand Down Expand Up @@ -35,22 +34,28 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-spdy",
"devDependencies": {
"aegir": "^6.0.0",
"bl": "^1.1.2",
"aegir": "^6.0.1",
"chai": "^3.5.0",
"interface-stream-muxer": "^0.3.1",
"libp2p-tcp": "^0.7.4",
"libp2p-websockets": "^0.7.1",
"libp2p-tcp": "^0.8.0",
"libp2p-websockets": "^0.8.0",
"multiaddr": "^2.0.0",
"pre-commit": "^1.1.2",
"pull-file": "^0.5.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.4.3",
"run-parallel": "^1.1.6",
"stream-pair": "^1.0.3",
"stream-to-pull-stream": "^1.7.0",
"tap-spec": "^4.1.1",
"tape": "^4.2.0"
},
"dependencies": {
"browserify-zlib": "github:ipfs/browserify-zlib",
"interface-connection": "^0.1.8",
"spdy-transport": "^2.0.11"
"interface-connection": "^0.2.1",
"lodash.noop": "^3.0.1",
"pull-stream-to-stream": "^1.3.1",
"spdy-transport": "^2.0.14",
"stream-to-pull-stream": "^1.7.0"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
Expand All @@ -59,4 +64,4 @@
"dignifiedquire <dignifiedquire@gmail.com>",
"nginnever <ginneversource@gmail.com>"
]
}
}
77 changes: 19 additions & 58 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,70 +1,31 @@
'use strict'

const spdy = require('spdy-transport')
const Connection = require('interface-connection').Connection
const EE = require('events').EventEmitter
const toStream = require('pull-stream-to-stream')

exports = module.exports = function (conn, isListener) {
const muxer = spdy.connection.create(conn, {
protocol: 'spdy',
isServer: isListener
})

const proxyMuxer = new EE()

muxer.start(3.1)

// method added to enable pure stream muxer feeling
proxyMuxer.newStream = (callback) => {
if (!callback) {
callback = noop
}

const muxedConn = new Connection(muxer.request({
method: 'POST',
path: '/',
headers: {}
}, callback))
const Muxer = require('./muxer')
const SPDY_CODEC = require('./spdy-codec')

if (conn.getObservedAddrs) {
muxedConn.getObservedAddrs = conn.getObservedAddrs.bind(conn)
muxedConn.getPeerInfo = conn.getPeerInfo.bind(conn)
muxedConn.setPeerInfo = conn.setPeerInfo.bind(conn)
}
function create (rawConn, isListener) {
const conn = toStream(rawConn)
// Let it flow, let it flooow
conn.resume()

return muxedConn
}

// The rest of the API comes by default with SPDY
muxer.on('close', () => {
proxyMuxer.emit('close')
})

muxer.on('error', (err) => {
proxyMuxer.emit('error', err)
conn.on('end', () => {
// Cleanup and destroy the connection when it ends
// as the converted stream doesn't emit 'close'
// but .destroy will trigger a 'close' event.
conn.destroy()
})

proxyMuxer.end = (cb) => {
muxer.end(cb)
}

// needed by other spdy impl that need the response headers
// in order to confirm the stream can be open
muxer.on('stream', (stream) => {
stream.respond(200, {})
const muxedConn = new Connection(stream)
if (conn.getObservedAddrs) {
muxedConn.getObservedAddrs = conn.getObservedAddrs.bind(conn)
muxedConn.getPeerInfo = conn.getPeerInfo.bind(conn)
muxedConn.setPeerInfo = conn.setPeerInfo.bind(conn)
}
proxyMuxer.emit('stream', muxedConn)
const spdyMuxer = spdy.connection.create(conn, {
protocol: 'spdy',
isServer: isListener
})

proxyMuxer.multicodec = exports.multicodec
return proxyMuxer
return new Muxer(rawConn, spdyMuxer)
}

exports.multicodec = '/spdy/3.1.0'

function noop () {}
exports.multicodec = SPDY_CODEC
exports.dial = (conn) => create(conn, false)
exports.listen = (conn) => create(conn, true)
61 changes: 61 additions & 0 deletions src/muxer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict'

const EventEmitter = require('events').EventEmitter
const noop = require('lodash.noop')
const Connection = require('interface-connection').Connection
const toPull = require('stream-to-pull-stream')

const SPDY_CODEC = require('./spdy-codec')

module.exports = class Muxer extends EventEmitter {
constructor (conn, spdy) {
super()

this.spdy = spdy
this.conn = conn
this.multicodec = SPDY_CODEC

spdy.start(3.1)

// The rest of the API comes by default with SPDY
spdy.on('close', () => {
this.emit('close')
})

spdy.on('error', (err) => {
this.emit('error', err)
})

// needed by other spdy impl that need the response headers
// in order to confirm the stream can be open
spdy.on('stream', (stream) => {
stream.respond(200, {})
const muxedConn = new Connection(toPull.duplex(stream), this.conn)
this.emit('stream', muxedConn)
})
}

// method added to enable pure stream muxer feeling
newStream (callback) {
if (!callback) {
callback = noop
}
const conn = new Connection(null, this.conn)

this.spdy.request({
method: 'POST',
path: '/',
headers: {}
}, (err, stream) => {
conn.setInnerConn(toPull.duplex(stream), this.conn)

callback(err, conn)
})

return conn
}

end (cb) {
this.spdy.end(cb)
}
}
3 changes: 3 additions & 0 deletions src/spdy-codec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
'use strict'

module.exports = '/spdy/3.1.0'
32 changes: 18 additions & 14 deletions test/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

const expect = require('chai').expect
const WSlibp2p = require('libp2p-websockets')
const spdy = require('../src')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')

const spdy = require('../src')

describe('browser-server', () => {
let ws
Expand All @@ -16,22 +18,24 @@ describe('browser-server', () => {
it('ricochet test', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws')
const transportSocket = ws.dial(mh)
const muxedConn = spdy(transportSocket, false)
const muxedConn = spdy.dial(transportSocket)

muxedConn.on('stream', (conn) => {
conn.on('data', (data) => {
expect(data.toString()).to.equal('hey')
})

conn.on('end', () => {
conn.end()
})
pull(
conn,
pull.collect((err, chunks) => {
console.log('collect', err, chunks)
expect(err).to.not.exist
expect(chunks).to.be.eql([Buffer('hey')])
pull(pull.empty(), conn)
})
)
})

const conn = muxedConn.newStream()
conn.write('hey')
conn.end()
conn.on('data', () => {}) // let it floooow
conn.on('end', done)
pull(
pull.values([Buffer('hey')]),
muxedConn.newStream(),
pull.onEnd(done)
)
})
})
12 changes: 0 additions & 12 deletions test/compliance.js

This file was deleted.

16 changes: 16 additions & 0 deletions test/compliance.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* eslint-env mocha */
'use strict'

const tests = require('interface-stream-muxer')
const spdy = require('../src')

describe('compliance', () => {
tests({
setup (cb) {
cb(null, spdy)
},
teardown (cb) {
cb()
}
})
})
Loading

0 comments on commit fed8198

Please sign in to comment.