Skip to content
This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

Commit

Permalink
Merge pull request #42 from ipfs/exporter/duplex
Browse files Browse the repository at this point in the history
export is now a readable object stream
  • Loading branch information
daviddias committed May 21, 2016
2 parents 89a02de + 984b42d commit 1fe5848
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 77 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ add.on('end', () => {
// Calling write on the importer to add the file/object tuples

add.write(input)
add.write(input2)
add.end()
```

Expand Down Expand Up @@ -121,13 +122,13 @@ const repo = new ipfsRepo('', { stores: memStore })
const blocks = new ipfsBlockService(repo)
const dag = new ipfsMerkleDag.DAGService(blocks)
// Create an export event with the hash you want to export and a dag service
// Create an export readable object stream with the hash you want to export and a dag service
const exportEvent = Exporter(hash, dag)
// Pipe the return stream to console
exportEvent.on('file', (result) => {
exportEvent.on('data', (result) => {
result.stream.pipe(process.stdout)
}
```
Expand All @@ -137,8 +138,7 @@ exportEvent.on('file', (result) => {
const Importer = require('ipfs-unixfs-engine').exporter
```

The exporter is an event emitter that returns a stream of the file found
by the multihash of the file from the dag service.
The exporter is a readable stream in object mode that returns an object ```{ stream: stream, path: 'path' }``` by the multihash of the file from the dag service.


## install
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"ipfs-merkle-dag": "^0.5.0",
"ipfs-unixfs": "^0.1.0",
"readable-stream": "^1.1.13",
"run-series": "^1.1.4",
"through2": "^2.0.0"
},
"contributors": [
Expand All @@ -67,4 +68,4 @@
"greenkeeperio-bot <support@greenkeeper.io>",
"nginnever <ginneversource@gmail.com>"
]
}
}
120 changes: 62 additions & 58 deletions src/exporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,32 @@ const debug = require('debug')
const log = debug('exporter')
log.err = debug('exporter:error')
const UnixFS = require('ipfs-unixfs')
const series = require('run-series')
const async = require('async')
const events = require('events')
const Readable = require('readable-stream').Readable
const pathj = require('path')
const util = require('util')

exports = module.exports = exporter
exports = module.exports = Exporter

function exporter (hash, dagService, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
util.inherits(Exporter, Readable)

function Exporter (hash, dagService, options) {
if (!(this instanceof Exporter)) {
return new Exporter(hash, dagService, options)
}

const ee = new events.EventEmitter()
dagService.get(hash, (err, fetchedNode) => {
if (err) {
if (callback) {
return callback(err)
}
return
}
const data = UnixFS.unmarshal(fetchedNode.data)
const type = data.type
if (type === 'directory') {
dirExporter(fetchedNode, hash, callback)
}
if (type === 'file') {
fileExporter(fetchedNode, hash, false, callback)
}
})
return ee
Readable.call(this, { objectMode: true })

function fileExporter (node, name, dir, callback) {
this.options = options || {}

this._read = (n) => {}

let fileExporter = (node, name, callback) => {
let init

if (typeof dir === 'function') { callback = dir; dir = {} }
if (!callback) { callback = function noop () {} }

var rs = new Readable()
if (node.links.length === 0) {
const unmarshaledData = UnixFS.unmarshal(node.data)
Expand All @@ -52,10 +42,8 @@ function exporter (hash, dagService, options, callback) {
rs.push(unmarshaledData.data)
rs.push(null)
}
ee.emit('file', { stream: rs, path: name, dir: dir })
if (callback) {
callback()
}
this.push({ stream: rs, path: name })
callback()
return
} else {
init = false
Expand All @@ -64,36 +52,40 @@ function exporter (hash, dagService, options, callback) {
return
}
init = true
async.forEachSeries(node.links, (link, callback) => {
dagService.get(link.hash, (err, res) => {
if (err) {
callback(err)
}
var unmarshaledData = UnixFS.unmarshal(res.data)
rs.push(unmarshaledData.data)
callback()
})
}, (err) => {

const array = node.links.map((link) => {
return (cb) => {
dagService.get(link.hash, (err, res) => {
if (err) {
cb(err)
}
var unmarshaledData = UnixFS.unmarshal(res.data)
rs.push(unmarshaledData.data)
cb()
})
}
})
series(array, (err, res) => {
if (err) {
if (callback) {
return callback(err)
}
callback()
return
}
rs.push(null)
if (callback) {
callback()
}
callback()
return
})
}
ee.emit('file', { stream: rs, path: name, dir: dir })
this.push({ stream: rs, path: name })
callback()
return
}
}

function dirExporter (node, name, callback) {
let dirExporter = (node, name, callback) => {
let init

if (!callback) { callback = function noop () {} }

var rs = new Readable()
if (node.links.length === 0) {
init = false
Expand All @@ -105,10 +97,8 @@ function exporter (hash, dagService, options, callback) {
rs.push(node.data)
rs.push(null)
}
ee.emit('file', {stream: rs, path: name})
if (callback) {
callback()
}
this.push({stream: null, path: name})
callback()
return
} else {
async.forEachSeries(node.links, (link, callback) => {
Expand All @@ -127,16 +117,30 @@ function exporter (hash, dagService, options, callback) {
})
}, (err) => {
if (err) {
if (callback) {
return callback(err)
}
return
}
if (callback) {
callback()
return
}
callback()
return
})
}
}

dagService.get(hash, (err, fetchedNode) => {
if (err) {
this.emit('error', err)
return
}
const data = UnixFS.unmarshal(fetchedNode.data)
const type = data.type

if (type === 'directory') {
dirExporter(fetchedNode, hash)
}
if (type === 'file') {
fileExporter(fetchedNode, hash)
}
})

return this
}
74 changes: 60 additions & 14 deletions test/test-exporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ const BlockService = require('ipfs-block-service')
const DAGService = require('ipfs-merkle-dag').DAGService
const UnixFS = require('ipfs-unixfs')
const bl = require('bl')
const fs = require('fs')
const path = require('path')

let ds

module.exports = function (repo) {
describe('exporter', function () {
const bigFile = fs.readFileSync(path.join(__dirname, '/test-data/1.2MiB.txt'))
before((done) => {
const bs = new BlockService(repo)
expect(bs).to.exist
Expand All @@ -25,12 +28,12 @@ module.exports = function (repo) {
const hash = 'QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8'
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('file', (data) => {
ds.get(hash, (err, fetchedNode) => {
const unmarsh = UnixFS.unmarshal(fetchedNode.data)
expect(err).to.not.exist
data.stream.pipe(bl((err, bldata) => {
ds.get(hash, (err, fetchedNode) => {
const unmarsh = UnixFS.unmarshal(fetchedNode.data)
expect(err).to.not.exist
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
file.stream.pipe(bl((err, bldata) => {
expect(err).to.not.exist
expect(bldata).to.deep.equal(unmarsh.data)
done()
Expand All @@ -44,9 +47,12 @@ module.exports = function (repo) {
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('file', (data) => {
expect(data.stream).to.exist
done()
testExport.on('data', (file) => {
file.stream.pipe(bl((err, bldata) => {
expect(bldata).to.deep.equal(bigFile)
expect(err).to.not.exist
done()
}))
})
})

Expand All @@ -55,9 +61,12 @@ module.exports = function (repo) {
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('file', (data) => {
expect(data.stream).to.exist
done()
testExport.on('data', (file) => {
expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
file.stream.pipe(bl((err, bldata) => {
expect(err).to.not.exist
done()
}))
})
})

Expand All @@ -67,8 +76,8 @@ module.exports = function (repo) {
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
var fsa = []
testExport.on('file', (data) => {
fsa.push(data)
testExport.on('data', (files) => {
fsa.push(files)
})
setTimeout(() => {
expect(fsa[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
Expand All @@ -78,5 +87,42 @@ module.exports = function (repo) {
done()
}, 1000)
})

it('returns a null stream for dir', (done) => {
const hash = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' // This hash doesn't exist in the repo
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('data', (dir) => {
expect(dir.stream).to.equal(null)
done()
})
})

it('fails on non existent hash', (done) => {
const hash = 'QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKj3' // This hash doesn't exist in the repo
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('error', (err) => {
const error = err.toString()
expect(err).to.exist
const browser = error.includes('Error: key not found:')
const node = error.includes('no such file or directory')
// the browser and node js return different errors
if (browser) {
expect(error).to.contain('Error: key not found:')
done()
}
if (node) {
expect(error).to.contain('no such file or directory')
done()
}
if (!node && !browser) {
expect(node).to.equal(true)
done()
}
})
})
})
}

0 comments on commit 1fe5848

Please sign in to comment.