From 6c17081a21c3fad3b551d80b87789906bb721e03 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 25 Jul 2019 13:29:40 +0100 Subject: [PATCH 01/27] feat: enable pubsub in the browser License: MIT Signed-off-by: Alan Shaw --- package.json | 8 +- src/lib/callbackify.js | 17 +++ src/lib/configure.browser.js | 46 +++++++ src/lib/configure.js | 44 ++++++ src/lib/fetch.js | 53 ++++++++ src/lib/multiaddr.js | 16 +++ src/lib/querystring.js | 16 +++ src/pubsub.js | 212 ----------------------------- src/pubsub/index.js | 16 +++ src/pubsub/ls.js | 20 +++ src/pubsub/peers.js | 29 ++++ src/pubsub/publish.js | 51 +++++++ src/pubsub/subscribe.js | 67 +++++++++ src/pubsub/subscription-tracker.js | 52 +++++++ src/pubsub/unsubscribe.js | 10 ++ src/utils/pubsub-message-stream.js | 34 ----- src/utils/pubsub-message-utils.js | 39 ------ src/utils/stringlist-to-array.js | 9 -- test/interface.spec.js | 14 +- 19 files changed, 450 insertions(+), 303 deletions(-) create mode 100644 src/lib/callbackify.js create mode 100644 src/lib/configure.browser.js create mode 100644 src/lib/configure.js create mode 100644 src/lib/fetch.js create mode 100644 src/lib/multiaddr.js create mode 100644 src/lib/querystring.js delete mode 100644 src/pubsub.js create mode 100644 src/pubsub/index.js create mode 100644 src/pubsub/ls.js create mode 100644 src/pubsub/peers.js create mode 100644 src/pubsub/publish.js create mode 100644 src/pubsub/subscribe.js create mode 100644 src/pubsub/subscription-tracker.js create mode 100644 src/pubsub/unsubscribe.js delete mode 100644 src/utils/pubsub-message-stream.js delete mode 100644 src/utils/pubsub-message-utils.js delete mode 100644 src/utils/stringlist-to-array.js diff --git a/package.json b/package.json index ece2c88e9..e0f780311 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,8 @@ "browser": { "glob": false, "fs": false, - "stream": "readable-stream" + "stream": "readable-stream", + "./src/lib/configure.js": "./src/lib/configure.browser.js" }, "repository": "github:ipfs/js-ipfs-http-client", "scripts": { @@ -33,6 +34,7 @@ "coverage": "npx nyc -r html npm run test:node -- --bail" }, "dependencies": { + "abort-controller": "^3.0.0", "async": "^2.6.1", "bignumber.js": "^9.0.0", "bl": "^3.0.0", @@ -44,6 +46,7 @@ "detect-node": "^2.0.4", "end-of-stream": "^1.4.1", "err-code": "^1.1.2", + "explain-error": "^1.0.4", "flatmap": "0.0.3", "glob": "^7.1.3", "ipfs-block": "~0.8.1", @@ -56,6 +59,7 @@ "is-stream": "^2.0.0", "iso-stream-http": "~0.1.2", "iso-url": "~0.4.6", + "iterable-ndjson": "^1.1.0", "just-kebab-case": "^1.1.0", "just-map-keys": "^1.1.0", "kind-of": "^6.0.2", @@ -65,6 +69,7 @@ "multicodec": "~0.5.1", "multihashes": "~0.4.14", "ndjson": "github:hugomrdias/ndjson#feat/readable-stream3", + "node-fetch": "^2.6.0", "once": "^1.4.0", "peer-id": "~0.12.2", "peer-info": "~0.15.1", @@ -74,6 +79,7 @@ "pull-to-stream": "~0.1.1", "pump": "^3.0.0", "qs": "^6.5.2", + "querystring": "^0.2.0", "readable-stream": "^3.1.1", "stream-to-pull-stream": "^1.7.2", "tar-stream": "^2.0.1", diff --git a/src/lib/callbackify.js b/src/lib/callbackify.js new file mode 100644 index 000000000..3a041612d --- /dev/null +++ b/src/lib/callbackify.js @@ -0,0 +1,17 @@ +'use strict' + +module.exports = (fn, opts) => { + opts = opts || {} + // Min number of non-callback args + opts.minArgs = opts.minArgs == null ? 
0 : opts.minArgs + + return (...args) => { + const cb = args[args.length - 1] + + if (typeof cb !== 'function' || args.length === opts.minArgs) { + return fn(...args) + } + + fn(...args.slice(0, -1)).then(res => cb(null, res), cb) + } +} diff --git a/src/lib/configure.browser.js b/src/lib/configure.browser.js new file mode 100644 index 000000000..2943915cc --- /dev/null +++ b/src/lib/configure.browser.js @@ -0,0 +1,46 @@ +'use strict' +/* eslint-env browser */ + +const { toUri } = require('./multiaddr') + +// Set default configuration and call create function with them +module.exports = create => config => { + config = config || {} + + if (typeof config === 'string') { + config = { apiAddr: config } + } + + // Multiaddr instance + if (config.constructor && config.constructor.isMultiaddr) { + config = { apiAddr: config } + } + + config.fetch = config.fetch || require('./fetch').fetch + config.apiAddr = (config.apiAddr || getDefaultApiAddr(config)).toString() + config.apiAddr = config.apiAddr.startsWith('/') + ? toUri(config.apiAddr) + : config.apiAddr + config.apiPath = config.apiPath || config['api-path'] || '/api/v0' + + if (config.apiPath.endsWith('/')) { + config.apiPath = config.apiPath.slice(0, -1) + } + + config.headers = new Headers(config.headers) + + return create(config) +} + +function getDefaultApiAddr ({ protocol, host, port }) { + if (!protocol) { + protocol = location.protocol.startsWith('http') + ? location.protocol.split(':')[0] + : 'http' + } + + host = host || location.hostname + port = port || location.port + + return `${protocol}://${host}${port ? ':' + port : ''}` +} diff --git a/src/lib/configure.js b/src/lib/configure.js new file mode 100644 index 000000000..3557945ee --- /dev/null +++ b/src/lib/configure.js @@ -0,0 +1,44 @@ +'use strict' + +const { Headers } = require('node-fetch') +const { toUri } = require('./multiaddr') +const pkg = require('../../package.json') + +// Set default configuration and call create function with them +module.exports = create => config => { + config = config || {} + + if (typeof config === 'string') { + config = { apiAddr: config } + } + + // Multiaddr instance + if (config.constructor && config.constructor.isMultiaddr) { + config = { apiAddr: config } + } + + config.fetch = config.fetch || require('./fetch').fetch + + if (config.protocol || config.host || config.port) { + const port = config.port ? `:${config.port}` : '' + config.apiAddr = `${config.protocol || 'http'}://${config.host || 'localhost'}${port}` + } + + config.apiAddr = (config.apiAddr || 'http://localhost:5001').toString() + config.apiAddr = config.apiAddr.startsWith('/') + ? 
toUri(config.apiAddr) + : config.apiAddr + config.apiPath = config.apiPath || config['api-path'] || '/api/v0' + + if (config.apiPath.endsWith('/')) { + config.apiPath = config.apiPath.slice(0, -1) + } + + config.headers = new Headers(config.headers) + + if (!config.headers.has('User-Agent')) { + config.headers.append('User-Agent', `${pkg.name}/${pkg.version}`) + } + + return create(config) +} diff --git a/src/lib/fetch.js b/src/lib/fetch.js new file mode 100644 index 000000000..b3cf03cbb --- /dev/null +++ b/src/lib/fetch.js @@ -0,0 +1,53 @@ +'use strict' + +const explain = require('explain-error') + +exports.fetch = require('node-fetch') + +// Ensure fetch response is ok (200) +// and if not, attempt to JSON parse body, extract error message and throw +exports.ok = async res => { + res = await res + + if (!res.ok) { + const { status } = res + const defaultMsg = `unexpected status ${status}` + let msg + try { + let data = await res.text() + try { + data = JSON.parse(data) + msg = data.message || data.Message + } catch (err) { + msg = data + } + } catch (err) { + throw Object.assign(explain(err, defaultMsg), { status }) + } + throw Object.assign(new Error(msg || defaultMsg), { status }) + } + + return res +} + +exports.toIterable = body => { + if (body[Symbol.asyncIterator]) return body + + if (body.getReader) { + return (async function * () { + const reader = body.getReader() + + try { + while (true) { + const { done, value } = await reader.read() + if (done) return + yield value + } + } finally { + reader.releaseLock() + } + })() + } + + throw new Error('unknown stream') +} diff --git a/src/lib/multiaddr.js b/src/lib/multiaddr.js new file mode 100644 index 000000000..1ccfe72c8 --- /dev/null +++ b/src/lib/multiaddr.js @@ -0,0 +1,16 @@ +// Convert a multiaddr to a URI +// Assumes multiaddr is in a format that can be converted to a HTTP(s) URI +exports.toUri = ma => { + const parts = `${ma}`.split('/') + const port = getPort(parts) + return `${getProtocol(parts)}://${parts[2]}${port == null ? '' : ':' + port}` +} + +function getProtocol (maParts) { + return maParts.indexOf('https') === -1 ? 'http' : 'https' +} + +function getPort (maParts) { + const tcpIndex = maParts.indexOf('tcp') + return tcpIndex === -1 ? null : maParts[tcpIndex + 1] +} diff --git a/src/lib/querystring.js b/src/lib/querystring.js new file mode 100644 index 000000000..df35b0a6e --- /dev/null +++ b/src/lib/querystring.js @@ -0,0 +1,16 @@ +'use strict' + +const QueryString = require('querystring') + +// Convert an object to a query string INCLUDING leading ? +// Excludes null/undefined values +exports.objectToQuery = obj => { + if (!obj) return '' + + const qs = Object.entries(obj).reduce((obj, [key, value]) => { + if (value != null) obj[key] = value + return obj + }, {}) + + return Object.keys(qs).length ? 
`?${QueryString.stringify(qs)}` : '' +} diff --git a/src/pubsub.js b/src/pubsub.js deleted file mode 100644 index 6b298351d..000000000 --- a/src/pubsub.js +++ /dev/null @@ -1,212 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const EventEmitter = require('events') -const eos = require('end-of-stream') -const isNode = require('detect-node') -const setImmediate = require('async/setImmediate') -const PubsubMessageStream = require('./utils/pubsub-message-stream') -const stringlistToArray = require('./utils/stringlist-to-array') -const moduleConfig = require('./utils/module-config') - -const NotSupportedError = () => new Error('pubsub is currently not supported when run in the browser') - -/* Public API */ -module.exports = (arg) => { - const send = moduleConfig(arg) - - /* Internal subscriptions state and functions */ - const ps = new EventEmitter() - const subscriptions = {} - ps.id = Math.random() - return { - subscribe: (topic, handler, options, callback) => { - const defaultOptions = { - discover: false - } - - if (typeof options === 'function') { - callback = options - options = defaultOptions - } - - if (!options) { - options = defaultOptions - } - - // Throw an error if ran in the browsers - if (!isNode) { - if (!callback) { - return Promise.reject(NotSupportedError()) - } - - return setImmediate(() => callback(NotSupportedError())) - } - - // promisify doesn't work as we always pass a - // function as last argument (`handler`) - if (!callback) { - return new Promise((resolve, reject) => { - subscribe(topic, handler, options, (err) => { - if (err) { - return reject(err) - } - resolve() - }) - }) - } - - subscribe(topic, handler, options, callback) - }, - unsubscribe: (topic, handler, callback) => { - if (!isNode) { - if (!callback) { - return Promise.reject(NotSupportedError()) - } - - return setImmediate(() => callback(NotSupportedError())) - } - - if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) { - const err = new Error(`Not subscribed to '${topic}'`) - - if (!callback) { - return Promise.reject(err) - } - - return setImmediate(() => callback(err)) - } - - if (!handler && !callback) { - ps.removeAllListeners(topic) - } else { - ps.removeListener(topic, handler) - } - - // Drop the request once we are actually done - if (ps.listenerCount(topic) === 0) { - if (!callback) { - return new Promise((resolve, reject) => { - // When the response stream has ended, resolve the promise - eos(subscriptions[topic].res, (err) => { - // FIXME: Artificial timeout needed to ensure unsubscribed - setTimeout(() => { - if (err) return reject(err) - resolve() - }) - }) - subscriptions[topic].req.abort() - subscriptions[topic] = null - }) - } - - // When the response stream has ended, call the callback - eos(subscriptions[topic].res, (err) => { - // FIXME: Artificial timeout needed to ensure unsubscribed - setTimeout(() => callback(err)) - }) - subscriptions[topic].req.abort() - subscriptions[topic] = null - return - } - - if (!callback) { - return Promise.resolve() - } - - setImmediate(() => callback()) - }, - publish: promisify((topic, data, callback) => { - if (!isNode) { - return callback(NotSupportedError()) - } - - if (!Buffer.isBuffer(data)) { - return callback(new Error('data must be a Buffer')) - } - - const request = { - path: 'pubsub/pub', - args: [topic, data] - } - - send(request, callback) - }), - ls: promisify((callback) => { - if (!isNode) { - return callback(NotSupportedError()) - } - - const request = { - path: 'pubsub/ls' - } - - 
send.andTransform(request, stringlistToArray, callback) - }), - peers: promisify((topic, callback) => { - if (!isNode) { - return callback(NotSupportedError()) - } - - const request = { - path: 'pubsub/peers', - args: [topic] - } - - send.andTransform(request, stringlistToArray, callback) - }), - setMaxListeners (n) { - return ps.setMaxListeners(n) - } - } - - function subscribe (topic, handler, options, callback) { - ps.on(topic, handler) - - if (subscriptions[topic]) { - // TODO: should a callback error be returned? - return callback() - } - - // Request params - const request = { - path: 'pubsub/sub', - args: [topic], - qs: { - discover: options.discover - } - } - - // Start the request and transform the response - // stream to Pubsub messages stream - subscriptions[topic] = {} - subscriptions[topic].req = send.andTransform(request, PubsubMessageStream.from, (err, stream) => { - if (err) { - subscriptions[topic] = null - ps.removeListener(topic, handler) - return callback(err) - } - - subscriptions[topic].res = stream - - stream.on('data', (msg) => { - ps.emit(topic, msg) - }) - - stream.on('error', (err) => { - ps.emit('error', err) - }) - - eos(stream, (err) => { - if (err) { - ps.emit('error', err) - } - - subscriptions[topic] = null - ps.removeListener(topic, handler) - }) - - callback() - }) - } -} diff --git a/src/pubsub/index.js b/src/pubsub/index.js new file mode 100644 index 000000000..7f2897ea7 --- /dev/null +++ b/src/pubsub/index.js @@ -0,0 +1,16 @@ +const callbackify = require('../lib/callbackify') + +// This file is temporary and for compatibility with legacy usage +module.exports = (send, options) => { + if (typeof send !== 'function') { + options = send + } + + return { + ls: callbackify(require('./ls')(options)), + peers: callbackify(require('./peers')(options)), + publish: callbackify(require('./publish')(options)), + subscribe: callbackify(require('./subscribe')(options), { minArgs: 2 }), + unsubscribe: callbackify(require('./unsubscribe')(options), { minArgs: 2 }) + } +} diff --git a/src/pubsub/ls.js b/src/pubsub/ls.js new file mode 100644 index 000000000..bfbf8239e --- /dev/null +++ b/src/pubsub/ls.js @@ -0,0 +1,20 @@ +'use strict' + +const configure = require('../lib/configure') +const { ok } = require('../lib/fetch') +const { objectToQuery } = require('../lib/querystring') + +module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { + return async (options) => { + options = options || {} + + const qs = objectToQuery(options.qs) + const url = `${apiAddr}${apiPath}/pubsub/ls${qs}` + const res = await ok(fetch(url, { + signal: options.signal, + headers: options.headers || headers + })) + const data = await res.json() + return data.Strings || [] + } +}) diff --git a/src/pubsub/peers.js b/src/pubsub/peers.js new file mode 100644 index 000000000..7ee860e33 --- /dev/null +++ b/src/pubsub/peers.js @@ -0,0 +1,29 @@ +'use strict' + +const { objectToQuery } = require('../lib/querystring') +const configure = require('../lib/configure') +const { ok } = require('../lib/fetch') + +module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { + return async (topic, options) => { + if (!options && typeof topic === 'object') { + options = topic + topic = null + } + + options = options || {} + + const qs = objectToQuery({ + arg: topic, + ...(options.qs || {}) + }) + + const url = `${apiAddr}${apiPath}/pubsub/peers${qs}` + const res = await ok(fetch(url, { + signal: options.signal, + headers: options.headers || headers + })) + const data = await res.json() + 
return data.Strings || [] + } +}) diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js new file mode 100644 index 000000000..d214ddbc8 --- /dev/null +++ b/src/pubsub/publish.js @@ -0,0 +1,51 @@ +'use strict' + +const { Buffer } = require('buffer') +const configure = require('../lib/configure') +const { objectToQuery } = require('../lib/querystring') +const { ok } = require('../lib/fetch') + +module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { + return async (topic, data, options) => { + options = options || {} + + if (!Buffer.isBuffer(data)) { + throw new Error('data must be a Buffer') + } + + let qs = objectToQuery(options.qs) + qs = qs ? `&${qs.slice(1)}` : qs + + const url = `${apiAddr}${apiPath}/pubsub/pub?arg=${encodeURIComponent(topic)}&arg=${encodeBuffer(data)}${qs}` + const res = await ok(fetch(url, { + method: 'POST', + signal: options.signal, + headers: options.headers || headers + })) + + return res.text() + } +}) + +function encodeBuffer (buf) { + let uriEncoded = '' + for (const byte of buf) { + // https://tools.ietf.org/html/rfc3986#page-14 + // ALPHA (%41-%5A and %61-%7A), DIGIT (%30-%39), hyphen (%2D), period (%2E), + // underscore (%5F), or tilde (%7E) + if ( + (byte >= 0x41 && byte <= 0x5A) || + (byte >= 0x61 && byte <= 0x7A) || + (byte >= 0x30 && byte <= 0x39) || + (byte === 0x2D) || + (byte === 0x2E) || + (byte === 0x5F) || + (byte === 0x7E) + ) { + uriEncoded += String.fromCharCode(byte) + } else { + uriEncoded += `%${byte.toString(16).padStart(2, '0')}` + } + } + return uriEncoded +} diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js new file mode 100644 index 000000000..f2a303937 --- /dev/null +++ b/src/pubsub/subscribe.js @@ -0,0 +1,67 @@ +'use strict' + +const ndjson = require('iterable-ndjson') +const explain = require('explain-error') +const bs58 = require('bs58') +const { Buffer } = require('buffer') +const { objectToQuery } = require('../lib/querystring') +const configure = require('../lib/configure') +const { ok, toIterable } = require('../lib/fetch') +const SubscriptionTracker = require('./subscription-tracker') + +module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { + const subsTracker = SubscriptionTracker.singleton() + + return async (topic, handler, options) => { + options = options || {} + options.signal = subsTracker.subscribe(topic, handler, options.signal) + + const qs = objectToQuery({ + arg: topic, + discover: options.discover, + ...(options.qs || {}) + }) + + const url = `${apiAddr}${apiPath}/pubsub/sub${qs}` + let res + + try { + res = await ok(fetch(url, { + method: 'POST', + signal: options.signal, + headers: options.headers || headers + })) + } catch (err) { // Initial subscribe fail, ensure we clean up + subsTracker.unsubscribe(topic, handler) + throw err + } + + // eslint-disable-next-line no-console + const onError = options.onError || (err => console.error(err)) + + ;(async () => { + try { + for await (const msg of ndjson(toIterable(res.body))) { + try { + handler({ + from: bs58.encode(Buffer.from(msg.from, 'base64')).toString(), + data: Buffer.from(msg.data, 'base64'), + seqno: Buffer.from(msg.seqno, 'base64'), + topicIDs: msg.topicIDs + }) + } catch (err) { + onError(explain(err, 'Failed to parse pubsub message'), false) // Not fatal + } + } + } catch (err) { + // FIXME: In testing with Chrome, err.type is undefined (should not be!) + // Temporarily use the name property instead. 
+ if (err.type !== 'aborted' && err.name !== 'AbortError') { + onError(err, true) // Fatal + } + } finally { + subsTracker.unsubscribe(topic, handler) + } + })() + } +}) diff --git a/src/pubsub/subscription-tracker.js b/src/pubsub/subscription-tracker.js new file mode 100644 index 000000000..bbd7c2d7a --- /dev/null +++ b/src/pubsub/subscription-tracker.js @@ -0,0 +1,52 @@ +'use strict' + +const AbortController = require('abort-controller') + +class SubscriptionTracker { + constructor () { + this._subs = new Map() + } + + static singleton () { + if (SubscriptionTracker.instance) return SubscriptionTracker.instance + SubscriptionTracker.instance = new SubscriptionTracker() + return SubscriptionTracker.instance + } + + subscribe (topic, handler, signal) { + const topicSubs = this._subs.get(topic) || [] + + if (topicSubs.find(s => s.handler === handler)) { + throw new Error(`Already subscribed to ${topic} with this handler`) + } + + // Create controller so a call to unsubscribe can cancel the request + const controller = new AbortController() + + this._subs.set(topic, [{ handler, controller }].concat(topicSubs)) + + // If there is an external signal, forward the abort event + if (signal) { + signal.addEventListener('abort', () => this.unsubscribe(topic, handler)) + } + + return controller.signal + } + + unsubscribe (topic, handler) { + const subs = this._subs.get(topic) || [] + let unsubs + + if (handler) { + this._subs.set(topic, subs.filter(s => s.handler !== handler)) + unsubs = subs.filter(s => s.handler === handler) + } else { + this._subs.set(topic, []) + unsubs = subs + } + + unsubs.forEach(s => s.controller.abort()) + } +} + +module.exports = SubscriptionTracker diff --git a/src/pubsub/unsubscribe.js b/src/pubsub/unsubscribe.js new file mode 100644 index 000000000..a8bd14944 --- /dev/null +++ b/src/pubsub/unsubscribe.js @@ -0,0 +1,10 @@ +'use strict' + +const configure = require('../lib/configure') +const SubscriptionTracker = require('./subscription-tracker') + +module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { + const subsTracker = SubscriptionTracker.singleton() + // eslint-disable-next-line require-await + return async (topic, handler) => subsTracker.unsubscribe(topic, handler) +}) diff --git a/src/utils/pubsub-message-stream.js b/src/utils/pubsub-message-stream.js deleted file mode 100644 index 992529213..000000000 --- a/src/utils/pubsub-message-stream.js +++ /dev/null @@ -1,34 +0,0 @@ -'use strict' - -const TransformStream = require('readable-stream').Transform -const PubsubMessage = require('./pubsub-message-utils') - -class PubsubMessageStream extends TransformStream { - constructor (options) { - const opts = Object.assign(options || {}, { objectMode: true }) - super(opts) - } - - static from (inputStream, callback) { - let outputStream = inputStream.pipe(new PubsubMessageStream()) - inputStream.on('end', () => outputStream.emit('end')) - callback(null, outputStream) - } - - _transform (obj, enc, callback) { - // go-ipfs returns '{}' as the very first object atm, we skip that - if (Object.keys(obj).length === 0) { - return callback() - } - - try { - const msg = PubsubMessage.deserialize(obj, 'base64') - this.push(msg) - callback() - } catch (err) { - return callback(err) - } - } -} - -module.exports = PubsubMessageStream diff --git a/src/utils/pubsub-message-utils.js b/src/utils/pubsub-message-utils.js deleted file mode 100644 index 53d1e397a..000000000 --- a/src/utils/pubsub-message-utils.js +++ /dev/null @@ -1,39 +0,0 @@ -'use strict' - -const bs58 = 
require('bs58') - -module.exports = { - deserialize (data, enc) { - enc = enc ? enc.toLowerCase() : 'json' - - if (enc === 'json') { - return deserializeFromJson(data) - } else if (enc === 'base64') { - return deserializeFromBase64(data) - } - - throw new Error(`Unsupported encoding: '${enc}'`) - } -} - -function deserializeFromJson (data) { - const json = JSON.parse(data) - return deserializeFromBase64(json) -} - -function deserializeFromBase64 (obj) { - if (!isPubsubMessage(obj)) { - throw new Error(`Not a pubsub message`) - } - - return { - from: bs58.encode(Buffer.from(obj.from, 'base64')).toString(), - seqno: Buffer.from(obj.seqno, 'base64'), - data: Buffer.from(obj.data, 'base64'), - topicIDs: obj.topicIDs || obj.topicCIDs - } -} - -function isPubsubMessage (obj) { - return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs) -} diff --git a/src/utils/stringlist-to-array.js b/src/utils/stringlist-to-array.js deleted file mode 100644 index df28ee6df..000000000 --- a/src/utils/stringlist-to-array.js +++ /dev/null @@ -1,9 +0,0 @@ -'use strict' - -// Converts a go-ipfs "stringList" to an array -// { Strings: ['A', 'B'] } --> ['A', 'B'] -function stringlistToArray (res, cb) { - cb(null, res.Strings || []) -} - -module.exports = stringlistToArray diff --git a/test/interface.spec.js b/test/interface.spec.js index 962a076b4..11231fe8b 100644 --- a/test/interface.spec.js +++ b/test/interface.spec.js @@ -275,19 +275,17 @@ describe('interface-ipfs-core tests', () => { initOptions: { bits: 1024 } } }), { - skip: isNode ? [ + skip: isWindows ? [ // pubsub.subscribe - isWindows ? { + { name: 'should send/receive 100 messages', reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - } : null, - isWindows ? 
{ + }, + { name: 'should receive multiple messages', reason: 'FIXME https://github.com/ipfs/interface-ipfs-core/pull/188#issuecomment-354673246 and https://github.com/ipfs/go-ipfs/issues/4778' - } : null - ] : { - reason: 'FIXME pubsub is not supported in the browser https://github.com/ipfs/js-ipfs-http-client/issues/518' - } + } + ] : null }) tests.repo(defaultCommonFactory) From 0974b734d3427f619af504576bc4101b8d35d2ae Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 25 Jul 2019 13:59:57 +0100 Subject: [PATCH 02/27] fix: tests License: MIT Signed-off-by: Alan Shaw --- src/lib/configure.browser.js | 7 +- src/lib/configure.js | 7 +- test/interface.spec.js | 4 +- test/pubsub-in-browser.spec.js | 162 --------------------------------- 4 files changed, 8 insertions(+), 172 deletions(-) delete mode 100644 test/pubsub-in-browser.spec.js diff --git a/src/lib/configure.browser.js b/src/lib/configure.browser.js index 2943915cc..3597554ec 100644 --- a/src/lib/configure.browser.js +++ b/src/lib/configure.browser.js @@ -9,11 +9,10 @@ module.exports = create => config => { if (typeof config === 'string') { config = { apiAddr: config } - } - - // Multiaddr instance - if (config.constructor && config.constructor.isMultiaddr) { + } else if (config.constructor && config.constructor.isMultiaddr) { config = { apiAddr: config } + } else { + config = { ...config } } config.fetch = config.fetch || require('./fetch').fetch diff --git a/src/lib/configure.js b/src/lib/configure.js index 3557945ee..9dd48b32a 100644 --- a/src/lib/configure.js +++ b/src/lib/configure.js @@ -10,11 +10,10 @@ module.exports = create => config => { if (typeof config === 'string') { config = { apiAddr: config } - } - - // Multiaddr instance - if (config.constructor && config.constructor.isMultiaddr) { + } else if (config.constructor && config.constructor.isMultiaddr) { config = { apiAddr: config } + } else { + config = { ...config } } config.fetch = config.fetch || require('./fetch').fetch diff --git a/test/interface.spec.js b/test/interface.spec.js index 11231fe8b..736da0154 100644 --- a/test/interface.spec.js +++ b/test/interface.spec.js @@ -231,7 +231,7 @@ describe('interface-ipfs-core tests', () => { tests.namePubsub(CommonFactory.create({ spawnOptions: { args: ['--enable-namesys-pubsub'], - initOptions: { bits: 1024 } + initOptions: { bits: 1024, profile: 'test' } } }), { skip: [ @@ -272,7 +272,7 @@ describe('interface-ipfs-core tests', () => { tests.pubsub(CommonFactory.create({ spawnOptions: { args: ['--enable-pubsub-experiment'], - initOptions: { bits: 1024 } + initOptions: { bits: 1024, profile: 'test' } } }), { skip: isWindows ? [ diff --git a/test/pubsub-in-browser.spec.js b/test/pubsub-in-browser.spec.js deleted file mode 100644 index ff1a22347..000000000 --- a/test/pubsub-in-browser.spec.js +++ /dev/null @@ -1,162 +0,0 @@ -/* - We currently don't support pubsub when run in the browser, - and we test it with separate set of tests to make sure - if it's being used in the browser, pubsub errors. 
- - More info: https://github.com/ipfs/js-ipfs-http-client/issues/518 - - This means: - - You can use pubsub from js-ipfs-http-client in Node.js - - You can use pubsub from js-ipfs-http-client in Electron - (when js-ipfs-http-client is ran in the main process of Electron) - - - You can't use pubsub from js-ipfs-http-client in the browser - - You can't use pubsub from js-ipfs-http-client in Electron's - renderer process - - - You can use pubsub from js-ipfs in the browsers - - You can use pubsub from js-ipfs in Node.js - - You can use pubsub from js-ipfs in Electron - (in both the main process and the renderer process) - - See https://github.com/ipfs/js-ipfs for details on - pubsub in js-ipfs -*/ - -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ['error', 8] */ -'use strict' - -const isNode = require('detect-node') -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const ipfsClient = require('../src') -const f = require('./utils/factory') - -const expectedError = 'pubsub is currently not supported when run in the browser' - -describe('.pubsub is not supported in the browser, yet!', function () { - this.timeout(50 * 1000) - - if (isNode) { return } - - const topic = 'pubsub-tests' - let ipfs - let ipfsd - - before((done) => { - f.spawn({ initOptions: { bits: 1024, profile: 'test' } }, (err, _ipfsd) => { - expect(err).to.not.exist() - ipfsd = _ipfsd - ipfs = ipfsClient(_ipfsd.apiAddr) - done() - }) - }) - - after((done) => { - if (!ipfsd) return done() - ipfsd.stop(done) - }) - - describe('everything errors', () => { - describe('Callback API', () => { - describe('.publish', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.publish(topic, 'hello friend', (err, topics) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.subscribe', () => { - const handler = () => {} - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.subscribe(topic, handler, {}, (err, topics) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.peers', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.peers(topic, (err, topics) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.ls', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.ls((err, topics) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - }) - - describe('Promise API', () => { - describe('.publish', () => { - it('throws an error if called in the browser', () => { - return ipfs.pubsub.publish(topic, 'hello friend') - .catch((err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - }) - }) - }) - - describe('.subscribe', () => { - const handler = () => {} - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.subscribe(topic, handler, {}) - .catch((err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.peers', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.peers(topic) - .catch((err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - - describe('.ls', () => { - it('throws an error if called in the browser', () => { - return ipfs.pubsub.ls() 
- .catch((err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - }) - }) - }) - }) - - describe('.unsubscribe', () => { - it('throws an error if called in the browser', (done) => { - ipfs.pubsub.unsubscribe('test', () => {}, (err) => { - expect(err).to.exist() - expect(err.message).to.equal(expectedError) - done() - }) - }) - }) - }) -}) From 0f3e64052e7b5bb592107634f6b555529476bcdc Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 25 Jul 2019 14:16:29 +0100 Subject: [PATCH 03/27] fix: use included querystring module License: MIT Signed-off-by: Alan Shaw --- package.json | 1 - src/lib/querystring.js | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index e0f780311..f66a5ccb7 100644 --- a/package.json +++ b/package.json @@ -79,7 +79,6 @@ "pull-to-stream": "~0.1.1", "pump": "^3.0.0", "qs": "^6.5.2", - "querystring": "^0.2.0", "readable-stream": "^3.1.1", "stream-to-pull-stream": "^1.7.2", "tar-stream": "^2.0.1", diff --git a/src/lib/querystring.js b/src/lib/querystring.js index df35b0a6e..07de0edda 100644 --- a/src/lib/querystring.js +++ b/src/lib/querystring.js @@ -1,6 +1,6 @@ 'use strict' -const QueryString = require('querystring') +const Qs = require('qs') // Convert an object to a query string INCLUDING leading ? // Excludes null/undefined values @@ -12,5 +12,5 @@ exports.objectToQuery = obj => { return obj }, {}) - return Object.keys(qs).length ? `?${QueryString.stringify(qs)}` : '' + return Object.keys(qs).length ? `?${Qs.stringify(qs, { arrayFormat: 'repeat' })}` : '' } From 850e1fb87a64d56a0589324b27ec918bb5f775a8 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 25 Jul 2019 14:29:09 +0100 Subject: [PATCH 04/27] chore: appease linter License: MIT Signed-off-by: Alan Shaw --- src/lib/multiaddr.js | 2 ++ src/pubsub/index.js | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/lib/multiaddr.js b/src/lib/multiaddr.js index 1ccfe72c8..09462ab34 100644 --- a/src/lib/multiaddr.js +++ b/src/lib/multiaddr.js @@ -1,3 +1,5 @@ +'use strict' + // Convert a multiaddr to a URI // Assumes multiaddr is in a format that can be converted to a HTTP(s) URI exports.toUri = ma => { diff --git a/src/pubsub/index.js b/src/pubsub/index.js index 7f2897ea7..8562e9ea3 100644 --- a/src/pubsub/index.js +++ b/src/pubsub/index.js @@ -1,3 +1,5 @@ +'use strict' + const callbackify = require('../lib/callbackify') // This file is temporary and for compatibility with legacy usage From 4de1e0934e9df6f2601a4d40be384fbb1c3afc18 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 25 Jul 2019 14:54:39 +0100 Subject: [PATCH 05/27] chore: update interface-ipfs-core License: MIT Signed-off-by: Alan Shaw --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index f66a5ccb7..bb5c78129 100644 --- a/package.json +++ b/package.json @@ -91,7 +91,7 @@ "cross-env": "^5.2.0", "dirty-chai": "^2.0.1", "go-ipfs-dep": "~0.4.21", - "interface-ipfs-core": "^0.109.0", + "interface-ipfs-core": "github:ipfs/interface-js-ipfs-core#fix/max-pubsub-reqs-browser", "ipfsd-ctl": "~0.43.0", "nock": "^10.0.2", "stream-equal": "^1.1.1" From 2437cf4091563a8e416494dc29273e1f9ac9a1bd Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 25 Jul 2019 15:21:56 +0100 Subject: [PATCH 06/27] chore: skip test License: MIT Signed-off-by: Alan Shaw --- test/interface.spec.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/interface.spec.js b/test/interface.spec.js index 736da0154..910b82094 100644 --- 
a/test/interface.spec.js +++ b/test/interface.spec.js @@ -169,6 +169,10 @@ describe('interface-ipfs-core tests', () => { name: 'should ls with a base58 encoded CID', reason: 'FIXME https://github.com/ipfs/js-ipfs-http-client/issues/339' }, + { + name: 'should ls directory with long option', + reason: 'TODO unskip when go-ipfs supports --long https://github.com/ipfs/go-ipfs/pull/6528' + }, // .lsPullStream isNode ? null : { name: 'should pull stream ls with a base58 encoded CID', From 4a89c6a127a06acb0172b53642f29008bfa89dd2 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 25 Jul 2019 15:37:57 +0100 Subject: [PATCH 07/27] fix: skip in the right place License: MIT Signed-off-by: Alan Shaw --- test/interface.spec.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/interface.spec.js b/test/interface.spec.js index 910b82094..736da0154 100644 --- a/test/interface.spec.js +++ b/test/interface.spec.js @@ -169,10 +169,6 @@ describe('interface-ipfs-core tests', () => { name: 'should ls with a base58 encoded CID', reason: 'FIXME https://github.com/ipfs/js-ipfs-http-client/issues/339' }, - { - name: 'should ls directory with long option', - reason: 'TODO unskip when go-ipfs supports --long https://github.com/ipfs/go-ipfs/pull/6528' - }, // .lsPullStream isNode ? null : { name: 'should pull stream ls with a base58 encoded CID', From b656aab86cc717301c385c3e364708fa69a24f0d Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 25 Jul 2019 18:19:31 +0100 Subject: [PATCH 08/27] refactor: more readable code for consuming message stream License: MIT Signed-off-by: Alan Shaw --- src/pubsub/subscribe.js | 54 +++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index f2a303937..7e431c87c 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -36,32 +36,38 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { throw err } - // eslint-disable-next-line no-console - const onError = options.onError || (err => console.error(err)) + readMessages(ndjson(toIterable(res.body)), { + onMessage: handler, + onEnd: () => subsTracker.unsubscribe(topic, handler), + onError: options.onError + }) + } +}) + +async function readMessages (msgStream, { onMessage, onEnd, onError }) { + // eslint-disable-next-line no-console + onError = onError || (err => console.error(err)) - ;(async () => { + try { + for await (const msg of msgStream) { try { - for await (const msg of ndjson(toIterable(res.body))) { - try { - handler({ - from: bs58.encode(Buffer.from(msg.from, 'base64')).toString(), - data: Buffer.from(msg.data, 'base64'), - seqno: Buffer.from(msg.seqno, 'base64'), - topicIDs: msg.topicIDs - }) - } catch (err) { - onError(explain(err, 'Failed to parse pubsub message'), false) // Not fatal - } - } + onMessage({ + from: bs58.encode(Buffer.from(msg.from, 'base64')).toString(), + data: Buffer.from(msg.data, 'base64'), + seqno: Buffer.from(msg.seqno, 'base64'), + topicIDs: msg.topicIDs + }) } catch (err) { - // FIXME: In testing with Chrome, err.type is undefined (should not be!) - // Temporarily use the name property instead. - if (err.type !== 'aborted' && err.name !== 'AbortError') { - onError(err, true) // Fatal - } - } finally { - subsTracker.unsubscribe(topic, handler) + onError(explain(err, 'Failed to parse pubsub message'), false) // Not fatal } - })() + } + } catch (err) { + // FIXME: In testing with Chrome, err.type is undefined (should not be!) 
+ // Temporarily use the name property instead. + if (err.type !== 'aborted' && err.name !== 'AbortError') { + onError(err, true) // Fatal + } + } finally { + onEnd() } -}) +} From e6f97d399293f0028826c2a31bcf8e6a1c0e785d Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 26 Jul 2019 10:05:33 +0100 Subject: [PATCH 09/27] fix: add workaround for subscribe in Firefox License: MIT Signed-off-by: Alan Shaw --- src/pubsub/subscribe.js | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 7e431c87c..90eeaed0f 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -4,6 +4,7 @@ const ndjson = require('iterable-ndjson') const explain = require('explain-error') const bs58 = require('bs58') const { Buffer } = require('buffer') +const log = require('debug')('ipfs-http-client:pubsub:subscribe') const { objectToQuery } = require('../lib/querystring') const configure = require('../lib/configure') const { ok, toIterable } = require('../lib/fetch') @@ -11,6 +12,7 @@ const SubscriptionTracker = require('./subscription-tracker') module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { const subsTracker = SubscriptionTracker.singleton() + const publish = require('./publish')({ fetch, apiAddr, apiPath, headers }) return async (topic, handler, options) => { options = options || {} @@ -25,6 +27,18 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { const url = `${apiAddr}${apiPath}/pubsub/sub${qs}` let res + // In Firefox, the initial call to fetch does not resolve until some data + // is received. If this doesn't happen within 1 second send an empty message + // to kickstart the process. + const ffWorkaround = setTimeout(async () => { + log(`Publishing empty message to "${topic}" to resolve subscription request`) + try { + await publish(topic, Buffer.alloc(0), options) + } catch (err) { + log('Failed to publish empty message', err) + } + }, 1000) + try { res = await ok(fetch(url, { method: 'POST', @@ -36,6 +50,8 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { throw err } + clearTimeout(ffWorkaround) + readMessages(ndjson(toIterable(res.body)), { onMessage: handler, onEnd: () => subsTracker.unsubscribe(topic, handler), @@ -45,8 +61,7 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { }) async function readMessages (msgStream, { onMessage, onEnd, onError }) { - // eslint-disable-next-line no-console - onError = onError || (err => console.error(err)) + onError = onError || log try { for await (const msg of msgStream) { @@ -58,7 +73,7 @@ async function readMessages (msgStream, { onMessage, onEnd, onError }) { topicIDs: msg.topicIDs }) } catch (err) { - onError(explain(err, 'Failed to parse pubsub message'), false) // Not fatal + onError(explain(err, 'Failed to parse pubsub message'), false, msg) // Not fatal } } } catch (err) { From d84716adf8086edfd4b0093d2357ae55f385f309 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 26 Jul 2019 17:09:28 +0100 Subject: [PATCH 10/27] refactor: use promise-nodeify License: MIT Signed-off-by: Alan Shaw --- package.json | 1 + src/lib/callbackify.js | 17 ---------------- src/pubsub/index.js | 44 ++++++++++++++++++++++++++++++++++++------ 3 files changed, 39 insertions(+), 23 deletions(-) delete mode 100644 src/lib/callbackify.js diff --git a/package.json b/package.json index bb5c78129..f9959164b 100644 --- a/package.json +++ b/package.json @@ -73,6 +73,7 @@ "once": "^1.4.0", "peer-id": 
"~0.12.2", "peer-info": "~0.15.1", + "promise-nodeify": "^3.0.1", "promisify-es6": "^1.0.3", "pull-defer": "~0.2.3", "pull-stream": "^3.6.9", diff --git a/src/lib/callbackify.js b/src/lib/callbackify.js deleted file mode 100644 index 3a041612d..000000000 --- a/src/lib/callbackify.js +++ /dev/null @@ -1,17 +0,0 @@ -'use strict' - -module.exports = (fn, opts) => { - opts = opts || {} - // Min number of non-callback args - opts.minArgs = opts.minArgs == null ? 0 : opts.minArgs - - return (...args) => { - const cb = args[args.length - 1] - - if (typeof cb !== 'function' || args.length === opts.minArgs) { - return fn(...args) - } - - fn(...args.slice(0, -1)).then(res => cb(null, res), cb) - } -} diff --git a/src/pubsub/index.js b/src/pubsub/index.js index 8562e9ea3..2738bd5ac 100644 --- a/src/pubsub/index.js +++ b/src/pubsub/index.js @@ -1,6 +1,6 @@ 'use strict' -const callbackify = require('../lib/callbackify') +const nodeify = require('promise-nodeify') // This file is temporary and for compatibility with legacy usage module.exports = (send, options) => { @@ -8,11 +8,43 @@ module.exports = (send, options) => { options = send } + const ls = require('./ls')(options) + const peers = require('./peers')(options) + const publish = require('./publish')(options) + const subscribe = require('./subscribe')(options) + const unsubscribe = require('./unsubscribe')(options) + return { - ls: callbackify(require('./ls')(options)), - peers: callbackify(require('./peers')(options)), - publish: callbackify(require('./publish')(options)), - subscribe: callbackify(require('./subscribe')(options), { minArgs: 2 }), - unsubscribe: callbackify(require('./unsubscribe')(options), { minArgs: 2 }) + ls: (options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(ls(options), callback) + }, + peers: (topic, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(peers(topic, options), callback) + }, + publish: (topic, data, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(publish(topic, data, options), callback) + }, + subscribe: (topic, handler, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(subscribe(topic, handler, options), callback) + }, + unsubscribe: (topic, handler, callback) => { + return nodeify(unsubscribe(topic, handler), callback) + } } } From 684b2a165d01147a50d5d3b441274d4b80f98494 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 26 Jul 2019 22:06:16 +0100 Subject: [PATCH 11/27] test: add tests for lib fns License: MIT Signed-off-by: Alan Shaw --- test/lib.configure.spec.js | 60 +++++++++++++++++++++++++ test/lib.fetch.spec.js | 92 ++++++++++++++++++++++++++++++++++++++ test/utils/throws-async.js | 8 ++++ 3 files changed, 160 insertions(+) create mode 100644 test/lib.configure.spec.js create mode 100644 test/lib.fetch.spec.js create mode 100644 test/utils/throws-async.js diff --git a/test/lib.configure.spec.js b/test/lib.configure.spec.js new file mode 100644 index 000000000..81a2dbee9 --- /dev/null +++ b/test/lib.configure.spec.js @@ -0,0 +1,60 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const Multiaddr = require('multiaddr') + +const configure = require('../src/lib/configure') + +describe('lib/configure', () => { + 
it('should accept no config', () => { + configure(config => { + expect(config.apiAddr).to.eql('http://localhost:5001') + })() + }) + + it('should accept string multiaddr', () => { + const input = '/ip4/127.0.0.1/tcp/5001' + configure(config => { + expect(config.apiAddr).to.eql('http://127.0.0.1:5001') + })(input) + }) + + it('should accept multiaddr instance', () => { + const input = Multiaddr('/ip4/127.0.0.1') + configure(config => { + expect(config.apiAddr).to.eql('http://127.0.0.1') + })(input) + }) + + it('should accept object with protocol, host and port', () => { + const input = { protocol: 'https', host: 'ipfs.io', port: 138 } + configure(config => { + expect(config.apiAddr).to.eql('https://ipfs.io:138') + })(input) + }) + + it('should accept object with protocol only', () => { + const input = { protocol: 'https' } + configure(config => { + expect(config.apiAddr).to.eql('https://localhost') + })(input) + }) + + it('should accept object with host only', () => { + const input = { host: 'ipfs.io' } + configure(config => { + expect(config.apiAddr).to.eql('http://ipfs.io') + })(input) + }) + + it('should accept object with port only', () => { + const input = { port: 138 } + configure(config => { + expect(config.apiAddr).to.eql('http://localhost:138') + })(input) + }) +}) diff --git a/test/lib.fetch.spec.js b/test/lib.fetch.spec.js new file mode 100644 index 000000000..1db560113 --- /dev/null +++ b/test/lib.fetch.spec.js @@ -0,0 +1,92 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const throwsAsync = require('./utils/throws-async') + +const { ok, toIterable } = require('../src/lib/fetch') + +describe('lib/fetch', () => { + describe('ok', () => { + it('should parse json error response', async () => { + const res = { + ok: false, + text: () => Promise.resolve(JSON.stringify({ + Message: 'boom', + Code: 0, + Type: 'error' + })), + status: 500 + } + + const err = await throwsAsync(ok(res)) + + expect(err.message).to.eql('boom') + expect(err.status).to.eql(500) + }) + + it('should gracefully fail on parse json', async () => { + const res = { + ok: false, + text: () => 'boom', // not valid json! + status: 500 + } + + const err = await throwsAsync(ok(res)) + + expect(err.message).to.eql('boom') + expect(err.status).to.eql(500) + }) + + it('should gracefully fail on read text', async () => { + const res = { + ok: false, + text: () => Promise.reject(new Error('boom')), + status: 500 + } + + const err = await throwsAsync(ok(res)) + + expect(err.message).to.eql('unexpected status 500') + expect(err.status).to.eql(500) + }) + }) + + describe('toIterable', () => { + it('should return input if already async iterable', () => { + const input = { [Symbol.asyncIterator] () { return this } } + expect(toIterable(input)).to.equal(input) + }) + + it('should convert reader to async iterable', async () => { + const inputData = [2, 31, 3, 4] + const input = { + getReader () { + let i = 0 + return { + read: async () => { + return i === inputData.length + ? 
{ done: true } + : { value: inputData[i++] } + }, + releaseLock: () => {} + } + } + } + + const chunks = [] + for await (const chunk of toIterable(input)) { + chunks.push(chunk) + } + + expect(chunks).to.eql(inputData) + }) + + it('should throw on unknown stream', () => { + expect(() => toIterable({})).to.throw('unknown stream') + }) + }) +}) diff --git a/test/utils/throws-async.js b/test/utils/throws-async.js new file mode 100644 index 000000000..6d44ae972 --- /dev/null +++ b/test/utils/throws-async.js @@ -0,0 +1,8 @@ +module.exports = async fnOrPromise => { + try { + await (fnOrPromise.then ? fnOrPromise : fnOrPromise()) + } catch (err) { + return err + } + throw new Error('did not throw') +} From 176556ca52cf5d639c0dc42b00c533960be1c627 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 26 Jul 2019 22:10:55 +0100 Subject: [PATCH 12/27] chore: appease linter License: MIT Signed-off-by: Alan Shaw --- test/lib.fetch.spec.js | 4 ++-- test/utils/throws-async.js | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/test/lib.fetch.spec.js b/test/lib.fetch.spec.js index 1db560113..3b260a022 100644 --- a/test/lib.fetch.spec.js +++ b/test/lib.fetch.spec.js @@ -67,12 +67,12 @@ describe('lib/fetch', () => { getReader () { let i = 0 return { - read: async () => { + read () { return i === inputData.length ? { done: true } : { value: inputData[i++] } }, - releaseLock: () => {} + releaseLock () {} } } } diff --git a/test/utils/throws-async.js b/test/utils/throws-async.js index 6d44ae972..0d4e677fd 100644 --- a/test/utils/throws-async.js +++ b/test/utils/throws-async.js @@ -1,3 +1,5 @@ +'use strict' + module.exports = async fnOrPromise => { try { await (fnOrPromise.then ? fnOrPromise : fnOrPromise()) From cea1bd5ef9a38554d619e2117e360c27fae54a06 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 26 Jul 2019 22:40:52 +0100 Subject: [PATCH 13/27] fix: tests in browser License: MIT Signed-off-by: Alan Shaw --- test/lib.configure.spec.js | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/test/lib.configure.spec.js b/test/lib.configure.spec.js index 81a2dbee9..e28abd6c6 100644 --- a/test/lib.configure.spec.js +++ b/test/lib.configure.spec.js @@ -1,4 +1,4 @@ -/* eslint-env mocha */ +/* eslint-env mocha, browser */ 'use strict' const chai = require('chai') @@ -6,13 +6,18 @@ const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) const Multiaddr = require('multiaddr') +const { isBrowser, isWebWorker } = require('ipfs-utils/src/env') const configure = require('../src/lib/configure') describe('lib/configure', () => { it('should accept no config', () => { configure(config => { - expect(config.apiAddr).to.eql('http://localhost:5001') + if (isBrowser || isWebWorker) { + expect(config.apiAddr).to.eql(location.origin) + } else { + expect(config.apiAddr).to.eql('http://localhost:5001') + } })() }) @@ -40,21 +45,33 @@ describe('lib/configure', () => { it('should accept object with protocol only', () => { const input = { protocol: 'https' } configure(config => { - expect(config.apiAddr).to.eql('https://localhost') + if (isBrowser || isWebWorker) { + expect(config.apiAddr).to.eql(`https://${location.host}`) + } else { + expect(config.apiAddr).to.eql('https://localhost') + } })(input) }) it('should accept object with host only', () => { const input = { host: 'ipfs.io' } configure(config => { - expect(config.apiAddr).to.eql('http://ipfs.io') + if (isBrowser || isWebWorker) { + 
expect(config.apiAddr).to.eql(`http://ipfs.io:${location.port}`) + } else { + expect(config.apiAddr).to.eql('http://ipfs.io') + } })(input) }) it('should accept object with port only', () => { const input = { port: 138 } configure(config => { - expect(config.apiAddr).to.eql('http://localhost:138') + if (isBrowser || isWebWorker) { + expect(config.apiAddr).to.eql(`http://${location.hostname}:138`) + } else { + expect(config.apiAddr).to.eql('http://localhost:138') + } })(input) }) }) From 636d302ed0c7bb8275a501dc694ffe3997ebc6b2 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 29 Jul 2019 09:42:32 +0100 Subject: [PATCH 14/27] perf: use URLSearchParams in the browser License: MIT Signed-off-by: Alan Shaw --- package.json | 3 ++- src/lib/querystring.browser.js | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 src/lib/querystring.browser.js diff --git a/package.json b/package.json index f9959164b..260c55163 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,8 @@ "glob": false, "fs": false, "stream": "readable-stream", - "./src/lib/configure.js": "./src/lib/configure.browser.js" + "./src/lib/configure.js": "./src/lib/configure.browser.js", + "./src/lib/querystring.js": "./src/lib/querystring.browser.js" }, "repository": "github:ipfs/js-ipfs-http-client", "scripts": { diff --git a/src/lib/querystring.browser.js b/src/lib/querystring.browser.js new file mode 100644 index 000000000..d71843dbe --- /dev/null +++ b/src/lib/querystring.browser.js @@ -0,0 +1,23 @@ +'use strict' + +// Convert an object to a query string INCLUDING leading ? +// Excludes null/undefined values +exports.objectToQuery = obj => { + if (!obj) return '' + + let qs = new URLSearchParams() + + for (const [key, value] of Object.entries(obj)) { + if (value != null) { + if (Array.isArray(value)) { + value.forEach(v => qs.append(key, v)) + } else { + qs.append(key, value) + } + } + } + + qs = qs.toString() + + return qs ? `?${qs}` : qs +} From 62529c357a018e5a7de089e0949f018c3dce1ed6 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 29 Jul 2019 09:55:55 +0100 Subject: [PATCH 15/27] chore: appease linter License: MIT Signed-off-by: Alan Shaw --- src/lib/querystring.browser.js | 6 ++---- src/lib/querystring.js | 6 ++++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib/querystring.browser.js b/src/lib/querystring.browser.js index d71843dbe..4c4c5d2a8 100644 --- a/src/lib/querystring.browser.js +++ b/src/lib/querystring.browser.js @@ -5,9 +5,7 @@ exports.objectToQuery = obj => { if (!obj) return '' - let qs = new URLSearchParams() - - for (const [key, value] of Object.entries(obj)) { + let qs = Object.entries(obj).forEach(([key, value]) => { if (value != null) { if (Array.isArray(value)) { value.forEach(v => qs.append(key, v)) @@ -15,7 +13,7 @@ exports.objectToQuery = obj => { qs.append(key, value) } } - } + }) qs = qs.toString() diff --git a/src/lib/querystring.js b/src/lib/querystring.js index 07de0edda..e7d64e152 100644 --- a/src/lib/querystring.js +++ b/src/lib/querystring.js @@ -7,10 +7,12 @@ const Qs = require('qs') exports.objectToQuery = obj => { if (!obj) return '' - const qs = Object.entries(obj).reduce((obj, [key, value]) => { + let qs = Object.entries(obj).reduce((obj, [key, value]) => { if (value != null) obj[key] = value return obj }, {}) - return Object.keys(qs).length ? `?${Qs.stringify(qs, { arrayFormat: 'repeat' })}` : '' + qs = Qs.stringify(qs, { arrayFormat: 'repeat' }) + + return qs ? 
`?${qs}` : qs } From b58c489648a7e6ffda7174850185f2f0fabe8f9c Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 29 Jul 2019 10:30:32 +0100 Subject: [PATCH 16/27] fix: oops, removed more code than I should have License: MIT Signed-off-by: Alan Shaw --- src/lib/querystring.browser.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lib/querystring.browser.js b/src/lib/querystring.browser.js index 4c4c5d2a8..c9e737818 100644 --- a/src/lib/querystring.browser.js +++ b/src/lib/querystring.browser.js @@ -5,7 +5,9 @@ exports.objectToQuery = obj => { if (!obj) return '' - let qs = Object.entries(obj).forEach(([key, value]) => { + let qs = new URLSearchParams() + + Object.entries(obj).forEach(([key, value]) => { if (value != null) { if (Array.isArray(value)) { value.forEach(v => qs.append(key, v)) From 70653adff2eb65c528b2fa88ed7172218253c5ca Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 30 Jul 2019 11:53:22 +0100 Subject: [PATCH 17/27] refactor: use ky License: MIT Signed-off-by: Alan Shaw --- package.json | 7 +-- src/lib/configure.browser.js | 45 -------------- src/lib/configure.js | 50 +++++++++++----- src/lib/error-handler.js | 31 ++++++++++ src/lib/fetch.js | 53 ----------------- src/lib/querystring.browser.js | 23 -------- src/lib/querystring.js | 18 ------ src/lib/stream-to-iterable.js | 25 ++++++++ src/pubsub/ls.js | 18 +++--- src/pubsub/peers.js | 23 ++++---- src/pubsub/publish.js | 19 +++--- src/pubsub/subscribe.js | 25 ++++---- src/pubsub/unsubscribe.js | 2 +- test/lib.error-handler.spec.js | 54 +++++++++++++++++ test/lib.fetch.spec.js | 92 ----------------------------- test/lib.stream-to-iterable.spec.js | 43 ++++++++++++++ 16 files changed, 228 insertions(+), 300 deletions(-) delete mode 100644 src/lib/configure.browser.js create mode 100644 src/lib/error-handler.js delete mode 100644 src/lib/fetch.js delete mode 100644 src/lib/querystring.browser.js delete mode 100644 src/lib/querystring.js create mode 100644 src/lib/stream-to-iterable.js create mode 100644 test/lib.error-handler.spec.js delete mode 100644 test/lib.fetch.spec.js create mode 100644 test/lib.stream-to-iterable.spec.js diff --git a/package.json b/package.json index 260c55163..fd5e04c79 100644 --- a/package.json +++ b/package.json @@ -17,9 +17,7 @@ "browser": { "glob": false, "fs": false, - "stream": "readable-stream", - "./src/lib/configure.js": "./src/lib/configure.browser.js", - "./src/lib/querystring.js": "./src/lib/querystring.browser.js" + "stream": "readable-stream" }, "repository": "github:ipfs/js-ipfs-http-client", "scripts": { @@ -64,13 +62,14 @@ "just-kebab-case": "^1.1.0", "just-map-keys": "^1.1.0", "kind-of": "^6.0.2", + "ky": "^0.11.2", + "ky-universal": "^0.2.2", "lru-cache": "^5.1.1", "multiaddr": "^6.0.6", "multibase": "~0.6.0", "multicodec": "~0.5.1", "multihashes": "~0.4.14", "ndjson": "github:hugomrdias/ndjson#feat/readable-stream3", - "node-fetch": "^2.6.0", "once": "^1.4.0", "peer-id": "~0.12.2", "peer-info": "~0.15.1", diff --git a/src/lib/configure.browser.js b/src/lib/configure.browser.js deleted file mode 100644 index 3597554ec..000000000 --- a/src/lib/configure.browser.js +++ /dev/null @@ -1,45 +0,0 @@ -'use strict' -/* eslint-env browser */ - -const { toUri } = require('./multiaddr') - -// Set default configuration and call create function with them -module.exports = create => config => { - config = config || {} - - if (typeof config === 'string') { - config = { apiAddr: config } - } else if (config.constructor && config.constructor.isMultiaddr) { - config = { 
apiAddr: config } - } else { - config = { ...config } - } - - config.fetch = config.fetch || require('./fetch').fetch - config.apiAddr = (config.apiAddr || getDefaultApiAddr(config)).toString() - config.apiAddr = config.apiAddr.startsWith('/') - ? toUri(config.apiAddr) - : config.apiAddr - config.apiPath = config.apiPath || config['api-path'] || '/api/v0' - - if (config.apiPath.endsWith('/')) { - config.apiPath = config.apiPath.slice(0, -1) - } - - config.headers = new Headers(config.headers) - - return create(config) -} - -function getDefaultApiAddr ({ protocol, host, port }) { - if (!protocol) { - protocol = location.protocol.startsWith('http') - ? location.protocol.split(':')[0] - : 'http' - } - - host = host || location.hostname - port = port || location.port - - return `${protocol}://${host}${port ? ':' + port : ''}` -} diff --git a/src/lib/configure.js b/src/lib/configure.js index 9dd48b32a..3acb10db6 100644 --- a/src/lib/configure.js +++ b/src/lib/configure.js @@ -1,8 +1,10 @@ 'use strict' +/* eslint-env browser */ -const { Headers } = require('node-fetch') +const ky = require('ky-universal') +const { isBrowser, isWebWorker } = require('ipfs-utils/src/env') const { toUri } = require('./multiaddr') -const pkg = require('../../package.json') +const errorHandler = require('./error-handler') // Set default configuration and call create function with them module.exports = create => config => { @@ -16,28 +18,46 @@ module.exports = create => config => { config = { ...config } } - config.fetch = config.fetch || require('./fetch').fetch - if (config.protocol || config.host || config.port) { const port = config.port ? `:${config.port}` : '' config.apiAddr = `${config.protocol || 'http'}://${config.host || 'localhost'}${port}` } - config.apiAddr = (config.apiAddr || 'http://localhost:5001').toString() - config.apiAddr = config.apiAddr.startsWith('/') - ? toUri(config.apiAddr) - : config.apiAddr + config.apiAddr = (config.apiAddr || getDefaultApiAddr(config)).toString() + config.apiAddr = config.apiAddr.startsWith('/') ? toUri(config.apiAddr) : config.apiAddr config.apiPath = config.apiPath || config['api-path'] || '/api/v0' - if (config.apiPath.endsWith('/')) { - config.apiPath = config.apiPath.slice(0, -1) - } + return create({ + // TODO configure ky to use config.fetch when this is released: + // https://github.com/sindresorhus/ky/pull/153 + ky: ky.extend({ + prefixUrl: config.apiAddr + config.apiPath, + timeout: config.timeout || 60 * 1000, + headers: config.headers, + hooks: { + afterResponse: [errorHandler] + } + }) + }) +} + +function getDefaultApiAddr ({ protocol, host, port }) { + if (isBrowser || isWebWorker) { + if (!protocol && !host && !port) { // Use current origin + return '' + } + + if (!protocol) { + protocol = location.protocol.startsWith('http') + ? location.protocol.split(':')[0] + : 'http' + } - config.headers = new Headers(config.headers) + host = host || location.hostname + port = port || location.port - if (!config.headers.has('User-Agent')) { - config.headers.append('User-Agent', `${pkg.name}/${pkg.version}`) + return `${protocol}://${host}${port ? 
':' + port : ''}` } - return create(config) + return `${protocol || 'http'}://${host || 'localhost'}:${port || 5001}` } diff --git a/src/lib/error-handler.js b/src/lib/error-handler.js new file mode 100644 index 000000000..1e788227c --- /dev/null +++ b/src/lib/error-handler.js @@ -0,0 +1,31 @@ +'use strict' + +const { HTTPError } = require('ky-universal') +const log = require('debug')('ipfs-http-client:lib:error-handler') + +function isJsonResponse (res) { + return (res.headers.get('Content-Type') || '').startsWith('application/json') +} + +module.exports = async function errorHandler (response) { + if (response.ok) return + + let msg + + try { + if (isJsonResponse(response)) { + const data = await response.json() + log(data) + msg = data.Message || data.message + } else { + msg = await response.text() + } + } catch (err) { + log('Failed to parse error response', err) + // Failed to extract/parse error message from response + throw new HTTPError(response) + } + + if (!msg) throw new HTTPError(response) + throw Object.assign(new Error(msg), { status: response.status }) +} diff --git a/src/lib/fetch.js b/src/lib/fetch.js deleted file mode 100644 index b3cf03cbb..000000000 --- a/src/lib/fetch.js +++ /dev/null @@ -1,53 +0,0 @@ -'use strict' - -const explain = require('explain-error') - -exports.fetch = require('node-fetch') - -// Ensure fetch response is ok (200) -// and if not, attempt to JSON parse body, extract error message and throw -exports.ok = async res => { - res = await res - - if (!res.ok) { - const { status } = res - const defaultMsg = `unexpected status ${status}` - let msg - try { - let data = await res.text() - try { - data = JSON.parse(data) - msg = data.message || data.Message - } catch (err) { - msg = data - } - } catch (err) { - throw Object.assign(explain(err, defaultMsg), { status }) - } - throw Object.assign(new Error(msg || defaultMsg), { status }) - } - - return res -} - -exports.toIterable = body => { - if (body[Symbol.asyncIterator]) return body - - if (body.getReader) { - return (async function * () { - const reader = body.getReader() - - try { - while (true) { - const { done, value } = await reader.read() - if (done) return - yield value - } - } finally { - reader.releaseLock() - } - })() - } - - throw new Error('unknown stream') -} diff --git a/src/lib/querystring.browser.js b/src/lib/querystring.browser.js deleted file mode 100644 index c9e737818..000000000 --- a/src/lib/querystring.browser.js +++ /dev/null @@ -1,23 +0,0 @@ -'use strict' - -// Convert an object to a query string INCLUDING leading ? -// Excludes null/undefined values -exports.objectToQuery = obj => { - if (!obj) return '' - - let qs = new URLSearchParams() - - Object.entries(obj).forEach(([key, value]) => { - if (value != null) { - if (Array.isArray(value)) { - value.forEach(v => qs.append(key, v)) - } else { - qs.append(key, value) - } - } - }) - - qs = qs.toString() - - return qs ? `?${qs}` : qs -} diff --git a/src/lib/querystring.js b/src/lib/querystring.js deleted file mode 100644 index e7d64e152..000000000 --- a/src/lib/querystring.js +++ /dev/null @@ -1,18 +0,0 @@ -'use strict' - -const Qs = require('qs') - -// Convert an object to a query string INCLUDING leading ? -// Excludes null/undefined values -exports.objectToQuery = obj => { - if (!obj) return '' - - let qs = Object.entries(obj).reduce((obj, [key, value]) => { - if (value != null) obj[key] = value - return obj - }, {}) - - qs = Qs.stringify(qs, { arrayFormat: 'repeat' }) - - return qs ? 
`?${qs}` : qs -} diff --git a/src/lib/stream-to-iterable.js b/src/lib/stream-to-iterable.js new file mode 100644 index 000000000..5e06a99c6 --- /dev/null +++ b/src/lib/stream-to-iterable.js @@ -0,0 +1,25 @@ +'use strict' + +module.exports = function toIterable (body) { + // Node.js stream + if (body[Symbol.asyncIterator]) return body + + // Browser ReadableStream + if (body.getReader) { + return (async function * () { + const reader = body.getReader() + + try { + while (true) { + const { done, value } = await reader.read() + if (done) return + yield value + } + } finally { + reader.releaseLock() + } + })() + } + + throw new Error('unknown stream') +} diff --git a/src/pubsub/ls.js b/src/pubsub/ls.js index bfbf8239e..177dcd491 100644 --- a/src/pubsub/ls.js +++ b/src/pubsub/ls.js @@ -1,20 +1,18 @@ 'use strict' const configure = require('../lib/configure') -const { ok } = require('../lib/fetch') -const { objectToQuery } = require('../lib/querystring') -module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { +module.exports = configure(({ ky }) => { return async (options) => { options = options || {} - const qs = objectToQuery(options.qs) - const url = `${apiAddr}${apiPath}/pubsub/ls${qs}` - const res = await ok(fetch(url, { + const { Strings } = await ky.get('pubsub/ls', { + timeout: options.timeout, signal: options.signal, - headers: options.headers || headers - })) - const data = await res.json() - return data.Strings || [] + headers: options.headers, + searchParams: options.searchParams + }).json() + + return Strings || [] } }) diff --git a/src/pubsub/peers.js b/src/pubsub/peers.js index 7ee860e33..bdeca60e4 100644 --- a/src/pubsub/peers.js +++ b/src/pubsub/peers.js @@ -1,10 +1,8 @@ 'use strict' -const { objectToQuery } = require('../lib/querystring') const configure = require('../lib/configure') -const { ok } = require('../lib/fetch') -module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { +module.exports = configure(({ ky }) => { return async (topic, options) => { if (!options && typeof topic === 'object') { options = topic @@ -13,17 +11,16 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { options = options || {} - const qs = objectToQuery({ - arg: topic, - ...(options.qs || {}) - }) + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', topic) - const url = `${apiAddr}${apiPath}/pubsub/peers${qs}` - const res = await ok(fetch(url, { + const { Strings } = await ky.get('pubsub/peers', { + timeout: options.timeout, signal: options.signal, - headers: options.headers || headers - })) - const data = await res.json() - return data.Strings || [] + headers: options.headers, + searchParams + }).json() + + return Strings || [] } }) diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js index d214ddbc8..3e63ee6f4 100644 --- a/src/pubsub/publish.js +++ b/src/pubsub/publish.js @@ -2,10 +2,8 @@ const { Buffer } = require('buffer') const configure = require('../lib/configure') -const { objectToQuery } = require('../lib/querystring') -const { ok } = require('../lib/fetch') -module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { +module.exports = configure(({ ky }) => { return async (topic, data, options) => { options = options || {} @@ -13,17 +11,14 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { throw new Error('data must be a Buffer') } - let qs = objectToQuery(options.qs) - qs = qs ? 
`&${qs.slice(1)}` : qs + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', topic) - const url = `${apiAddr}${apiPath}/pubsub/pub?arg=${encodeURIComponent(topic)}&arg=${encodeBuffer(data)}${qs}` - const res = await ok(fetch(url, { - method: 'POST', + return ky.post(`pubsub/pub?${searchParams}&arg=${encodeBuffer(data)}`, { + timeout: options.timeout, signal: options.signal, - headers: options.headers || headers - })) - - return res.text() + headers: options.headers + }).text() } }) diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 90eeaed0f..ae95ec5c8 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -5,26 +5,22 @@ const explain = require('explain-error') const bs58 = require('bs58') const { Buffer } = require('buffer') const log = require('debug')('ipfs-http-client:pubsub:subscribe') -const { objectToQuery } = require('../lib/querystring') const configure = require('../lib/configure') -const { ok, toIterable } = require('../lib/fetch') +const toIterable = require('../lib/stream-to-iterable') const SubscriptionTracker = require('./subscription-tracker') -module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { +module.exports = configure(({ ky }) => { const subsTracker = SubscriptionTracker.singleton() - const publish = require('./publish')({ fetch, apiAddr, apiPath, headers }) + const publish = require('./publish')({ ky }) return async (topic, handler, options) => { options = options || {} options.signal = subsTracker.subscribe(topic, handler, options.signal) - const qs = objectToQuery({ - arg: topic, - discover: options.discover, - ...(options.qs || {}) - }) + const searchParams = new URLSearchParams(options.searchParams) + searchParams.set('arg', topic) + if (options.discover != null) searchParams.set('discover', options.discover) - const url = `${apiAddr}${apiPath}/pubsub/sub${qs}` let res // In Firefox, the initial call to fetch does not resolve until some data @@ -40,11 +36,12 @@ module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { }, 1000) try { - res = await ok(fetch(url, { - method: 'POST', + res = await ky.post('pubsub/sub', { + timeout: options.timeout, signal: options.signal, - headers: options.headers || headers - })) + headers: options.headers, + searchParams + }) } catch (err) { // Initial subscribe fail, ensure we clean up subsTracker.unsubscribe(topic, handler) throw err diff --git a/src/pubsub/unsubscribe.js b/src/pubsub/unsubscribe.js index a8bd14944..6e7c727f4 100644 --- a/src/pubsub/unsubscribe.js +++ b/src/pubsub/unsubscribe.js @@ -3,7 +3,7 @@ const configure = require('../lib/configure') const SubscriptionTracker = require('./subscription-tracker') -module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { +module.exports = configure(({ ky }) => { const subsTracker = SubscriptionTracker.singleton() // eslint-disable-next-line require-await return async (topic, handler) => subsTracker.unsubscribe(topic, handler) diff --git a/test/lib.error-handler.spec.js b/test/lib.error-handler.spec.js new file mode 100644 index 000000000..4e97260ec --- /dev/null +++ b/test/lib.error-handler.spec.js @@ -0,0 +1,54 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const { HTTPError } = require('ky-universal') +const expect = chai.expect +chai.use(dirtyChai) +const throwsAsync = require('./utils/throws-async') +const errorHandler = require('../src/lib/error-handler') + +describe('lib/error-handler', () 
=> { + it('should parse json error response', async () => { + const res = { + ok: false, + headers: { get: () => 'application/json' }, + json: () => Promise.resolve({ + Message: 'boom', + Code: 0, + Type: 'error' + }), + status: 500 + } + + const err = await throwsAsync(errorHandler(res)) + + expect(err.message).to.eql('boom') + expect(err.status).to.eql(500) + }) + + it('should gracefully fail on parse json', async () => { + const res = { + ok: false, + headers: { get: () => 'application/json' }, + json: () => 'boom', // not valid json! + status: 500 + } + + const err = await throwsAsync(errorHandler(res)) + expect(err instanceof HTTPError).to.be.true() + }) + + it('should gracefully fail on read text', async () => { + const res = { + ok: false, + headers: { get: () => 'text/plain' }, + text: () => Promise.reject(new Error('boom')), + status: 500 + } + + const err = await throwsAsync(errorHandler(res)) + expect(err instanceof HTTPError).to.be.true() + }) +}) diff --git a/test/lib.fetch.spec.js b/test/lib.fetch.spec.js deleted file mode 100644 index 3b260a022..000000000 --- a/test/lib.fetch.spec.js +++ /dev/null @@ -1,92 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const throwsAsync = require('./utils/throws-async') - -const { ok, toIterable } = require('../src/lib/fetch') - -describe('lib/fetch', () => { - describe('ok', () => { - it('should parse json error response', async () => { - const res = { - ok: false, - text: () => Promise.resolve(JSON.stringify({ - Message: 'boom', - Code: 0, - Type: 'error' - })), - status: 500 - } - - const err = await throwsAsync(ok(res)) - - expect(err.message).to.eql('boom') - expect(err.status).to.eql(500) - }) - - it('should gracefully fail on parse json', async () => { - const res = { - ok: false, - text: () => 'boom', // not valid json! - status: 500 - } - - const err = await throwsAsync(ok(res)) - - expect(err.message).to.eql('boom') - expect(err.status).to.eql(500) - }) - - it('should gracefully fail on read text', async () => { - const res = { - ok: false, - text: () => Promise.reject(new Error('boom')), - status: 500 - } - - const err = await throwsAsync(ok(res)) - - expect(err.message).to.eql('unexpected status 500') - expect(err.status).to.eql(500) - }) - }) - - describe('toIterable', () => { - it('should return input if already async iterable', () => { - const input = { [Symbol.asyncIterator] () { return this } } - expect(toIterable(input)).to.equal(input) - }) - - it('should convert reader to async iterable', async () => { - const inputData = [2, 31, 3, 4] - const input = { - getReader () { - let i = 0 - return { - read () { - return i === inputData.length - ? 
{ done: true } - : { value: inputData[i++] } - }, - releaseLock () {} - } - } - } - - const chunks = [] - for await (const chunk of toIterable(input)) { - chunks.push(chunk) - } - - expect(chunks).to.eql(inputData) - }) - - it('should throw on unknown stream', () => { - expect(() => toIterable({})).to.throw('unknown stream') - }) - }) -}) diff --git a/test/lib.stream-to-iterable.spec.js b/test/lib.stream-to-iterable.spec.js new file mode 100644 index 000000000..6c14cac94 --- /dev/null +++ b/test/lib.stream-to-iterable.spec.js @@ -0,0 +1,43 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const toIterable = require('../src/lib/stream-to-iterable') + +describe('lib/stream-to-iterable', () => { + it('should return input if already async iterable', () => { + const input = { [Symbol.asyncIterator] () { return this } } + expect(toIterable(input)).to.equal(input) + }) + + it('should convert reader to async iterable', async () => { + const inputData = [2, 31, 3, 4] + const input = { + getReader () { + let i = 0 + return { + read () { + return i === inputData.length + ? { done: true } + : { value: inputData[i++] } + }, + releaseLock () {} + } + } + } + + const chunks = [] + for await (const chunk of toIterable(input)) { + chunks.push(chunk) + } + + expect(chunks).to.eql(inputData) + }) + + it('should throw on unknown stream', () => { + expect(() => toIterable({})).to.throw('unknown stream') + }) +}) From e600d98a4617eb0c8d7b234eb4297f536c653422 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 30 Jul 2019 12:15:25 +0100 Subject: [PATCH 18/27] fix: configure tests License: MIT Signed-off-by: Alan Shaw --- src/lib/configure.js | 10 +++------- test/lib.configure.spec.js | 6 +++--- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/lib/configure.js b/src/lib/configure.js index 3acb10db6..a9036d1cd 100644 --- a/src/lib/configure.js +++ b/src/lib/configure.js @@ -1,7 +1,7 @@ 'use strict' /* eslint-env browser */ -const ky = require('ky-universal') +const ky = require('ky-universal').default const { isBrowser, isWebWorker } = require('ipfs-utils/src/env') const { toUri } = require('./multiaddr') const errorHandler = require('./error-handler') @@ -18,11 +18,6 @@ module.exports = create => config => { config = { ...config } } - if (config.protocol || config.host || config.port) { - const port = config.port ? `:${config.port}` : '' - config.apiAddr = `${config.protocol || 'http'}://${config.host || 'localhost'}${port}` - } - config.apiAddr = (config.apiAddr || getDefaultApiAddr(config)).toString() config.apiAddr = config.apiAddr.startsWith('/') ? 
toUri(config.apiAddr) : config.apiAddr config.apiPath = config.apiPath || config['api-path'] || '/api/v0' @@ -37,7 +32,8 @@ module.exports = create => config => { hooks: { afterResponse: [errorHandler] } - }) + }), + ...config }) } diff --git a/test/lib.configure.spec.js b/test/lib.configure.spec.js index e28abd6c6..f58ca4de7 100644 --- a/test/lib.configure.spec.js +++ b/test/lib.configure.spec.js @@ -14,7 +14,7 @@ describe('lib/configure', () => { it('should accept no config', () => { configure(config => { if (isBrowser || isWebWorker) { - expect(config.apiAddr).to.eql(location.origin) + expect(config.apiAddr).to.eql('') } else { expect(config.apiAddr).to.eql('http://localhost:5001') } @@ -48,7 +48,7 @@ describe('lib/configure', () => { if (isBrowser || isWebWorker) { expect(config.apiAddr).to.eql(`https://${location.host}`) } else { - expect(config.apiAddr).to.eql('https://localhost') + expect(config.apiAddr).to.eql('https://localhost:5001') } })(input) }) @@ -59,7 +59,7 @@ describe('lib/configure', () => { if (isBrowser || isWebWorker) { expect(config.apiAddr).to.eql(`http://ipfs.io:${location.port}`) } else { - expect(config.apiAddr).to.eql('http://ipfs.io') + expect(config.apiAddr).to.eql('http://ipfs.io:5001') } })(input) }) From 29647dba6049d14033a57c20ed501cceda9f068e Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 30 Jul 2019 12:23:07 +0100 Subject: [PATCH 19/27] chore: appease linter License: MIT Signed-off-by: Alan Shaw --- src/pubsub/publish.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js index 3e63ee6f4..c13130e7f 100644 --- a/src/pubsub/publish.js +++ b/src/pubsub/publish.js @@ -14,11 +14,13 @@ module.exports = configure(({ ky }) => { const searchParams = new URLSearchParams(options.searchParams) searchParams.set('arg', topic) - return ky.post(`pubsub/pub?${searchParams}&arg=${encodeBuffer(data)}`, { + const res = await ky.post(`pubsub/pub?${searchParams}&arg=${encodeBuffer(data)}`, { timeout: options.timeout, signal: options.signal, headers: options.headers }).text() + + return res } }) From f973a13efd74fda98e0b18331d9213c69892d755 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 29 Jul 2019 20:43:09 +0100 Subject: [PATCH 20/27] feat: add, addPullStream, addFromFs and addFromStream License: MIT Signed-off-by: Alan Shaw --- package.json | 5 +- src/add-from-url.js | 25 +++++ src/add/form-data.browser.js | 30 ++++++ src/add/form-data.js | 42 ++++++++ src/add/index.js | 56 +++++++++++ src/add/normalise-input.js | 130 +++++++++++++++++++++++++ src/files-regular/add-from-url.js | 70 ------------- src/files-regular/add-pull-stream.js | 13 --- src/files-regular/add.js | 55 ----------- src/lib/blob-to-async-iterable.js | 30 ++++++ src/lib/file-data-to-async-iterable.js | 54 ++++++++++ src/lib/iterable-to-readable-stream.js | 101 +++++++++++++++++++ src/lib/iterable.js | 14 +++ src/lib/object-to-camel.js | 21 ++++ src/utils/load-commands.js | 43 +++++++- test/interface.spec.js | 12 +++ 16 files changed, 558 insertions(+), 143 deletions(-) create mode 100644 src/add-from-url.js create mode 100644 src/add/form-data.browser.js create mode 100644 src/add/form-data.js create mode 100644 src/add/index.js create mode 100644 src/add/normalise-input.js delete mode 100644 src/files-regular/add-from-url.js delete mode 100644 src/files-regular/add-pull-stream.js delete mode 100644 src/files-regular/add.js create mode 100644 src/lib/blob-to-async-iterable.js create mode 100644 src/lib/file-data-to-async-iterable.js create 
mode 100644 src/lib/iterable-to-readable-stream.js create mode 100644 src/lib/iterable.js create mode 100644 src/lib/object-to-camel.js diff --git a/package.json b/package.json index fd5e04c79..68ff24f34 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,8 @@ "browser": { "glob": false, "fs": false, - "stream": "readable-stream" + "stream": "readable-stream", + "./src/add/form-data.js": "./src/add/form-data.browser.js" }, "repository": "github:ipfs/js-ipfs-http-client", "scripts": { @@ -35,6 +36,7 @@ "dependencies": { "abort-controller": "^3.0.0", "async": "^2.6.1", + "async-iterator-to-pull-stream": "^1.3.0", "bignumber.js": "^9.0.0", "bl": "^3.0.0", "bs58": "^4.0.1", @@ -77,6 +79,7 @@ "promisify-es6": "^1.0.3", "pull-defer": "~0.2.3", "pull-stream": "^3.6.9", + "pull-stream-to-async-iterator": "^1.0.2", "pull-to-stream": "~0.1.1", "pump": "^3.0.0", "qs": "^6.5.2", diff --git a/src/add-from-url.js b/src/add-from-url.js new file mode 100644 index 000000000..b3e21af57 --- /dev/null +++ b/src/add-from-url.js @@ -0,0 +1,25 @@ +'use strict' + +const configure = require('./lib/configure') +const { ok, toIterable } = require('./lib/fetch') + +module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { + const add = require('./add')({ fetch, apiAddr, apiPath, headers }) + + return (url, options) => (async function * () { + options = options || {} + const res = await ok(fetch(url, { + signal: options.signal, + headers: options.headers || headers + })) + + const input = { + path: decodeURIComponent(new URL(url).pathname.split('/').pop() || ''), + content: toIterable(res.body) + } + + for await (const file of add(input, options)) { + yield file + } + })() +}) diff --git a/src/add/form-data.browser.js b/src/add/form-data.browser.js new file mode 100644 index 000000000..46f0254d5 --- /dev/null +++ b/src/add/form-data.browser.js @@ -0,0 +1,30 @@ +'use strict' +/* eslint-env browser */ + +const normaliseInput = require('./normalise-input') + +exports.toFormData = async (input) => { + const files = normaliseInput(input) + const formData = new FormData() + let i = 0 + + for await (const file of files) { + if (file.content) { + // In the browser there's _currently_ no streaming upload, buffer up our + // async iterator chunks and append a big Blob :( + // One day, this will be browser streams + const bufs = [] + for await (const chunk of file.content) { + bufs.push(Buffer.isBuffer(chunk) ? 
chunk.buffer : chunk) + } + + formData.append(`file-${i}`, new Blob(bufs, { type: 'application/octet-stream' }), file.path) + } else { + formData.append(`dir-${i}`, new Blob([], { type: 'application/x-directory' }), file.path) + } + + i++ + } + + return formData +} diff --git a/src/add/form-data.js b/src/add/form-data.js new file mode 100644 index 000000000..1ffa66f1d --- /dev/null +++ b/src/add/form-data.js @@ -0,0 +1,42 @@ +'use strict' + +const FormData = require('form-data') +const { Buffer } = require('buffer') +const normaliseInput = require('./normalise-input') +const toStream = require('../lib/iterable-to-readable-stream') + +exports.toFormData = async (input) => { + const files = normaliseInput(input) + const formData = new FormData() + let i = 0 + + for await (const file of files) { + if (file.content) { + // In Node.js, FormData can be passed a stream so no need to buffer + formData.append( + `file-${i}`, + // FIXME: add a `path` property to the stream so `form-data` doesn't set + // a Content-Length header that is only the sum of the size of the + // header/footer when knownLength option (below) is null. + Object.assign( + toStream(file.content), + { path: file.path || `file-${i}` } + ), + { + filepath: file.path, + contentType: 'application/octet-stream', + knownLength: file.content.length // Send Content-Length header if known + } + ) + } else { + formData.append(`dir-${i}`, Buffer.alloc(0), { + filepath: file.path, + contentType: 'application/x-directory' + }) + } + + i++ + } + + return formData +} diff --git a/src/add/index.js b/src/add/index.js new file mode 100644 index 000000000..1abd1a098 --- /dev/null +++ b/src/add/index.js @@ -0,0 +1,56 @@ +'use strict' + +const ndjson = require('iterable-ndjson') +const { objectToQuery } = require('../lib/querystring') +const configure = require('../lib/configure') +const { ok, toIterable } = require('../lib/fetch') +const { toFormData } = require('./form-data') +const toCamel = require('../lib/object-to-camel') + +module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { + return (input, options) => (async function * () { + options = options || {} + + const qs = objectToQuery({ + 'stream-channels': true, + chunker: options.chunker, + 'cid-version': options.cidVersion, + 'cid-base': options.cidBase, + 'enable-sharding-experiment': options.enableShardingExperiment, + hash: options.hashAlg, + 'only-hash': options.onlyHash, + pin: options.pin, + progress: options.progress ? 
true : null, + quiet: options.quiet, + quieter: options.quieter, + 'raw-leaves': options.rawLeaves, + 'shard-split-threshold': options.shardSplitThreshold, + silent: options.silent, + trickle: options.trickle, + 'wrap-with-directory': options.wrapWithDirectory, + ...(options.qs || {}) + }) + + const url = `${apiAddr}${apiPath}/add${qs}` + const res = await ok(fetch(url, { + method: 'POST', + signal: options.signal, + headers: options.headers || headers, + body: await toFormData(input) + })) + + for await (let file of ndjson(toIterable(res.body))) { + file = toCamel(file) + // console.log(file) + if (options.progress && file.bytes) { + options.progress(file.bytes) + } else { + yield toCoreInterface(file) + } + } + })() +}) + +function toCoreInterface ({ name, hash, size }) { + return { path: name, hash, size: parseInt(size) } +} diff --git a/src/add/normalise-input.js b/src/add/normalise-input.js new file mode 100644 index 000000000..561324838 --- /dev/null +++ b/src/add/normalise-input.js @@ -0,0 +1,130 @@ +'use strict' +/* eslint-env browser */ + +const { Buffer } = require('buffer') +const errCode = require('err-code') +const toAsyncIterable = require('../lib/file-data-to-async-iterable') + +/* +Transform one of: + +Buffer|ArrayBuffer|TypedArray +Blob|File +{ path, content: Buffer } +{ path, content: Blob } +{ path, content: Iterable } +{ path, content: AsyncIterable } +{ path, content: PullStream } +Iterable +Iterable<{ path, content: Buffer }> +Iterable<{ path, content: Blob }> +Iterable<{ path, content: Iterable }> +Iterable<{ path, content: AsyncIterable }> +Iterable<{ path, content: PullStream }> +AsyncIterable +AsyncIterable<{ path, content: Buffer }> +AsyncIterable<{ path, content: Blob }> +AsyncIterable<{ path, content: Iterable }> +AsyncIterable<{ path, content: AsyncIterable }> +AsyncIterable<{ path, content: PullStream }> +PullStream + +Into: + +AsyncIterable<{ path, content: AsyncIterable }> +*/ + +module.exports = function normalizeInput (input) { + // Buffer|ArrayBuffer|TypedArray + if (Buffer.isBuffer(input) || ArrayBuffer.isView(input) || input instanceof ArrayBuffer) { + return (async function * () { // eslint-disable-line require-await + yield normalizeTuple({ path: '', content: input }) + })() + } + + // Blob|File + if (typeof Blob !== 'undefined' && input instanceof Blob) { + return (async function * () { // eslint-disable-line require-await + yield normalizeTuple({ path: '', content: input }) + })() + } + + // Iterable + // Iterable<{ path, content: Buffer }> + // Iterable<{ path, content: Blob }> + // Iterable<{ path, content: Iterable }> + // Iterable<{ path, content: AsyncIterable }> + // Iterable<{ path, content: PullStream }> + if (input[Symbol.iterator]) { + return (async function * () { // eslint-disable-line require-await + for (const chunk of input) { + if (typeof chunk === 'object' && (chunk.path || chunk.content)) { + yield normalizeTuple(chunk) + } else if (Number.isInteger(chunk)) { // Must be an Iterable i.e. 
Buffer/ArrayBuffer/Array of bytes + yield normalizeTuple({ path: '', content: input }) + return + } else { + throw errCode(new Error('Unexpected input: ' + typeof chunk), 'ERR_UNEXPECTED_INPUT') + } + } + })() + } + + // AsyncIterable + // AsyncIterable<{ path, content: Buffer }> + // AsyncIterable<{ path, content: Blob }> + // AsyncIterable<{ path, content: Iterable }> + // AsyncIterable<{ path, content: AsyncIterable }> + // AsyncIterable<{ path, content: PullStream }> + if (input[Symbol.asyncIterator]) { + return (async function * () { + for await (const chunk of input) { + if (typeof chunk === 'object' && (chunk.path || chunk.content)) { + yield normalizeTuple(chunk) + } else { // Must be an AsyncIterable i.e. a Stream + let path = '' + + // fs.createReadStream will create a stream with a `path` prop + // If available, use it here! + if (input.path && input.path.split) { + path = input.path.split(/[/\\]/).pop() || '' + } + + yield normalizeTuple({ + path, + content: (async function * () { + yield chunk + for await (const restChunk of input) { + yield restChunk + } + })() + }) + return + } + } + })() + } + + // { path, content: Buffer } + // { path, content: Blob } + // { path, content: Iterable } + // { path, content: AsyncIterable } + // { path, content: PullStream } + if (typeof input === 'object' && (input.path || input.content)) { + // eslint-disable-next-line require-await + return (async function * () { yield normalizeTuple(input) })() + } + + // PullStream + if (typeof input === 'function') { + return (async function * () { // eslint-disable-line require-await + yield normalizeTuple({ path: '', content: input }) + })() + } + + throw errCode(new Error('Unexpected input: ' + typeof input), 'ERR_UNEXPECTED_INPUT') +} + +function normalizeTuple ({ path, content }) { + return { path: path || '', content: content ? 
toAsyncIterable(content) : null } +} diff --git a/src/files-regular/add-from-url.js b/src/files-regular/add-from-url.js deleted file mode 100644 index f1065f2de..000000000 --- a/src/files-regular/add-from-url.js +++ /dev/null @@ -1,70 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const { URL } = require('iso-url') -const { getRequest } = require('iso-stream-http') -const SendOneFile = require('../utils/send-one-file-multiple-results') -const FileResultStreamConverter = require('../utils/file-result-stream-converter') - -module.exports = (send) => { - const sendOneFile = SendOneFile(send, 'add') - - return promisify((url, opts, callback) => { - if (typeof (opts) === 'function' && - callback === undefined) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && - typeof callback === 'function') { - callback = opts - opts = {} - } - - if (!validUrl(url)) { - return callback(new Error('"url" param must be an http(s) url')) - } - - requestWithRedirect(url, opts, sendOneFile, callback) - }) -} - -const validUrl = (url) => typeof url === 'string' && url.startsWith('http') - -const requestWithRedirect = (url, opts, sendOneFile, callback) => { - const parsedUrl = new URL(url) - - const req = getRequest(parsedUrl, (res) => { - if (res.statusCode >= 400) { - return callback(new Error(`Failed to download with ${res.statusCode}`)) - } - - const redirection = res.headers.location - - if (res.statusCode >= 300 && res.statusCode < 400 && redirection) { - if (!validUrl(redirection)) { - return callback(new Error('redirection url must be an http(s) url')) - } - - requestWithRedirect(redirection, opts, sendOneFile, callback) - } else { - const requestOpts = { - qs: opts, - converter: FileResultStreamConverter - } - const fileName = decodeURIComponent(parsedUrl.pathname.split('/').pop()) - - sendOneFile({ - content: res, - path: fileName - }, requestOpts, callback) - } - }) - - req.once('error', callback) - - req.end() -} diff --git a/src/files-regular/add-pull-stream.js b/src/files-regular/add-pull-stream.js deleted file mode 100644 index 2076ffa8d..000000000 --- a/src/files-regular/add-pull-stream.js +++ /dev/null @@ -1,13 +0,0 @@ -'use strict' - -const SendFilesStream = require('../utils/send-files-stream') -const FileResultStreamConverter = require('../utils/file-result-stream-converter') -const toPull = require('stream-to-pull-stream') - -module.exports = (send) => { - return (options) => { - options = options || {} - options.converter = FileResultStreamConverter - return toPull(SendFilesStream(send, 'add')({ qs: options })) - } -} diff --git a/src/files-regular/add.js b/src/files-regular/add.js deleted file mode 100644 index cb5898265..000000000 --- a/src/files-regular/add.js +++ /dev/null @@ -1,55 +0,0 @@ -'use strict' - -const promisify = require('promisify-es6') -const ConcatStream = require('concat-stream') -const once = require('once') -const { isSource } = require('is-pull-stream') -const FileResultStreamConverter = require('../utils/file-result-stream-converter') -const SendFilesStream = require('../utils/send-files-stream') -const validateAddInput = require('ipfs-utils/src/files/add-input-validation') - -module.exports = (send) => { - const createAddStream = SendFilesStream(send, 'add') - - const add = promisify((_files, options, _callback) => { - if (typeof options === 'function') { - _callback = options - options = null - } - const callback = once(_callback) - - if 
(!options) { - options = {} - } - options.converter = FileResultStreamConverter - - try { - validateAddInput(_files) - } catch (err) { - return callback(err) - } - - const files = [].concat(_files) - - const stream = createAddStream({ qs: options }) - const concat = ConcatStream((result) => callback(null, result)) - stream.once('error', callback) - stream.pipe(concat) - - files.forEach((file) => stream.write(file)) - stream.end() - }) - - return function () { - const args = Array.from(arguments) - - // If we files.add(), then promisify thinks the pull stream is - // a callback! Add an empty options object in this case so that a promise - // is returned. - if (args.length === 1 && isSource(args[0])) { - args.push({}) - } - - return add.apply(null, args) - } -} diff --git a/src/lib/blob-to-async-iterable.js b/src/lib/blob-to-async-iterable.js new file mode 100644 index 000000000..3f9e982c0 --- /dev/null +++ b/src/lib/blob-to-async-iterable.js @@ -0,0 +1,30 @@ +'use strict' +/* eslint-env browser */ + +// Convert a Blob into an AsyncIterable +module.exports = (blob, options) => (async function * () { + options = options || {} + + const reader = new FileReader() + const chunkSize = options.chunkSize || 1024 * 1024 + let offset = options.offset || 0 + + const getNextChunk = () => new Promise((resolve, reject) => { + reader.onloadend = e => { + const data = e.target.result + resolve(data.byteLength === 0 ? null : data) + } + reader.onerror = reject + + const end = offset + chunkSize + const slice = blob.slice(offset, end) + reader.readAsArrayBuffer(slice) + offset = end + }) + + while (true) { + const data = await getNextChunk() + if (data == null) return + yield data + } +})() diff --git a/src/lib/file-data-to-async-iterable.js b/src/lib/file-data-to-async-iterable.js new file mode 100644 index 000000000..ffa0f8d11 --- /dev/null +++ b/src/lib/file-data-to-async-iterable.js @@ -0,0 +1,54 @@ +'use strict' +/* eslint-env browser */ + +const toIterator = require('pull-stream-to-async-iterator') +const { Buffer } = require('buffer') +const blobToAsyncIterable = require('../lib/blob-to-async-iterable') + +/* +Transform one of: + +Buffer|ArrayBuffer|TypedArray +Blob|File +Iterable +AsyncIterable +PullStream + +Into: + +AsyncIterable +*/ +module.exports = function toAsyncIterable (input) { + // Buffer|ArrayBuffer|TypedArray|array of bytes + if (input[Symbol.iterator]) { + const buf = Buffer.from(input) + return Object.assign( + (async function * () { yield buf })(), // eslint-disable-line require-await + { length: buf.length } + ) + } + + // Blob|File + if (typeof Blob !== 'undefined' && input instanceof Blob) { + return Object.assign( + blobToAsyncIterable(input), + { length: input.size } + ) + } + + // AsyncIterable + if (input[Symbol.asyncIterator]) { + return (async function * () { + for await (const chunk of input) { + yield Buffer.from(chunk) + } + })() + } + + // PullStream + if (typeof input === 'function') { + return toIterator(input) + } + + throw new Error('Unexpected input: ' + typeof input) +} diff --git a/src/lib/iterable-to-readable-stream.js b/src/lib/iterable-to-readable-stream.js new file mode 100644 index 000000000..06b3783dd --- /dev/null +++ b/src/lib/iterable-to-readable-stream.js @@ -0,0 +1,101 @@ +'use strict' + +const { Readable, Writable } = require('stream') + +function toReadable (source) { + let reading = false + return new Readable({ + async read (size) { + if (reading) return + reading = true + + try { + while (true) { + const { value, done } = await source.next(size) + 
if (done) return this.push(null) + if (!this.push(value)) break + } + } catch (err) { + this.emit('error', err) + if (source.return) source.return() + } finally { + reading = false + } + } + }) +} + +module.exports = toReadable +module.exports.readable = toReadable + +function toWritable (sink) { + const END_CHUNK = {} + + class Buf { + constructor () { + this._buffer = [] + this._waitingConsumers = [] + this._consuming = false + } + + push (chunk) { + let resolver + const pushPromise = new Promise((resolve, reject) => { + resolver = { resolve, reject } + }) + this._buffer.push({ chunk, resolver }) + this._consume() + return pushPromise + } + + _consume () { + if (this._consuming) return + this._consuming = true + + while (this._waitingConsumers.length && this._buffer.length) { + const nextConsumer = this._waitingConsumers.shift() + const nextChunk = this._buffer.shift() + nextConsumer.resolver.resolve(nextChunk) + nextChunk.resolver.resolve() + } + + this._consuming = false + } + + consume () { + let resolver + const consumePromise = new Promise((resolve, reject) => { + resolver = { resolve, reject } + }) + this._waitingConsumers.push({ resolver }) + this._consume() + return consumePromise + } + } + + const buf = new Buf() + + const it = { + async next () { + const chunk = await buf.consume() + return chunk === END_CHUNK ? { done: true } : { value: chunk } + } + } + + sink({ + [Symbol.asyncIterator] () { + return it + } + }) + + return new Writable({ + write (chunk, enc, cb) { + buf.push(chunk).then(() => cb(), cb) + }, + final (cb) { + buf.push(END_CHUNK).then(() => cb(), cb) + } + }) +} + +module.exports.toWritable = toWritable diff --git a/src/lib/iterable.js b/src/lib/iterable.js new file mode 100644 index 000000000..4c59ff510 --- /dev/null +++ b/src/lib/iterable.js @@ -0,0 +1,14 @@ +'use strict' + +const toPull = require('async-iterator-to-pull-stream') + +exports.collectify = fn => async (...args) => { + const items = [] + for await (const item of fn(...args)) items.push(item) + return items +} + +exports.pullify = { + source: fn => (...args) => toPull(fn(...args)), + transform: fn => (...args) => toPull.transform(source => fn(source, ...args)) +} diff --git a/src/lib/object-to-camel.js b/src/lib/object-to-camel.js new file mode 100644 index 000000000..f13b2b6a1 --- /dev/null +++ b/src/lib/object-to-camel.js @@ -0,0 +1,21 @@ +'use strict' + +// Convert object properties to camel case. +// NOT recursive! +// e.g. 
+// AgentVersion => agentVersion +// ID => id +module.exports = obj => { + if (obj == null) return obj + const caps = /^[A-Z]+$/ + return Object.keys(obj).reduce((camelObj, k) => { + if (caps.test(k)) { // all caps + camelObj[k.toLowerCase()] = obj[k] + } else if (caps.test(k[0])) { // pascal + camelObj[k[0].toLowerCase() + k.slice(1)] = obj[k] + } else { + camelObj[k] = obj[k] + } + return camelObj + }, {}) +} diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index e4a914dd0..0598cecc5 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -1,14 +1,49 @@ 'use strict' +const nodeify = require('promise-nodeify') +const { collectify, pullify } = require('../lib/iterable') + function requireCommands () { return { // Files Regular (not MFS) - add: require('../files-regular/add'), + add: (_, config) => { + const add = collectify(require('../add')(config)) + return (input, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(add(input, options), callback) + } + }, + // TODO: convert addReadableStream: require('../files-regular/add-readable-stream'), - addPullStream: require('../files-regular/add-pull-stream'), + addPullStream: (_, config) => { + const add = require('../add')(config) + return pullify.transform(add) + }, + // TODO: convert addFromFs: require('../files-regular/add-from-fs'), - addFromURL: require('../files-regular/add-from-url'), - addFromStream: require('../files-regular/add'), + addFromURL: (_, config) => { + const addFromURL = collectify(require('../add-from-url')(config)) + return (url, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(addFromURL(url, options), callback) + } + }, + addFromStream: (_, config) => { + const add = collectify(require('../add')(config)) + return (input, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(add(input, options), callback) + } + }, cat: require('../files-regular/cat'), catReadableStream: require('../files-regular/cat-readable-stream'), catPullStream: require('../files-regular/cat-pull-stream'), diff --git a/test/interface.spec.js b/test/interface.spec.js index 736da0154..928aef66a 100644 --- a/test/interface.spec.js +++ b/test/interface.spec.js @@ -116,6 +116,14 @@ describe('interface-ipfs-core tests', () => { name: 'should add a nested directory as array of tupples with progress', reason: 'FIXME https://github.com/ipfs/js-ipfs-http-client/issues/339' }, + { + name: 'should not be able to add a string', + reason: 'FIXME test needs to change to inspect error code ERR_UNEXPECTED_INPUT' + }, + { + name: 'should not be able to add a non-Buffer TypedArray', + reason: 'TODO remove test, this should be supported' + }, // .addPullStream isNode ? null : { name: 'should add pull stream of valid files and dirs', @@ -131,6 +139,10 @@ describe('interface-ipfs-core tests', () => { name: 'addFromStream', reason: 'Not designed to run in the browser' }, + { + name: 'should add from a stream', + reason: 'TODO change test to use readable-stream@3 with async iterator support' + }, // .addFromFs isNode ? 
null : { name: 'addFromFs', From 625a6a26251f6f34944dabfa35fcf4e3ce5b4a4a Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 30 Jul 2019 20:26:00 +0100 Subject: [PATCH 21/27] refactor: use ky License: MIT Signed-off-by: Alan Shaw --- src/add-from-url.js | 15 ++++++------- src/add/index.js | 52 ++++++++++++++++++++++----------------------- 2 files changed, 32 insertions(+), 35 deletions(-) diff --git a/src/add-from-url.js b/src/add-from-url.js index b3e21af57..e98169eed 100644 --- a/src/add-from-url.js +++ b/src/add-from-url.js @@ -1,21 +1,20 @@ 'use strict' +const kyDefault = require('ky-universal').default const configure = require('./lib/configure') -const { ok, toIterable } = require('./lib/fetch') +const toIterable = require('./lib/stream-to-iterable') -module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { - const add = require('./add')({ fetch, apiAddr, apiPath, headers }) +module.exports = configure(({ ky }) => { + const add = require('./add')({ ky }) return (url, options) => (async function * () { options = options || {} - const res = await ok(fetch(url, { - signal: options.signal, - headers: options.headers || headers - })) + + const { body } = await kyDefault.get(url) const input = { path: decodeURIComponent(new URL(url).pathname.split('/').pop() || ''), - content: toIterable(res.body) + content: toIterable(body) } for await (const file of add(input, options)) { diff --git a/src/add/index.js b/src/add/index.js index 1abd1a098..34a5d33bd 100644 --- a/src/add/index.js +++ b/src/add/index.js @@ -1,43 +1,41 @@ 'use strict' const ndjson = require('iterable-ndjson') -const { objectToQuery } = require('../lib/querystring') const configure = require('../lib/configure') -const { ok, toIterable } = require('../lib/fetch') +const toIterable = require('../lib/stream-to-iterable') const { toFormData } = require('./form-data') const toCamel = require('../lib/object-to-camel') -module.exports = configure(({ fetch, apiAddr, apiPath, headers }) => { +module.exports = configure(({ ky }) => { return (input, options) => (async function * () { options = options || {} - const qs = objectToQuery({ - 'stream-channels': true, - chunker: options.chunker, - 'cid-version': options.cidVersion, - 'cid-base': options.cidBase, - 'enable-sharding-experiment': options.enableShardingExperiment, - hash: options.hashAlg, - 'only-hash': options.onlyHash, - pin: options.pin, - progress: options.progress ? 
true : null, - quiet: options.quiet, - quieter: options.quieter, - 'raw-leaves': options.rawLeaves, - 'shard-split-threshold': options.shardSplitThreshold, - silent: options.silent, - trickle: options.trickle, - 'wrap-with-directory': options.wrapWithDirectory, - ...(options.qs || {}) - }) + const searchParams = new URLSearchParams(options.searchParams) + + searchParams.set('stream-channels', true) + if (options.chunker) searchParams.set('chunker', options.chunker) + if (options.cidVersion) searchParams.set('cid-version', options.cidVersion) + if (options.cidBase) searchParams.set('cid-base', options.cidBase) + if (options.enableShardingExperiment != null) searchParams.set('enable-sharding-experiment', options.enableShardingExperiment) + if (options.hashAlg) searchParams.set('hash', options.hashAlg) + if (options.onlyHash != null) searchParams.set('only-hash', options.onlyHash) + if (options.pin != null) searchParams.set('pin', options.pin) + if (options.progress) searchParams.set('progress', true) + if (options.quiet != null) searchParams.set('quiet', options.quiet) + if (options.quieter != null) searchParams.set('quieter', options.quieter) + if (options.rawLeaves != null) searchParams.set('raw-leaves', options.rawLeaves) + if (options.shardSplitThreshold) searchParams.set('shard-split-threshold', options.shardSplitThreshold) + if (options.silent) searchParams.set('silent', options.silent) + if (options.trickle != null) searchParams.set('trickle', options.trickle) + if (options.wrapWithDirectory != null) searchParams.set('wrap-with-directory', options.wrapWithDirectory) - const url = `${apiAddr}${apiPath}/add${qs}` - const res = await ok(fetch(url, { - method: 'POST', + const res = await ky.post('add', { + timeout: options.timeout, signal: options.signal, - headers: options.headers || headers, + headers: options.headers, + searchParams, body: await toFormData(input) - })) + }) for await (let file of ndjson(toIterable(res.body))) { file = toCamel(file) From c0ef7698250622c6306c83c86bb8b926b86f021a Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Sat, 3 Aug 2019 13:45:25 +0100 Subject: [PATCH 22/27] refactor: convert addReadableStream License: MIT Signed-off-by: Alan Shaw --- package.json | 1 + src/lib/iterable.js | 5 +++++ src/utils/load-commands.js | 8 +++++--- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index 68ff24f34..30a13a11c 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "is-stream": "^2.0.0", "iso-stream-http": "~0.1.2", "iso-url": "~0.4.6", + "it-to-stream": "^0.1.1", "iterable-ndjson": "^1.1.0", "just-kebab-case": "^1.1.0", "just-map-keys": "^1.1.0", diff --git a/src/lib/iterable.js b/src/lib/iterable.js index 4c59ff510..78de19f5e 100644 --- a/src/lib/iterable.js +++ b/src/lib/iterable.js @@ -1,6 +1,7 @@ 'use strict' const toPull = require('async-iterator-to-pull-stream') +const toStream = require('it-to-stream') exports.collectify = fn => async (...args) => { const items = [] @@ -12,3 +13,7 @@ exports.pullify = { source: fn => (...args) => toPull(fn(...args)), transform: fn => (...args) => toPull.transform(source => fn(source, ...args)) } + +exports.streamify = { + transform: fn => (...args) => toStream.transform(source => fn(source, ...args), { objectMode: true }) +} diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index 0598cecc5..a003f4c05 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -1,7 +1,7 @@ 'use strict' const nodeify = require('promise-nodeify') -const { 
collectify, pullify } = require('../lib/iterable') +const { collectify, pullify, streamify } = require('../lib/iterable') function requireCommands () { return { @@ -16,8 +16,10 @@ function requireCommands () { return nodeify(add(input, options), callback) } }, - // TODO: convert - addReadableStream: require('../files-regular/add-readable-stream'), + addReadableStream: (_, config) => { + const add = require('../add')(config) + return streamify.transform(add) + }, addPullStream: (_, config) => { const add = require('../add')(config) return pullify.transform(add) From 916efbd36255f9c0ee6d490763bafc545e46b4a5 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Sat, 3 Aug 2019 14:25:46 +0100 Subject: [PATCH 23/27] refactor: convert addFromFs License: MIT Signed-off-by: Alan Shaw --- package.json | 4 +- src/add-from-fs/glob-source.js | 94 ++++++++++++++++++++++++ src/add-from-fs/index.browser.js | 3 + src/add-from-fs/index.js | 9 +++ src/files-regular/add-from-fs.js | 40 ---------- src/files-regular/add-readable-stream.js | 12 --- src/utils/load-commands.js | 12 ++- 7 files changed, 119 insertions(+), 55 deletions(-) create mode 100644 src/add-from-fs/glob-source.js create mode 100644 src/add-from-fs/index.browser.js create mode 100644 src/add-from-fs/index.js delete mode 100644 src/files-regular/add-from-fs.js delete mode 100644 src/files-regular/add-readable-stream.js diff --git a/package.json b/package.json index 30a13a11c..c2669d069 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,8 @@ "glob": false, "fs": false, "stream": "readable-stream", - "./src/add/form-data.js": "./src/add/form-data.browser.js" + "./src/add/form-data.js": "./src/add/form-data.browser.js", + "./src/add-from-fs/index.js": "./src/add-from-fs/index.browser.js" }, "repository": "github:ipfs/js-ipfs-http-client", "scripts": { @@ -60,6 +61,7 @@ "is-stream": "^2.0.0", "iso-stream-http": "~0.1.2", "iso-url": "~0.4.6", + "it-pushable": "^1.2.1", "it-to-stream": "^0.1.1", "iterable-ndjson": "^1.1.0", "just-kebab-case": "^1.1.0", diff --git a/src/add-from-fs/glob-source.js b/src/add-from-fs/glob-source.js new file mode 100644 index 000000000..b48a31cdc --- /dev/null +++ b/src/add-from-fs/glob-source.js @@ -0,0 +1,94 @@ +'use strict' + +const Fs = require('fs') +const Path = require('path') +const glob = require('glob') +const pushable = require('it-pushable') +const errCode = require('err-code') + +/** +* Create an AsyncIterable that can be passed to ipfs.add for the +* provided file paths. +* +* @param {String} ...paths File system path(s) to glob from +* @param {Object} [options] Optional options +* @param {Boolean} [options.recursive] Recursively glob all paths in directories +* @param {Boolean} [options.hidden] Include .dot files in matched paths +* @param {Array} [options.ignore] Glob paths to ignore +* @param {Boolean} [options.followSymlinks] follow symlinks +* @returns {AsyncIterable} +*/ +module.exports = (...args) => (async function * () { + const options = typeof args[args.length - 1] === 'string' ? {} : args.pop() + const paths = args + + const globSourceOptions = { + recursive: options.recursive, + glob: { + dot: Boolean(options.hidden), + ignore: Array.isArray(options.ignore) ? options.ignore : [], + follow: options.followSymlinks != null ? 
options.followSymlinks : true + } + } + + // Check the input paths comply with options.recursive and convert to glob sources + const results = await Promise.all(paths.map(pathAndType)) + const globSources = results.map(r => toGlobSource(r, globSourceOptions)) + + for (const globSource of globSources) { + for await (const { path, contentPath } of globSource) { + yield { path, content: Fs.createReadStream(contentPath) } + } + } +})() + +function toGlobSource ({ path, type }, options) { + return (async function * () { + options = options || {} + + const baseName = Path.basename(path) + + if (type === 'file') { + yield { path: baseName, contentPath: path } + return + } + + if (type === 'dir' && !options.recursive) { + throw errCode( + new Error(`'${path}' is a directory and recursive option not set`), + 'ERR_DIR_NON_RECURSIVE', + { path } + ) + } + + const globOptions = Object.assign({}, options.glob, { + cwd: path, + nodir: true, + realpath: false, + absolute: false + }) + + // TODO: want to use pull-glob but it doesn't have the features... + const pusher = pushable() + + glob('**/*', globOptions) + .on('match', m => pusher.push(m)) + .on('end', () => pusher.end()) + .on('abort', () => pusher.end()) + .on('error', err => pusher.end(err)) + + for await (const p of pusher) { + yield { + path: `${baseName}/${toPosix(p)}`, + contentPath: Path.join(path, p) + } + } + })() +} + +async function pathAndType (path) { + const stat = await Fs.promises.stat(path) + return { path, type: stat.isDirectory() ? 'dir' : 'file' } +} + +const toPosix = path => path.replace(/\\/g, '/') diff --git a/src/add-from-fs/index.browser.js b/src/add-from-fs/index.browser.js new file mode 100644 index 000000000..0a33f4c2e --- /dev/null +++ b/src/add-from-fs/index.browser.js @@ -0,0 +1,3 @@ +'use strict' + +module.exports = () => { throw new Error('unavailable in the browser') } diff --git a/src/add-from-fs/index.js b/src/add-from-fs/index.js new file mode 100644 index 000000000..225acf9c3 --- /dev/null +++ b/src/add-from-fs/index.js @@ -0,0 +1,9 @@ +'use strict' + +const configure = require('../lib/configure') +const globSource = require('./glob-source') + +module.exports = configure(({ ky }) => { + const add = require('../add')({ ky }) + return (path, options) => add(globSource(path, options), options) +}) diff --git a/src/files-regular/add-from-fs.js b/src/files-regular/add-from-fs.js deleted file mode 100644 index 2320fc537..000000000 --- a/src/files-regular/add-from-fs.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict' - -const isNode = require('detect-node') -const promisify = require('promisify-es6') -const SendOneFile = require('../utils/send-one-file-multiple-results') -const FileResultStreamConverter = require('../utils/file-result-stream-converter') - -module.exports = (send) => { - const sendOneFile = SendOneFile(send, 'add') - - return promisify((path, opts, callback) => { - if (typeof opts === 'function' && - callback === undefined) { - callback = opts - opts = {} - } - - // opts is the real callback -- - // 'callback' is being injected by promisify - if (typeof opts === 'function' && - typeof callback === 'function') { - callback = opts - opts = {} - } - - if (!isNode) { - return callback(new Error('fsAdd does not work in the browser')) - } - - if (typeof path !== 'string') { - return callback(new Error('"path" must be a string')) - } - - const requestOpts = { - qs: opts, - converter: FileResultStreamConverter - } - sendOneFile(path, requestOpts, callback) - }) -} diff --git 
a/src/files-regular/add-readable-stream.js b/src/files-regular/add-readable-stream.js deleted file mode 100644 index 320abe692..000000000 --- a/src/files-regular/add-readable-stream.js +++ /dev/null @@ -1,12 +0,0 @@ -'use strict' - -const SendFilesStream = require('../utils/send-files-stream') -const FileResultStreamConverter = require('../utils/file-result-stream-converter') - -module.exports = (send) => { - return (options) => { - options = options || {} - options.converter = FileResultStreamConverter - return SendFilesStream(send, 'add')(options) - } -} diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index a003f4c05..3479c8d7c 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -24,8 +24,16 @@ function requireCommands () { const add = require('../add')(config) return pullify.transform(add) }, - // TODO: convert - addFromFs: require('../files-regular/add-from-fs'), + addFromFs: (_, config) => { + const addFromFs = collectify(require('../add-from-fs')(config)) + return (path, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + return nodeify(addFromFs(path, options), callback) + } + }, addFromURL: (_, config) => { const addFromURL = collectify(require('../add-from-url')(config)) return (url, options, callback) => { From c21333640a25d4e9401c86c7d39116d11b4360e3 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Sat, 3 Aug 2019 14:44:35 +0100 Subject: [PATCH 24/27] refactor: remove unused code License: MIT Signed-off-by: Alan Shaw --- src/add/form-data.js | 4 +- src/files-mfs/write.js | 4 +- src/lib/iterable-to-readable-stream.js | 101 ---------------------- src/utils/file-result-stream-converter.js | 44 ---------- 4 files changed, 3 insertions(+), 150 deletions(-) delete mode 100644 src/lib/iterable-to-readable-stream.js delete mode 100644 src/utils/file-result-stream-converter.js diff --git a/src/add/form-data.js b/src/add/form-data.js index 1ffa66f1d..1d61d9a83 100644 --- a/src/add/form-data.js +++ b/src/add/form-data.js @@ -2,8 +2,8 @@ const FormData = require('form-data') const { Buffer } = require('buffer') +const toStream = require('it-to-stream') const normaliseInput = require('./normalise-input') -const toStream = require('../lib/iterable-to-readable-stream') exports.toFormData = async (input) => { const files = normaliseInput(input) @@ -19,7 +19,7 @@ exports.toFormData = async (input) => { // a Content-Length header that is only the sum of the size of the // header/footer when knownLength option (below) is null. 
Object.assign( - toStream(file.content), + toStream.readable(file.content), { path: file.path || `file-${i}` } ), { diff --git a/src/files-mfs/write.js b/src/files-mfs/write.js index 0485406bd..33f1ec973 100644 --- a/src/files-mfs/write.js +++ b/src/files-mfs/write.js @@ -3,7 +3,6 @@ const promisify = require('promisify-es6') const concatStream = require('concat-stream') const once = require('once') -const FileResultStreamConverter = require('../utils/file-result-stream-converter') const SendFilesStream = require('../utils/send-files-stream') module.exports = (send) => { @@ -29,8 +28,7 @@ module.exports = (send) => { const options = { args: pathDst, - qs: opts, - converter: FileResultStreamConverter + qs: opts } const stream = sendFilesStream({ qs: options }) diff --git a/src/lib/iterable-to-readable-stream.js b/src/lib/iterable-to-readable-stream.js deleted file mode 100644 index 06b3783dd..000000000 --- a/src/lib/iterable-to-readable-stream.js +++ /dev/null @@ -1,101 +0,0 @@ -'use strict' - -const { Readable, Writable } = require('stream') - -function toReadable (source) { - let reading = false - return new Readable({ - async read (size) { - if (reading) return - reading = true - - try { - while (true) { - const { value, done } = await source.next(size) - if (done) return this.push(null) - if (!this.push(value)) break - } - } catch (err) { - this.emit('error', err) - if (source.return) source.return() - } finally { - reading = false - } - } - }) -} - -module.exports = toReadable -module.exports.readable = toReadable - -function toWritable (sink) { - const END_CHUNK = {} - - class Buf { - constructor () { - this._buffer = [] - this._waitingConsumers = [] - this._consuming = false - } - - push (chunk) { - let resolver - const pushPromise = new Promise((resolve, reject) => { - resolver = { resolve, reject } - }) - this._buffer.push({ chunk, resolver }) - this._consume() - return pushPromise - } - - _consume () { - if (this._consuming) return - this._consuming = true - - while (this._waitingConsumers.length && this._buffer.length) { - const nextConsumer = this._waitingConsumers.shift() - const nextChunk = this._buffer.shift() - nextConsumer.resolver.resolve(nextChunk) - nextChunk.resolver.resolve() - } - - this._consuming = false - } - - consume () { - let resolver - const consumePromise = new Promise((resolve, reject) => { - resolver = { resolve, reject } - }) - this._waitingConsumers.push({ resolver }) - this._consume() - return consumePromise - } - } - - const buf = new Buf() - - const it = { - async next () { - const chunk = await buf.consume() - return chunk === END_CHUNK ? { done: true } : { value: chunk } - } - } - - sink({ - [Symbol.asyncIterator] () { - return it - } - }) - - return new Writable({ - write (chunk, enc, cb) { - buf.push(chunk).then(() => cb(), cb) - }, - final (cb) { - buf.push(END_CHUNK).then(() => cb(), cb) - } - }) -} - -module.exports.toWritable = toWritable diff --git a/src/utils/file-result-stream-converter.js b/src/utils/file-result-stream-converter.js deleted file mode 100644 index 7f5b19aeb..000000000 --- a/src/utils/file-result-stream-converter.js +++ /dev/null @@ -1,44 +0,0 @@ -'use strict' - -const TransformStream = require('readable-stream').Transform - -/* - Transforms a stream of {Name, Hash} objects to include size - of the DAG object. 
- - Usage: inputStream.pipe(new FileResultStreamConverter()) - - Input object format: - { - Name: '/path/to/file/foo.txt', - Hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP' - Size: '20' - } - - Output object format: - { - path: '/path/to/file/foo.txt', - hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP', - size: 20 - } -*/ -class FileResultStreamConverter extends TransformStream { - constructor (options) { - const opts = Object.assign({}, options || {}, { objectMode: true }) - super(opts) - } - - _transform (obj, enc, callback) { - if (!obj.Hash) { - return callback() - } - - callback(null, { - path: obj.Name, - hash: obj.Hash, - size: parseInt(obj.Size, 10) - }) - } -} - -module.exports = FileResultStreamConverter From 9b302b8dd520bd440474d983ee487c4e51c6d42f Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Sun, 4 Aug 2019 13:35:14 +0100 Subject: [PATCH 25/27] fix: tests License: MIT Signed-off-by: Alan Shaw --- src/add-from-fs/index.browser.js | 2 +- src/files-regular/index.js | 6 ------ test/files-mfs.spec.js | 6 +++--- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/add-from-fs/index.browser.js b/src/add-from-fs/index.browser.js index 0a33f4c2e..81d551294 100644 --- a/src/add-from-fs/index.browser.js +++ b/src/add-from-fs/index.browser.js @@ -1,3 +1,3 @@ 'use strict' -module.exports = () => { throw new Error('unavailable in the browser') } +module.exports = () => () => { throw new Error('unavailable in the browser') } diff --git a/src/files-regular/index.js b/src/files-regular/index.js index 059d7ea1c..408f76494 100644 --- a/src/files-regular/index.js +++ b/src/files-regular/index.js @@ -6,12 +6,6 @@ module.exports = (arg) => { const send = moduleConfig(arg) return { - add: require('../files-regular/add')(send), - addReadableStream: require('../files-regular/add-readable-stream')(send), - addPullStream: require('../files-regular/add-pull-stream')(send), - addFromFs: require('../files-regular/add-from-fs')(send), - addFromURL: require('../files-regular/add-from-url')(send), - addFromStream: require('../files-regular/add')(send), cat: require('../files-regular/cat')(send), catReadableStream: require('../files-regular/cat-readable-stream')(send), catPullStream: require('../files-regular/cat-pull-stream')(send), diff --git a/test/files-mfs.spec.js b/test/files-mfs.spec.js index f9b46cdd7..847e1b975 100644 --- a/test/files-mfs.spec.js +++ b/test/files-mfs.spec.js @@ -96,7 +96,7 @@ describe('.files (the MFS API part)', function () { it('.add with cid-version=1 and raw-leaves=false', (done) => { const expectedCid = 'bafybeifogzovjqrcxvgt7g36y7g63hvwvoakledwk4b2fr2dl4wzawpnny' - const options = { 'cid-version': 1, 'raw-leaves': false } + const options = { cidVersion: 1, rawLeaves: false } ipfs.add(testfile, options, (err, res) => { expect(err).to.not.exist() @@ -174,7 +174,7 @@ describe('.files (the MFS API part)', function () { path: content + '.txt', content: Buffer.from(content) } - const options = { hash: name, 'raw-leaves': false } + const options = { hashAlg: name, rawLeaves: false } ipfs.add([file], options, (err, res) => { if (err) return done(err) @@ -264,7 +264,7 @@ describe('.files (the MFS API part)', function () { path: content + '.txt', content: Buffer.from(content) } - const options = { hash: name, 'raw-leaves': false } + const options = { hashAlg: name, rawLeaves: false } ipfs.add([file], options, (err, res) => { expect(err).to.not.exist() From 8481fb6096e5380592928f535c06caaf8bb46d7b Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Sun, 4 Aug 2019 
13:44:10 +0100 Subject: [PATCH 26/27] fix: tests License: MIT Signed-off-by: Alan Shaw --- src/add/form-data.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/add/form-data.js b/src/add/form-data.js index 1d61d9a83..be268320f 100644 --- a/src/add/form-data.js +++ b/src/add/form-data.js @@ -23,14 +23,14 @@ exports.toFormData = async (input) => { { path: file.path || `file-${i}` } ), { - filepath: file.path, + filepath: encodeURIComponent(file.path), contentType: 'application/octet-stream', knownLength: file.content.length // Send Content-Length header if known } ) } else { formData.append(`dir-${i}`, Buffer.alloc(0), { - filepath: file.path, + filepath: encodeURIComponent(file.path), contentType: 'application/x-directory' }) } From cb5e2b01f29a9c394eac3c2fc666736720122dec Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Sun, 4 Aug 2019 14:11:08 +0100 Subject: [PATCH 27/27] fix: normalize input for iterable of buffer or blob License: MIT Signed-off-by: Alan Shaw --- src/add/normalise-input.js | 29 +++++++++++++++----------- src/lib/file-data-to-async-iterable.js | 3 ++- src/lib/is-bloby.js | 7 +++++++ src/lib/is-bytes.js | 8 +++++++ 4 files changed, 34 insertions(+), 13 deletions(-) create mode 100644 src/lib/is-bloby.js create mode 100644 src/lib/is-bytes.js diff --git a/src/add/normalise-input.js b/src/add/normalise-input.js index 561324838..e38ba3a31 100644 --- a/src/add/normalise-input.js +++ b/src/add/normalise-input.js @@ -1,9 +1,9 @@ 'use strict' -/* eslint-env browser */ -const { Buffer } = require('buffer') const errCode = require('err-code') const toAsyncIterable = require('../lib/file-data-to-async-iterable') +const isBytes = require('../lib/is-bytes') +const isBloby = require('../lib/is-bloby') /* Transform one of: @@ -16,6 +16,8 @@ Blob|File { path, content: AsyncIterable } { path, content: PullStream } Iterable +Iterable +Iterable Iterable<{ path, content: Buffer }> Iterable<{ path, content: Blob }> Iterable<{ path, content: Iterable }> @@ -36,20 +38,16 @@ AsyncIterable<{ path, content: AsyncIterable }> module.exports = function normalizeInput (input) { // Buffer|ArrayBuffer|TypedArray - if (Buffer.isBuffer(input) || ArrayBuffer.isView(input) || input instanceof ArrayBuffer) { - return (async function * () { // eslint-disable-line require-await - yield normalizeTuple({ path: '', content: input }) - })() - } - // Blob|File - if (typeof Blob !== 'undefined' && input instanceof Blob) { + if (isBytes(input) || isBloby(input)) { return (async function * () { // eslint-disable-line require-await yield normalizeTuple({ path: '', content: input }) })() } // Iterable + // Iterable + // Iterable // Iterable<{ path, content: Buffer }> // Iterable<{ path, content: Blob }> // Iterable<{ path, content: Iterable }> @@ -58,7 +56,9 @@ module.exports = function normalizeInput (input) { if (input[Symbol.iterator]) { return (async function * () { // eslint-disable-line require-await for (const chunk of input) { - if (typeof chunk === 'object' && (chunk.path || chunk.content)) { + if (isBytes(chunk) || isBloby(chunk)) { + yield normalizeTuple({ path: '', content: chunk }) + } else if (isFileObject(chunk)) { yield normalizeTuple(chunk) } else if (Number.isInteger(chunk)) { // Must be an Iterable i.e. 
Buffer/ArrayBuffer/Array of bytes yield normalizeTuple({ path: '', content: input }) @@ -79,7 +79,7 @@ module.exports = function normalizeInput (input) { if (input[Symbol.asyncIterator]) { return (async function * () { for await (const chunk of input) { - if (typeof chunk === 'object' && (chunk.path || chunk.content)) { + if (isFileObject(chunk)) { yield normalizeTuple(chunk) } else { // Must be an AsyncIterable i.e. a Stream let path = '' @@ -110,7 +110,7 @@ module.exports = function normalizeInput (input) { // { path, content: Iterable } // { path, content: AsyncIterable } // { path, content: PullStream } - if (typeof input === 'object' && (input.path || input.content)) { + if (isFileObject(input)) { // eslint-disable-next-line require-await return (async function * () { yield normalizeTuple(input) })() } @@ -128,3 +128,8 @@ module.exports = function normalizeInput (input) { function normalizeTuple ({ path, content }) { return { path: path || '', content: content ? toAsyncIterable(content) : null } } + +// An object with a path or content property +function isFileObject (obj) { + return typeof obj === 'object' && (obj.path || obj.content) +} diff --git a/src/lib/file-data-to-async-iterable.js b/src/lib/file-data-to-async-iterable.js index ffa0f8d11..32614324e 100644 --- a/src/lib/file-data-to-async-iterable.js +++ b/src/lib/file-data-to-async-iterable.js @@ -4,6 +4,7 @@ const toIterator = require('pull-stream-to-async-iterator') const { Buffer } = require('buffer') const blobToAsyncIterable = require('../lib/blob-to-async-iterable') +const isBloby = require('../lib/is-bloby') /* Transform one of: @@ -29,7 +30,7 @@ module.exports = function toAsyncIterable (input) { } // Blob|File - if (typeof Blob !== 'undefined' && input instanceof Blob) { + if (isBloby(input)) { return Object.assign( blobToAsyncIterable(input), { length: input.size } diff --git a/src/lib/is-bloby.js b/src/lib/is-bloby.js new file mode 100644 index 000000000..a8682f315 --- /dev/null +++ b/src/lib/is-bloby.js @@ -0,0 +1,7 @@ +'use strict' +/* eslint-env browser */ + +// Blob|File +module.exports = function isBloby (obj) { + return typeof Blob !== 'undefined' && obj instanceof Blob +} diff --git a/src/lib/is-bytes.js b/src/lib/is-bytes.js new file mode 100644 index 000000000..adc996835 --- /dev/null +++ b/src/lib/is-bytes.js @@ -0,0 +1,8 @@ +'use strict' + +const { Buffer } = require('buffer') + +// Buffer|ArrayBuffer|TypedArray +module.exports = function isBytes (obj) { + return Buffer.isBuffer(obj) || ArrayBuffer.isView(obj) || obj instanceof ArrayBuffer +}
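
For reference, a minimal usage sketch of addFromFs as converted in PATCH 23/27. This is not part of the series: the daemon address, the ./docs path and the option values are assumptions for illustration, and they map onto the glob-source options introduced above (recursive, hidden, ignore). Omitting the callback returns a promise via the nodeify wrapper in load-commands, and collectify gathers the per-file results into an array.

'use strict'

// Illustrative sketch only, assuming an IPFS daemon is listening on
// localhost:5001 and that a ./docs directory exists next to this script.
const ipfsClient = require('ipfs-http-client')
const ipfs = ipfsClient('http://localhost:5001')

async function main () {
  const results = await ipfs.addFromFs('./docs', {
    recursive: true,   // required for directories (see ERR_DIR_NON_RECURSIVE above)
    hidden: false,     // skip dot files
    ignore: ['*.tmp']  // glob patterns to leave out
  })
  // The exact result shape comes from src/add (not shown here), so just
  // print each entry as-is.
  results.forEach(r => console.log(r))
}

main().catch(console.error)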
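
And a sketch of what PATCH 27/27 is aiming for in normalise-input: a plain iterable of Buffers (or Blobs) should now come out as several unnamed file entries rather than being mistaken for an Iterable of byte values. The relative require path and the use of chunk.length to total up the content are assumptions for illustration only.

'use strict'

// Illustrative sketch only, assuming the module is required straight from
// src/add/normalise-input inside this repo and run under Node.js.
const { Buffer } = require('buffer')
const normaliseInput = require('./src/add/normalise-input')

async function main () {
  // A plain Iterable<Buffer>: each Buffer should become its own entry.
  const input = [Buffer.from('hello'), Buffer.from('world')]

  for await (const { path, content } of normaliseInput(input)) {
    let size = 0
    for await (const chunk of content) size += chunk.length
    // Expected: two entries, each with an empty path and 5 bytes of content.
    console.log(JSON.stringify(path), size)
  }
}

main().catch(console.error)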