This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

[WIP] feat: add support for chunked uploads #851

Open · wants to merge 15 commits into master
Changes from 1 commit
2 changes: 2 additions & 0 deletions package.json
@@ -29,6 +29,7 @@
"async": "^2.6.1",
"big.js": "^5.1.2",
"bs58": "^4.0.1",
"buffer-to-stream": "^1.0.0",
"cids": "~0.5.3",
"concat-stream": "^1.6.2",
"debug": "^3.1.0",
Expand Down Expand Up @@ -58,6 +59,7 @@
"pump": "^3.0.0",
"qs": "^6.5.2",
"readable-stream": "^2.3.6",
"readable-stream-node-to-web": "^1.0.1",
"stream-http": "^2.8.3",
"stream-to-pull-stream": "^1.7.2",
"streamifier": "~0.1.1",
Expand Down
161 changes: 161 additions & 0 deletions src/add2/add2.js
@@ -0,0 +1,161 @@
'use strict'

const { Readable, Transform } = require('stream')
const toStream = require('buffer-to-stream')
const pump = require('pump')
const Multipart = require('./multipart2')
const { prepareWithHeaders } = require('./../utils/prepare-file')

const arrayToStream = (data) => {
  let i = 0
  return new Readable({
    objectMode: true,
    read () {
      this.push(i < data.length ? data[i++] : null)
    }
  })
}

const prepareTransform = (options) => new Transform({
  objectMode: true,
  transform (chunk, encoding, callback) {
    callback(null, prepareWithHeaders(chunk, options))
  }
})

module.exports = (send) => (files, options) => {
  const multipart = new Multipart()

  // TODO: use pump instead of pipe so stream errors propagate
  arrayToStream([].concat(files))
    .pipe(prepareTransform(options))
    .pipe(multipart)

  return sendChunked(multipart, send, options)
}

const sendChunked = (multipartStream, send, options) => {
  return new Promise((resolve, reject) => {
    const boundary = multipartStream._boundary
    let index = 0
    let rangeStart = 0
    let rangeEnd = 0
    let size = 0
    let ended = false
    let running = false
    const name = createName()

    multipartStream.on('end', () => {
      ended = true
      console.log('end', size)

      // if the multipart stream already ended and no request is pending,
      // send the final (empty) request carrying the total size
      if (!running) {
        // sendChunk('', -1, rangeEnd, rangeEnd, name, boundary, size)
        sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size)
          .then(rsp => {
            resolve(rsp)
          })
      }
    })

    multipartStream.on('data', (chunk) => {
      console.log('Sending ', chunk.length)
      // pause so only one chunk request is in flight at a time
      multipartStream.pause()
      index++
      rangeEnd = rangeStart + chunk.length
      size += chunk.length
      running = true

      // sendChunk(chunk, index, rangeStart, rangeEnd, name, boundary)
      sendChunkRequest(send, options, chunk, index, rangeStart, rangeEnd, name, boundary)
        .then(rsp => {
          console.log('Response', rsp)
          rangeStart = rangeEnd
          multipartStream.resume()
          // if the multipart stream already ended, send the final request
          if (ended) {
            console.log('sending last')
            // sendChunk('', -1, rangeEnd, rangeEnd, name, boundary, size)
            sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size)
              .then(rsp => {
                resolve(rsp)
              })
          }
          running = false
        })
        .catch(reject)
    })
  })
}

// Unused browser (window.fetch) variant of sendChunkRequest, kept for
// reference; see the commented-out sendChunk calls above
const sendChunk = (chunk, id, start, end, name, boundary, size = '*') => {
  const url = new URL('http://localhost')
  const search = new URLSearchParams()
  search.set('stream-channels', true)
  url.port = 5002
  url.pathname = 'api/v0/add-chunked'
  url.search = search

  return window.fetch(url.href, {
    method: 'POST',
    body: chunk,
    headers: {
      'Content-Type': 'application/octet-stream',
      'Content-Range': `bytes ${start}-${end}/${size}`,
      'Ipfs-Chunk-Name': name,
      'Ipfs-Chunk-Id': id,
Contributor:
Id probably should be renamed to Index (it's index everywhere else).

      'Ipfs-Chunk-Boundary': boundary
Contributor:
Should we use the X- prefix for all custom headers? We already have X-Ipfs-Path, and I wonder if we should follow that convention.

    }
  })
    .then(res => res.json())
}
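A quick sketch of what that convention could look like, also folding in the Id → Index rename suggested above; these X- prefixed names are hypothetical and not part of this PR:

// Hypothetical variant of the custom headers, following the X-Ipfs-Path precedent
const headers = {
  'Content-Type': 'application/octet-stream',
  'Content-Range': `bytes ${start}-${end}/${size}`,
  'X-Ipfs-Chunk-Name': name,
  'X-Ipfs-Chunk-Index': index,
  'X-Ipfs-Chunk-Boundary': boundary
}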

function createName () {
Contributor:
Regarding Ipfs-Chunk-Name: I may be missing something here, but is there a reason why we can't use UUID v5? JavaScript's random numbers are usually weak, so v5 sounds like a safer option:

RFC 4122 advises that "distributed applications generating UUIDs at a variety of hosts must be willing to rely on the random number source at all hosts. If this is not feasible, the namespace variant should be used."

Fast generation of RFC 4122 UUIDs: require('uuid/v5').
If use of UUID here is fine, we may consider renaming the field to -Uuid or even -Chunk-Group-Uuid to remove ambiguity.

  const date = new Date(Date.now()).toISOString()
  function chr4 () {
    return Math.random().toString(16).slice(-4)
  }
  return date + '--' + chr4() + chr4() +
    '-' + chr4() +
    '-' + chr4() +
    '-' + chr4() +
    '-' + chr4() + chr4() + chr4()
}
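A minimal sketch of the uuid/v5 approach the reviewer suggests, assuming the uuid package (^3.x) were added as a dependency; the namespace constant and seed string below are hypothetical choices, not part of this PR:

const uuidv5 = require('uuid/v5')

// Hypothetical fixed namespace UUID for chunk groups; any constant UUID works
const CHUNK_NAMESPACE = '1b671a64-40d5-491e-99b0-da01ff1f3341'

// v5 is deterministic for a given (seed, namespace) pair, so the seed itself
// must be unique per upload for names not to collide
const createChunkName = () => uuidv5(`${Date.now()}-${Math.random()}`, CHUNK_NAMESPACE)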

const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary, size = '*') => {
  return new Promise((resolve, reject) => {
    const qs = {
      'cid-version': options['cid-version'],
      'raw-leaves': options['raw-leaves'],
      'only-hash': options.onlyHash,
      'wrap-with-directory': options.wrapWithDirectory,
      hash: options.hashAlg || options.hash
    }
    const args = {
      path: 'add-chunked',
      qs: qs,
      args: options.args,
      stream: true,
      // recursive: true,
      // progress: options.progress,
      headers: {
        'Content-Type': 'application/octet-stream',
        // size stays '*' (unknown total) until the final, empty request
        'Content-Range': `bytes ${start}-${end}/${size}`,
        'Ipfs-Chunk-Name': name,
        'Ipfs-Chunk-Id': id,
        'Ipfs-Chunk-Boundary': boundary
      }
    }

    const req = send(args, (err, res) => {
      if (err) {
        return reject(err)
      }

      resolve(res)
    })

    pump(toStream(chunk), req)
  })
}
135 changes: 135 additions & 0 deletions src/add2/multipart2.js
@@ -0,0 +1,135 @@
'use strict'

const { Duplex } = require('stream')
const { isSource } = require('is-pull-stream')
const toStream = require('pull-stream-to-stream')

const PADDING = '--'
const NEW_LINE = '\r\n'
const NEW_LINE_BUFFER = Buffer.from(NEW_LINE)

const generateBoundary = () => {
  var boundary = '--------------------------'
  for (var i = 0; i < 24; i++) {
    boundary += Math.floor(Math.random() * 10).toString(16)
  }

  return boundary
}

const leading = (headers = {}, boundary) => {
  var leading = [PADDING + boundary]

  Object.keys(headers).forEach((header) => {
    leading.push(header + ': ' + headers[header])
  })

  leading.push('')
  leading.push('')

  const leadingStr = leading.join(NEW_LINE)

  return Buffer.from(leadingStr)
}

class Multipart extends Duplex {
  constructor (options) {
    super(Object.assign({}, options, { writableObjectMode: true, writableHighWaterMark: 1 }))

    this._boundary = generateBoundary()
    this.source = null
    this.chunkSize = 256000
    this.buffer = Buffer.alloc(this.chunkSize)
    this.bufferOffset = 0
    this.running = true
  }

  _read () {
    if (this.source) {
      this.source.resume()
    }
  }

  _write (file, encoding, callback) {
    this.pushFile(file, () => {
      // Push the closing boundary once the file's content has been written
      this.pushChunk(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE))
      callback()
    })
  }

  _final (callback) {
    // Flush the rest and finish
    if (this.bufferOffset) {
      this.push(this.buffer.slice(0, this.bufferOffset))
      this.bufferOffset = 0
    }
    this.running = false
    this.push(null)
    callback()
  }

  pushChunk (chunk) {
    const bytesNeeded = (this.chunkSize - this.bufferOffset)
    let result = true
    if (chunk === null) {
      return this.push(null)
    }

    // If we have enough bytes in this chunk to get buffer up to chunkSize,
    // fill in buffer, push it, and reset its offset.
    // Otherwise, just copy the entire chunk into buffer.
    if (chunk.length >= bytesNeeded) {
      chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded)
      result = this.push(this.buffer)
      // Allocate a fresh staging buffer: the pushed one may still be queued
      // in the readable state and must not be overwritten below
      this.buffer = Buffer.alloc(this.chunkSize)
      this.bufferOffset = 0
      // Handle leftovers from the chunk: everything after the bytes consumed above
      let offset = bytesNeeded
      let size = chunk.length - offset
      while (size >= this.chunkSize) {
        result = this.push(chunk.slice(offset, offset + this.chunkSize))
        offset += this.chunkSize
        size -= this.chunkSize
      }
      // if we still have anything left copy to the buffer
      chunk.copy(this.buffer, 0, offset, offset + size)
      this.bufferOffset = size
    } else {
      chunk.copy(this.buffer, this.bufferOffset)
      this.bufferOffset += chunk.length
    }

    return result
  }

  pushFile (file, callback) {
    this.pushChunk(leading(file.headers, this._boundary))

    let content = file.content || Buffer.alloc(0)

    if (Buffer.isBuffer(content)) {
      this.pushChunk(content)
      this.pushChunk(NEW_LINE_BUFFER)
      return callback() // early
    }

    if (isSource(content)) {
      content = toStream.source(content)
    }
    this.source = content
    // From now on we assume content is a stream

    content.on('data', (data) => {
      // Respect backpressure: pause the source when push() returns false
      if (!this.pushChunk(data)) {
        content.pause()
      }
    })
    content.once('error', this.emit.bind(this, 'error'))

    content.once('end', () => {
      this.pushChunk(NEW_LINE_BUFFER)
      callback()
    })
  }
}

module.exports = Multipart
1 change: 1 addition & 0 deletions src/utils/load-commands.js
@@ -64,6 +64,7 @@ function requireCommands () {
    addFromStream: require('../files/add')(send),
    addFromURL: require('../util/url-add')(send),
    getEndpointConfig: require('../util/get-endpoint-config')(config),
    add2: require('./../add2/add2')(send),
    crypto: require('libp2p-crypto'),
    isIPFS: require('is-ipfs')
  }