From 984b42d4f5b5c5ee9d5cc1a45f4f21dcd60c4ee5 Mon Sep 17 00:00:00 2001
From: nginnever
Date: Thu, 19 May 2016 14:48:03 -0700
Subject: [PATCH] export is now a readable object stream

---
 README.md             |   8 +--
 package.json          |   3 +-
 src/exporter.js       | 120 ++++++++++++++++++++++--------------------
 test/test-exporter.js |  74 +++++++++++++++++++++-----
 4 files changed, 128 insertions(+), 77 deletions(-)

diff --git a/README.md b/README.md
index aee63d8c..6a1ced84 100644
--- a/README.md
+++ b/README.md
@@ -66,6 +66,7 @@ add.on('end', () => {
 // Calling write on the importer to add the file/object tuples
 add.write(input)
+add.write(input2)
 add.end()
 ```
@@ -121,13 +122,13 @@ const repo = new ipfsRepo('', { stores: memStore })
 const blocks = new ipfsBlockService(repo)
 const dag = new ipfsMerkleDag.DAGService(blocks)
 
-// Create an export event with the hash you want to export and a dag service
+// Create a readable object stream with the hash you want to export and a dag service
 const exportEvent = Exporter(hash, dag)
 
 // Pipe the returned stream to the console
-exportEvent.on('file', (result) => {
+exportEvent.on('data', (result) => {
   result.stream.pipe(process.stdout)
-}
+})
 ```
@@ -137,8 +138,7 @@ exportEvent.on('file', (result) => {
-const Importer = require('ipfs-unixfs-engine').exporter
+const Exporter = require('ipfs-unixfs-engine').exporter
 ```
 
-The exporter is an event emitter that returns a stream of the file found
-by the multihash of the file from the dag service.
+The exporter is a readable stream in object mode: for the file found under the given multihash in the dag service, it emits objects of the form `{ stream: <content stream>, path: <file path> }`.
 
 ## install
diff --git a/package.json b/package.json
index 46b5b07d..04af8ccb 100644
--- a/package.json
+++ b/package.json
@@ -57,6 +57,7 @@
     "ipfs-merkle-dag": "^0.5.0",
     "ipfs-unixfs": "^0.1.0",
     "readable-stream": "^1.1.13",
+    "run-series": "^1.1.4",
     "through2": "^2.0.0"
   },
   "contributors": [
@@ -67,4 +68,4 @@
     "greenkeeperio-bot ",
     "nginnever "
   ]
-}
\ No newline at end of file
+}
diff --git a/src/exporter.js b/src/exporter.js
index 5458d706..dacff549 100644
--- a/src/exporter.js
+++ b/src/exporter.js
@@ -4,42 +4,32 @@ const debug = require('debug')
 const log = debug('exporter')
 log.err = debug('exporter:error')
 const UnixFS = require('ipfs-unixfs')
+const series = require('run-series')
 const async = require('async')
-const events = require('events')
 const Readable = require('readable-stream').Readable
 const pathj = require('path')
+const util = require('util')
 
-exports = module.exports = exporter
+exports = module.exports = Exporter
 
-function exporter (hash, dagService, options, callback) {
-  if (typeof options === 'function') {
-    callback = options
-    options = {}
+util.inherits(Exporter, Readable)
+
+function Exporter (hash, dagService, options) {
+  if (!(this instanceof Exporter)) {
+    return new Exporter(hash, dagService, options)
   }
-  const ee = new events.EventEmitter()
-  dagService.get(hash, (err, fetchedNode) => {
-    if (err) {
-      if (callback) {
-        return callback(err)
-      }
-      return
-    }
-    const data = UnixFS.unmarshal(fetchedNode.data)
-    const type = data.type
-    if (type === 'directory') {
-      dirExporter(fetchedNode, hash, callback)
-    }
-    if (type === 'file') {
-      fileExporter(fetchedNode, hash, false, callback)
-    }
-  })
-  return ee
+  Readable.call(this, { objectMode: true })
 
-  function fileExporter (node, name, dir, callback) {
+  this.options = options || {}
+
+  // no-op _read: data is pushed as the dag nodes are fetched
+  this._read = (n) => {}
+
+  let fileExporter = (node, name, callback) => {
     let init
-    if (typeof dir === 'function') { callback = dir; dir = {} }
+
+    if (!callback) { callback = function noop () {} }
+
     var rs = new Readable()
     if (node.links.length === 0) {
       const unmarshaledData = UnixFS.unmarshal(node.data)
@@ -52,10 +42,8 @@ function exporter (hash, dagService, options, callback) {
         rs.push(unmarshaledData.data)
         rs.push(null)
       }
-      ee.emit('file', { stream: rs, path: name, dir: dir })
-      if (callback) {
-        callback()
-      }
+      this.push({ stream: rs, path: name })
+      callback()
       return
     } else {
       init = false
@@ -64,36 +52,40 @@ function exporter (hash, dagService, options, callback) {
          return
        }
        init = true
-        async.forEachSeries(node.links, (link, callback) => {
-          dagService.get(link.hash, (err, res) => {
-            if (err) {
-              callback(err)
-            }
-            var unmarshaledData = UnixFS.unmarshal(res.data)
-            rs.push(unmarshaledData.data)
-            callback()
-          })
-        }, (err) => {
+
+        const array = node.links.map((link) => {
+          return (cb) => {
+            dagService.get(link.hash, (err, res) => {
+              if (err) {
+                return cb(err)
+              }
+              var unmarshaledData = UnixFS.unmarshal(res.data)
+              rs.push(unmarshaledData.data)
+              cb()
+            })
+          }
+        })
+        series(array, (err, res) => {
          if (err) {
-            if (callback) {
-              return callback(err)
-            }
+            callback(err)
            return
          }
          rs.push(null)
-          if (callback) {
-            callback()
-          }
+          callback()
          return
        })
      }
    }
-    ee.emit('file', { stream: rs, path: name, dir: dir })
+
+    this.push({ stream: rs, path: name })
+    callback()
+    return
  }
 
-  function dirExporter (node, name, callback) {
+  let dirExporter = (node, name, callback) => {
     let init
+
+    if (!callback) { callback = function noop () {} }
+
     var rs = new Readable()
     if (node.links.length === 0) {
       init = false
@@ -105,10 +97,8 @@ function exporter (hash, dagService, options, callback) {
         rs.push(node.data)
         rs.push(null)
       }
-      ee.emit('file', {stream: rs, path: name})
-      if (callback) {
-        callback()
-      }
+      this.push({ stream: null, path: name })
+      callback()
       return
     } else {
       async.forEachSeries(node.links, (link, callback) => {
@@ -127,16 +117,30 @@ function exporter (hash, dagService, options, callback) {
         })
       }, (err) => {
         if (err) {
-          if (callback) {
-            return callback(err)
-          }
+          callback(err)
           return
         }
-        if (callback) {
-          callback()
-        }
+        callback()
         return
       })
     }
   }
+
+  dagService.get(hash, (err, fetchedNode) => {
+    if (err) {
+      this.emit('error', err)
+      return
+    }
+    const data = UnixFS.unmarshal(fetchedNode.data)
+    const type = data.type
+
+    if (type === 'directory') {
+      dirExporter(fetchedNode, hash)
+    }
+    if (type === 'file') {
+      fileExporter(fetchedNode, hash)
+    }
+  })
+
+  return this
 }
diff --git a/test/test-exporter.js b/test/test-exporter.js
index 9c4ee3e5..91346eaf 100644
--- a/test/test-exporter.js
+++ b/test/test-exporter.js
@@ -8,11 +8,14 @@ const BlockService = require('ipfs-block-service')
 const DAGService = require('ipfs-merkle-dag').DAGService
 const UnixFS = require('ipfs-unixfs')
 const bl = require('bl')
+const fs = require('fs')
+const path = require('path')
 
 let ds
 
 module.exports = function (repo) {
   describe('exporter', function () {
+    const bigFile = fs.readFileSync(path.join(__dirname, '/test-data/1.2MiB.txt'))
     before((done) => {
       const bs = new BlockService(repo)
       expect(bs).to.exist
@@ -25,12 +28,12 @@ module.exports = function (repo) {
       const hash = 'QmQmZQxSKQppbsWfVzBvg59Cn3DKtsNVQ94bjAxg2h3Lb8'
       const bs = new BlockService(repo)
       const ds = new DAGService(bs)
-      const testExport = exporter(hash, ds)
-      testExport.on('file', (data) => {
-        ds.get(hash, (err, fetchedNode) => {
-          const unmarsh = UnixFS.unmarshal(fetchedNode.data)
-          expect(err).to.not.exist
-          data.stream.pipe(bl((err, bldata) => {
+      ds.get(hash, (err, fetchedNode) => {
+        expect(err).to.not.exist
+        const unmarsh = UnixFS.unmarshal(fetchedNode.data)
+        const testExport = exporter(hash, ds)
+        testExport.on('data', (file) => {
+          file.stream.pipe(bl((err, bldata) => {
             expect(err).to.not.exist
             expect(bldata).to.deep.equal(unmarsh.data)
             done()
@@ -44,9 +47,12 @@ module.exports = function (repo) {
       const bs = new BlockService(repo)
       const ds = new DAGService(bs)
       const testExport = exporter(hash, ds)
-      testExport.on('file', (data) => {
-        expect(data.stream).to.exist
-        done()
+      testExport.on('data', (file) => {
+        file.stream.pipe(bl((err, bldata) => {
+          expect(err).to.not.exist
+          expect(bldata).to.deep.equal(bigFile)
+          done()
+        }))
       })
     })
 
@@ -55,9 +61,12 @@ module.exports = function (repo) {
       const bs = new BlockService(repo)
       const ds = new DAGService(bs)
       const testExport = exporter(hash, ds)
-      testExport.on('file', (data) => {
-        expect(data.stream).to.exist
-        done()
+      testExport.on('data', (file) => {
+        expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
+        file.stream.pipe(bl((err, bldata) => {
+          expect(err).to.not.exist
+          done()
+        }))
       })
     })
 
@@ -67,8 +76,8 @@ module.exports = function (repo) {
       const ds = new DAGService(bs)
       const testExport = exporter(hash, ds)
       var fsa = []
-      testExport.on('file', (data) => {
-        fsa.push(data)
+      testExport.on('data', (files) => {
+        fsa.push(files)
       })
       setTimeout(() => {
         expect(fsa[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
@@ -78,5 +87,42 @@ module.exports = function (repo) {
         done()
       }, 1000)
     })
+
+    it('returns a null stream for dir', (done) => {
+      const hash = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' // hash of an empty directory
+      const bs = new BlockService(repo)
+      const ds = new DAGService(bs)
+      const testExport = exporter(hash, ds)
+      testExport.on('data', (dir) => {
+        expect(dir.stream).to.equal(null)
+        done()
+      })
+    })
+
+    it('fails on non-existent hash', (done) => {
+      const hash = 'QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKj3' // this hash doesn't exist in the repo
+      const bs = new BlockService(repo)
+      const ds = new DAGService(bs)
+      const testExport = exporter(hash, ds)
+      testExport.on('error', (err) => {
+        expect(err).to.exist
+        const error = err.toString()
+        const browser = error.includes('Error: key not found:')
+        const node = error.includes('no such file or directory')
+        // the browser and node.js versions return different errors
+        if (browser) {
+          expect(error).to.contain('Error: key not found:')
+          done()
+        }
+        if (node) {
+          expect(error).to.contain('no such file or directory')
+          done()
+        }
+        if (!node && !browser) {
+          expect(node).to.equal(true)
+          done()
+        }
+      })
+    })
   })
 }
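
For reference, a minimal consumer of the new stream interface might look like the sketch below. It reuses the repo/block-service/dag-service wiring from the README example in this patch; `memStore` is assumed to be an in-memory store configured as in that example, and `someHash` is a placeholder multihash. Everything else relies only on behavior the patch introduces: `'data'` events carrying `{ stream, path }` objects, a `null` stream for directory entries, and an `'error'` event when the hash cannot be resolved.

```js
const ipfsRepo = require('ipfs-repo')
const ipfsBlockService = require('ipfs-block-service')
const ipfsMerkleDag = require('ipfs-merkle-dag')
const Exporter = require('ipfs-unixfs-engine').exporter

// Same wiring as the README example; memStore is a placeholder
// for an in-memory store configured as shown there.
const repo = new ipfsRepo('', { stores: memStore })
const blocks = new ipfsBlockService(repo)
const dag = new ipfsMerkleDag.DAGService(blocks)

// someHash is a placeholder for the multihash you want to export
const exportStream = Exporter(someHash, dag)

exportStream.on('data', (file) => {
  if (file.stream === null) {
    // directory entries arrive with a null stream; only the path is meaningful
    console.log('directory:', file.path)
    return
  }
  // file contents arrive as a readable stream of data chunks
  file.stream.pipe(process.stdout)
})

exportStream.on('error', (err) => {
  // lookup failures now surface as a stream error instead of a callback
  console.error('export failed:', err)
})
```

Because the exporter is now a plain object-mode `Readable`, it can also be piped into any object-mode writable or transform stream rather than consumed through `'data'` handlers.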