-
Notifications
You must be signed in to change notification settings - Fork 300
[WIP] feat: add support for chunked uploads #851
base: master
Are you sure you want to change the base?
Changes from 1 commit
775213a
01a575e
695b23e
f9df326
52a6117
0da5783
d58e8cb
300af44
3cd19e7
4191525
0a4f008
90c4036
d596295
7489447
a3bf0a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
'use strict' | ||
|
||
const { Readable, Transform } = require('stream') | ||
const toStream = require('buffer-to-stream') | ||
const pump = require('pump') | ||
const Multipart = require('./multipart2') | ||
const {prepareWithHeaders} = require('./../utils/prepare-file') | ||
|
||
/**
 * Wraps an array in an object-mode Readable that emits one element per
 * `read()` call and ends after the last element.
 */
const arrayToStream = (data) => {
  let index = 0
  return new Readable({
    objectMode: true,
    read () {
      if (index < data.length) {
        this.push(data[index])
        index += 1
      } else {
        this.push(null)
      }
    }
  })
}
|
||
/**
 * Object-mode Transform that maps each incoming file entry through
 * prepareWithHeaders(entry, options), producing part-ready objects.
 */
const prepareTransform = (options) => {
  return new Transform({
    objectMode: true,
    transform (file, encoding, done) {
      done(null, prepareWithHeaders(file, options))
    }
  })
}
|
||
module.exports = (send) => (files, options) => { | ||
const multipart = new Multipart() | ||
|
||
// add pump | ||
arrayToStream([].concat(files)) | ||
.pipe(prepareTransform(options)) | ||
.pipe(multipart) | ||
|
||
return sendChunked(multipart, send, options) | ||
} | ||
|
||
/**
 * Consumes the multipart byte stream chunk by chunk, sending each chunk
 * as its own request, then sends a terminating request (id -1, empty
 * body) carrying the total size so the server can finalize the upload.
 * Resolves with the response of the terminating request.
 *
 * Fixes over the original: the terminating request now rejects the
 * promise on failure (it previously had no .catch, leaving the promise
 * pending forever), the duplicated "send last request" logic is shared,
 * and debug console.log calls are removed.
 */
const sendChunked = (multipartStream, send, options) => {
  return new Promise((resolve, reject) => {
    const boundary = multipartStream._boundary
    const name = createName()
    let index = 0
    let rangeStart = 0
    let rangeEnd = 0
    let size = 0
    let ended = false
    let running = false

    // Terminating request: empty body, id -1, total size instead of '*'.
    const sendLastChunk = () => {
      sendChunkRequest(send, options, '', -1, rangeEnd, rangeEnd, name, boundary, size)
        .then(resolve)
        .catch(reject)
    }

    multipartStream.on('end', () => {
      ended = true
      // If no chunk request is in flight, finalize now; otherwise the
      // in-flight request's completion handler will finalize.
      if (!running) {
        sendLastChunk()
      }
    })

    multipartStream.on('data', (chunk) => {
      // Only one chunk request in flight at a time: pause until it lands.
      multipartStream.pause()
      index++
      rangeEnd = rangeStart + chunk.length
      size += chunk.length
      running = true

      sendChunkRequest(send, options, chunk, index, rangeStart, rangeEnd, name, boundary)
        .then(() => {
          rangeStart = rangeEnd
          running = false
          multipartStream.resume()
          // Stream ended while this request was running — finalize now.
          if (ended) {
            sendLastChunk()
          }
        })
        .catch(reject)
    })
  })
}
|
||
/**
 * Debug/manual-test variant of sendChunkRequest: POSTs a single chunk
 * straight to a local daemon using window.fetch (browser only). Unused
 * in the normal flow — its call sites are commented out in favour of
 * sendChunkRequest.
 * NOTE(review): the Ipfs-Chunk-* header names are still under discussion.
 */
const sendChunk = (chunk, id, start, end, name, boundary, size = '*') => {
  const search = new URLSearchParams()
  search.set('stream-channels', true)

  const url = new URL('http://localhost')
  url.port = 5002
  url.pathname = 'api/v0/add-chunked'
  url.search = search

  const request = {
    method: 'POST',
    body: chunk,
    headers: {
      'Content-Type': 'application/octet-stream',
      'Content-Range': `bytes ${start}-${end}/${size}`,
      'Ipfs-Chunk-Name': name,
      'Ipfs-Chunk-Id': id,
      'Ipfs-Chunk-Boundary': boundary
    }
  }

  return window.fetch(url.href, request).then((res) => res.json())
}
|
||
/**
 * Generates a unique-ish upload name: an ISO-8601 timestamp followed by
 * a UUID-shaped string of random hex groups.
 * NOTE(review): Math.random() is a weak randomness source; a UUID v4/v5
 * from a crypto source would be safer — TODO confirm requirements.
 */
function createName () {
  const rand4 = () => Math.random().toString(16).slice(-4)
  const timestamp = new Date(Date.now()).toISOString()
  const groups = [
    rand4() + rand4(),
    rand4(),
    rand4(),
    rand4(),
    rand4() + rand4() + rand4()
  ]
  return timestamp + '--' + groups.join('-')
}
|
||
/**
 * Sends a single chunk through the transport's `send` function.
 *
 * @param {Function} send - transport request factory (args, cb) => writable
 * @param {Object} options - add options (cid-version, raw-leaves, ...)
 * @param {Buffer|string} chunk - chunk body ('' for the final request)
 * @param {number} id - 1-based chunk index, or -1 for the final request
 * @param {number} start - byte range start
 * @param {number} end - byte range end
 * @param {string} name - upload name shared by all chunks
 * @param {string} boundary - multipart boundary of the payload
 * @param {number|string} [size='*'] - total size; '*' while still unknown
 * @returns {Promise} resolves with the response stream
 */
const sendChunkRequest = (send, options, chunk, id, start, end, name, boundary, size = '*') => {
  return new Promise((resolve, reject) => {
    const headers = {
      'Content-Type': 'application/octet-stream',
      'Content-Range': `bytes ${start}-${end}/${size}`,
      'Ipfs-Chunk-Name': name,
      'Ipfs-Chunk-Id': id,
      'Ipfs-Chunk-Boundary': boundary
    }

    const requestArgs = {
      path: 'add-chunked',
      qs: {
        'cid-version': options['cid-version'],
        'raw-leaves': options['raw-leaves'],
        'only-hash': options.onlyHash,
        'wrap-with-directory': options.wrapWithDirectory,
        hash: options.hashAlg || options.hash
      },
      args: options.args,
      stream: true,
      headers: headers
    }

    const request = send(requestArgs, (err, res) => {
      if (err) {
        reject(err)
        return
      }
      resolve(res)
    })

    // Pipe the chunk body into the outgoing request.
    pump(toStream(chunk), request)
  })
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
'use strict' | ||
|
||
const { Duplex } = require('stream') | ||
const { isSource } = require('is-pull-stream') | ||
const toStream = require('pull-stream-to-stream') | ||
|
||
// Multipart framing constants (RFC 2046): boundary lines are prefixed
// with "--" and header/body lines are terminated by CRLF.
const PADDING = '--'
const NEW_LINE = '\r\n'
const NEW_LINE_BUFFER = Buffer.from(NEW_LINE)
|
||
/**
 * Creates a multipart boundary string: a fixed run of dashes followed
 * by 24 random decimal digits (mirrors the form-data package format).
 */
const generateBoundary = () => {
  const digits = []
  for (let i = 0; i < 24; i++) {
    digits.push(Math.floor(Math.random() * 10).toString(16))
  }
  return '--------------------------' + digits.join('')
}
|
||
/**
 * Builds the part header block for one file: the boundary line, each
 * header as "Name: value", and the blank line separating headers from
 * the body — all CRLF-delimited, returned as a Buffer.
 */
const leading = (headers = {}, boundary) => {
  const lines = [PADDING + boundary]

  for (const name of Object.keys(headers)) {
    lines.push(`${name}: ${headers[name]}`)
  }

  // Empty string twice: one for the blank separator line, one so the
  // join ends the block with a trailing CRLF.
  lines.push('')
  lines.push('')

  return Buffer.from(lines.join(NEW_LINE))
}
|
||
/**
 * Duplex stream that converts an object stream of prepared files
 * (writable side, object mode) into a multipart byte stream (readable
 * side), re-chunked into fixed-size pieces of `chunkSize` bytes.
 *
 * Fixes over the original pushChunk: (1) the leftover slice used the
 * head of the incoming chunk (`chunk.slice(0, length - bytesNeeded)`)
 * instead of the tail past `bytesNeeded`, and the follow-up loop sliced
 * from offset 0 — so bytes after the first `bytesNeeded` were dropped
 * and head bytes duplicated; (2) `this.push(this.buffer)` pushed the
 * reusable accumulator without copying, so already-pushed chunks were
 * mutated by later writes.
 */
class Multipart extends Duplex {
  constructor (options) {
    super(Object.assign({}, options, { writableObjectMode: true, writableHighWaterMark: 1 }))

    this._boundary = generateBoundary()
    this.source = null // content stream of the file currently being consumed
    this.chunkSize = 256000 // target size of each pushed buffer
    this.buffer = Buffer.alloc(this.chunkSize) // accumulator for partial data
    this.bufferOffset = 0 // number of bytes currently held in `buffer`
    this.running = true
  }

  _read () {
    // Downstream is ready for more data — resume the paused source.
    if (this.source) {
      this.source.resume()
    }
  }

  _write (file, encoding, callback) {
    this.pushFile(file, () => {
      // NOTE(review): this emits the closing boundary after every file;
      // confirm that is intended for multi-file payloads.
      this.pushChunk(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE))
      callback()
    })
  }

  _final (callback) {
    // Flush any remaining partial chunk, then end the readable side.
    if (this.bufferOffset) {
      this.push(this.buffer.slice(0, this.bufferOffset))
      this.bufferOffset = 0
    }
    this.running = false
    this.push(null)
    callback()
  }

  /**
   * Accumulates `chunk` and pushes data downstream in chunkSize-sized
   * pieces; a trailing partial piece stays in `this.buffer` until the
   * next call (or _final). Returns the result of the last push
   * (false signals backpressure). `null` ends the readable side.
   */
  pushChunk (chunk) {
    if (chunk === null) {
      return this.push(null)
    }

    let result = true
    const bytesNeeded = this.chunkSize - this.bufferOffset

    if (chunk.length >= bytesNeeded) {
      // Top up the accumulator to exactly chunkSize and push a copy —
      // a copy, because `this.buffer` is reused for subsequent data.
      chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded)
      result = this.push(Buffer.from(this.buffer))
      this.bufferOffset = 0

      // Push any remaining full chunkSize slices of `chunk` directly.
      let offset = bytesNeeded
      while (chunk.length - offset >= this.chunkSize) {
        result = this.push(chunk.slice(offset, offset + this.chunkSize))
        offset += this.chunkSize
      }

      // Keep the tail (less than chunkSize bytes) for later.
      const remaining = chunk.length - offset
      chunk.copy(this.buffer, 0, offset, chunk.length)
      this.bufferOffset = remaining
    } else {
      // Not enough to fill a piece yet — just accumulate.
      chunk.copy(this.buffer, this.bufferOffset)
      this.bufferOffset += chunk.length
    }

    return result
  }

  /**
   * Emits one file: its part headers, then its content (Buffer,
   * pull-stream source, or Node stream), then a trailing CRLF.
   * `callback` fires once the content has been fully consumed.
   */
  pushFile (file, callback) {
    this.pushChunk(leading(file.headers, this._boundary))

    let content = file.content || Buffer.alloc(0)

    if (Buffer.isBuffer(content)) {
      this.pushChunk(content)
      this.pushChunk(NEW_LINE_BUFFER)
      return callback() // early exit: nothing streaming to wait on
    }

    if (isSource(content)) {
      content = toStream.source(content)
    }
    this.source = content
    // From now on we assume content is a Node stream.

    content.on('data', (data) => {
      // Respect downstream backpressure.
      if (!this.pushChunk(data)) {
        content.pause()
      }
    })
    content.once('error', this.emit.bind(this, 'error'))

    content.once('end', () => {
      this.pushChunk(NEW_LINE_BUFFER)
      callback()
    })
  }
}
|
||
module.exports = Multipart |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`Id` should probably be renamed to `Index` (it's `index` everywhere else).