Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Implementing the new interfaces #1086

Merged
merged 33 commits into from
Nov 20, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
878496a
refactor: first clean up, add stubs, move things around
daviddias Nov 17, 2017
5cf7d76
feat .add .addReadableStream and .addPullStream
daviddias Nov 17, 2017
ca5d71a
.get getReadableStream and getPullStream implemented
daviddias Nov 17, 2017
f0b11ac
ls is done too!
daviddias Nov 17, 2017
292e3e5
docs: update browser-browserify
daviddias Nov 17, 2017
8d62b0a
docs: update browser-script-tag
daviddias Nov 17, 2017
b9f13e1
docs: update browser-video-streaming
daviddias Nov 17, 2017
6cd3ae4
docs: update browser webpack
daviddias Nov 17, 2017
4193dea
update ipfs 101
daviddias Nov 17, 2017
6701f09
update exchange files example
daviddias Nov 17, 2017
c292b5a
wip
daviddias Nov 17, 2017
ef4bf22
fix large file tests
daviddias Nov 17, 2017
e9821ce
update example to pull-streams
daviddias Nov 17, 2017
136a5e0
core tests all passing
daviddias Nov 17, 2017
cbdb828
concat buffers once
daviddias Nov 17, 2017
59c4483
run ALL tests on CI
daviddias Nov 17, 2017
8ebc043
make sure CI runs all tests
daviddias Nov 17, 2017
c3dd81c
always use pull-streams for cli ipfs files add
daviddias Nov 17, 2017
a006ba7
refactor cli daemon tests, find that 3 of them do not pass
daviddias Nov 17, 2017
62b39cc
fix gateway tests
daviddias Nov 17, 2017
35248d4
more fixes
daviddias Nov 17, 2017
1993a2f
chore
daviddias Nov 17, 2017
a33b5d5
.add in the example works again, just missing .get
daviddias Nov 17, 2017
09ea377
update the example
daviddias Nov 17, 2017
f5e7561
chore: update deps
daviddias Nov 17, 2017
91f7a4e
chore: update deps
daviddias Nov 20, 2017
2372c90
wip
daviddias Nov 20, 2017
01dad53
fix
daviddias Nov 20, 2017
4130b9c
skip dht
daviddias Nov 20, 2017
fb1b1dd
fix test script in package.json
daviddias Nov 20, 2017
13a5fb8
apply cr
daviddias Nov 20, 2017
54e2427
avoid passing empty array if path does not exist for ls
daviddias Nov 20, 2017
ef1680c
fix linting
daviddias Nov 20, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 156 additions & 130 deletions src/core/components/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,104 @@ const sort = require('pull-sort')
const pushable = require('pull-pushable')
const toStream = require('pull-stream-to-stream')
const toPull = require('stream-to-pull-stream')
const deferred = require('pull-defer')
const waterfall = require('async/waterfall')
const isStream = require('is-stream')
const Duplex = require('stream').Duplex
const Duplex = require('readable-stream').Duplex
const CID = require('cids')
const toB58String = require('multihashes').toB58String

function noop () {}

function prepareFile (self, opts, file, callback) {
opts = opts || {}

waterfall([
(cb) => self.object.get(file.multihash, cb),
(node, cb) => {
let cid = new CID(node.multihash)

if (opts['cid-version'] === 1) {
cid = cid.toV1()
}

const b58Hash = cid.toBaseEncodedString()

cb(null, {
path: file.path || b58Hash,
hash: b58Hash,
size: node.size
})
}
], callback)
}

function normalizeContent (content) {
if (!Array.isArray(content)) {
content = [content]
}

return content.map((data) => {
// Buffer input
if (Buffer.isBuffer(data)) {
data = { path: '', content: pull.values([data]) }
}

// Readable stream input
if (isStream.readable(data)) {
data = { path: '', content: toPull.source(data) }
}

if (data && data.content && typeof data.content !== 'function') {
if (Buffer.isBuffer(data.content)) {
data.content = pull.values([data.content])
}

if (isStream.readable(data.content)) {
data.content = toPull.source(data.content)
}
}

return data
})
}

class AddHelper extends Duplex {
constructor (pullStream, push, options) {
super(Object.assign({ objectMode: true }, options))
this._pullStream = pullStream
this._pushable = push
this._waitingPullFlush = []
}

_read () {
this._pullStream(null, (end, data) => {
while (this._waitingPullFlush.length) {
const cb = this._waitingPullFlush.shift()
cb()
}
if (end) {
if (end instanceof Error) {
this.emit('error', end)
}
} else {
this.push(data)
}
})
}

_write (chunk, encoding, callback) {
this._waitingPullFlush.push(callback)
this._pushable.push(chunk)
}
}

module.exports = function files (self) {
const createAddPullStream = (options) => {
function _addPullStream (options) {
const opts = Object.assign({}, {
shardSplitThreshold: self._options.EXPERIMENTAL.sharding ? 1000 : Infinity
shardSplitThreshold: self._options.EXPERIMENTAL.sharding
? 1000
: Infinity
}, options)

let total = 0
Expand All @@ -37,29 +125,32 @@ module.exports = function files (self) {
)
}

return {
createAddStream: (options, callback) => {
if (typeof options === 'function') {
callback = options
options = undefined
}

const addPullStream = createAddPullStream(options)
const p = pushable()
const s = pull(
p,
addPullStream
)
function _catPullStream (ipfsPath) {
if (typeof ipfsPath === 'function') {
throw new Error('You must supply an ipfsPath')
}

const retStream = new AddStreamDuplex(s, p)
const d = deferred.source()

retStream.once('finish', () => p.end())
pull(
exporter(ipfsPath, self._ipldResolver),
pull.collect((err, files) => {
if (err) { d.end(err) }
if (!files || !files.length) {
return d.end(new Error('No such file'))
}

callback(null, retStream)
},
const content = files[files.length - 1].content
console.log('do I resolve?', content)
d.resolve(content)
// toStream.source(content).pipe(concat((data) => callback(null, data)))
})
)

createAddPullStream: createAddPullStream,
return d
}

return {
add: promisify((data, options, callback) => {
if (typeof options === 'function') {
callback = options
Expand All @@ -76,7 +167,7 @@ module.exports = function files (self) {

pull(
pull.values([data]),
createAddPullStream(options),
_addPullStream(options),
sort((a, b) => {
if (a.path < b.path) return 1
if (a.path > b.path) return -1
Expand All @@ -86,23 +177,40 @@ module.exports = function files (self) {
)
}),

cat: promisify((ipfsPath, callback) => {
if (typeof ipfsPath === 'function') {
return callback(new Error('You must supply an ipfsPath'))
}
addReadableStream: (options) => {
options = options || {}

const p = pushable()
const s = pull(
p,
_addPullStream(options)
)

const retStream = new AddHelper(s, p)

retStream.once('finish', () => p.end())

return retStream
},

addPullStream: _addPullStream,

cat: promisify((ipfsPath, callback) => {
const p = _catPullStream(ipfsPath)
pull(
exporter(ipfsPath, self._ipldResolver),
pull.collect((err, files) => {
if (err) {
return callback(err)
}
if (!files || !files.length) return callback(new Error('No such file'))
callback(null, toStream.source(files[files.length - 1].content))
})
p,
pull.concat(callback)
)
}),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pgte the .cat for a large file (15Mb) is failing with:

image

Seems that some of the streams is giving up too early. Could you help me figuring this one out?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think I got it, I can't use concat to concat buffers https://github.com/pull-stream/pull-stream/blob/master/sinks/concat.js


catReadableStream: (ipfsPath) => {
const p = _catPullStream(ipfsPath)

return toStream.source(p)
},

catPullStream: _catPullStream,

get: promisify((ipfsPath, callback) => {
callback(null, toStream.source(pull(
exporter(ipfsPath, self._ipldResolver),
Expand All @@ -117,112 +225,30 @@ module.exports = function files (self) {
)))
}),

getPull: promisify((ipfsPath, callback) => {
callback(null, exporter(ipfsPath, self._ipldResolver))
}),
getReadableStream: (ipfsPath) => {
// TODO
return exporter(ipfsPath, self._ipldResolver)
},

immutableLs: promisify((ipfsPath, callback) => {
getPullStream: (ipfsPath) => {
return exporter(ipfsPath, self._ipldResolver)
},

lsImmutable: promisify((ipfsPath, callback) => {
pull(
self.files.immutableLsPullStream(ipfsPath),
pull.collect(callback))
}),

immutableLsPullStream: (ipfsPath) => {
lsReadableStreamImmutable: (ipfsPath) => {
// TODO
},

lsPullStreamImmutable: (ipfsPath) => {
return pull(
exporter(ipfsPath, self._ipldResolver, { maxDepth: 1 }),
pull.filter((node) => node.depth === 1),
pull.map((node) => Object.assign({}, node, { hash: toB58String(node.hash) })))
}
}
}

function prepareFile (self, opts, file, callback) {
opts = opts || {}

waterfall([
(cb) => self.object.get(file.multihash, cb),
(node, cb) => {
let cid = new CID(node.multihash)

if (opts['cid-version'] === 1) {
cid = cid.toV1()
}

const b58Hash = cid.toBaseEncodedString()

cb(null, {
path: file.path || b58Hash,
hash: b58Hash,
size: node.size
})
}
], callback)
}

function normalizeContent (content) {
if (!Array.isArray(content)) {
content = [content]
}

return content.map((data) => {
// Buffer input
if (Buffer.isBuffer(data)) {
data = {
path: '',
content: pull.values([data])
}
}

// Readable stream input
if (isStream.readable(data)) {
data = {
path: '',
content: toPull.source(data)
}
}

if (data && data.content && typeof data.content !== 'function') {
if (Buffer.isBuffer(data.content)) {
data.content = pull.values([data.content])
}

if (isStream.readable(data.content)) {
data.content = toPull.source(data.content)
}
}

return data
})
}

function noop () {}

class AddStreamDuplex extends Duplex {
constructor (pullStream, push, options) {
super(Object.assign({ objectMode: true }, options))
this._pullStream = pullStream
this._pushable = push
this._waitingPullFlush = []
}

_read () {
this._pullStream(null, (end, data) => {
while (this._waitingPullFlush.length) {
const cb = this._waitingPullFlush.shift()
cb()
}
if (end) {
if (end instanceof Error) {
this.emit('error', end)
}
} else {
this.push(data)
}
})
}

_write (chunk, encoding, callback) {
this._waitingPullFlush.push(callback)
this._pushable.push(chunk)
}
}
4 changes: 3 additions & 1 deletion src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ class IPFS extends EventEmitter {
this.state = require('./state')(this)

// ipfs.ls
this.ls = this.files.immutableLs
this.ls = this.files.lsImmutable
this.lsReadableStream = this.files.lsReadableStreamImmutable
this.lsPullStream = this.files.lsPullStreamImmutable

boot(this)
}
Expand Down