This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

[WIP] feat: add support for chunked uploads #851

Open · wants to merge 15 commits into base: master
Changes from 6 commits
4 changes: 3 additions & 1 deletion package.json
@@ -33,6 +33,7 @@
     "concat-stream": "^1.6.2",
     "debug": "^3.1.0",
     "detect-node": "^2.0.3",
+    "filereader-stream": "^2.0.0",
     "flatmap": "0.0.3",
     "glob": "^7.1.2",
     "ipfs-block": "~0.7.1",
@@ -58,7 +59,8 @@
     "pump": "^3.0.0",
     "qs": "^6.5.2",
     "readable-stream": "^2.3.6",
-    "stream-http": "^2.8.3",
+    "readable-stream-node-to-web": "^1.0.1",
+    "stream-http": "hugomrdias/stream-http#fix/body-handling",
     "stream-to-pull-stream": "^1.7.2",
     "streamifier": "~0.1.1",
     "tar-stream": "^1.6.1"
49 changes: 49 additions & 0 deletions src/add2/add2.js
@@ -0,0 +1,49 @@
'use strict'

const { Readable } = require('stream')
const toPull = require('stream-to-pull-stream')
const concatStream = require('concat-stream')
const pump = require('pump')
const SendStream = require('./send-stream')

// Wrap an array in an object-mode Readable that emits one item per read
const arrayToStream = (data) => {
  let i = 0
  return new Readable({
    objectMode: true,
    read () {
      this.push(i < data.length ? data[i++] : null)
    }
  })
}

const add = (send) => (files, options) => {
  // check if we can receive pull-stream after callbackify
  let result = []
  return new Promise((resolve, reject) => {
    pump(
      arrayToStream([].concat(files)),
      new SendStream(send, options),
      concatStream(r => (result = r)),
      (err) => {
        if (err) {
          return reject(err)
        }
        resolve(result)
      }
    )
  })
}

const addReadableStream = (send) => (options) => {
  return new SendStream(send, options)
}

const addPullStream = (send) => (options) => {
  return toPull(new SendStream(send, options))
}

module.exports = {
  add,
  addReadableStream,
  addPullStream
}
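
For context, add() above is just an array → object stream → SendStream → concat pipeline. Here is a minimal, self-contained sketch of that pattern using the same dependencies; the SendStream stage is replaced with a hypothetical object-mode PassThrough, since send-stream.js is not part of this diff:

'use strict'

const { Readable, PassThrough } = require('stream')
const pump = require('pump')
const concatStream = require('concat-stream')

const arrayToStream = (data) => {
  let i = 0
  return new Readable({
    objectMode: true,
    read () {
      this.push(i < data.length ? data[i++] : null)
    }
  })
}

// Hypothetical stand-in for SendStream: passes file objects straight through
const fakeSendStream = new PassThrough({ objectMode: true })

pump(
  arrayToStream([{ path: 'a.txt' }, { path: 'b.txt' }]),
  fakeSendStream,
  concatStream((result) => console.log(result)), // → [{ path: 'a.txt' }, { path: 'b.txt' }]
  (err) => { if (err) console.error(err) }
)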
166 changes: 166 additions & 0 deletions src/add2/multipart2.js
@@ -0,0 +1,166 @@
'use strict'

const { Duplex } = require('stream')
const { isSource } = require('is-pull-stream')
const toStream = require('pull-stream-to-stream')

const PADDING = '--'
const NEW_LINE = '\r\n'
const NEW_LINE_BUFFER = Buffer.from(NEW_LINE)

// 26 dashes followed by 24 random digits
const generateBoundary = () => {
  let boundary = '--------------------------'
  for (let i = 0; i < 24; i++) {
    boundary += Math.floor(Math.random() * 10).toString(16)
  }

  return boundary
}

// Build the part header block: boundary line, one line per header,
// then the blank line that separates headers from the body
const leading = (headers = {}, boundary) => {
  const leading = [PADDING + boundary]

  Object.keys(headers).forEach((header) => {
    leading.push(header + ': ' + headers[header])
  })

  leading.push('')
  leading.push('')

  const leadingStr = leading.join(NEW_LINE)

  return Buffer.from(leadingStr)
}
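
// For illustration, with boundary "X" and one Content-Disposition header,
// leading() yields the part header block terminated by a blank line:
//
//   --X\r\n
//   Content-Disposition: form-data; name="file"\r\n
//   \r\n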

class Multipart extends Duplex {
  constructor ({ chunkSize }) {
    super({
      writableObjectMode: true,
      writableHighWaterMark: 1
    })

    this._boundary = generateBoundary()
    this.source = null
    this.chunkSize = chunkSize || 0
    this.buffer = Buffer.alloc(this.chunkSize)
    this.bufferOffset = 0
    // Bytes of multipart framing (boundaries, part headers, newlines),
    // as opposed to file content
    this.extraBytes = 0
  }

  _read () {
    // Downstream wants data again; resume the current file source if paused
    if (this.source && !this.isPaused()) {
      this.source.resume()
    }
  }

  _write (file, encoding, callback) {
    this.pushFile(file, () => {
      callback()
    })
  }

  _final (callback) {
    this.pushChunk(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE), true)
    // Flush the rest and finish
    if (this.bufferOffset && !this.destroyed) {
      const slice = this.buffer.slice(0, this.bufferOffset)
      this.push(slice)
      this.bufferOffset = 0
    }
    this.push(null)
    callback()
  }

  pauseAll () {
    this.pause()
    if (this.source) {
      this.source.pause()
    }
  }

  resumeAll () {
    this.resume()
    if (this.source) {
      this.source.resume()
    }
  }

  /**
   * Push chunk
   *
   * @param {Buffer} chunk
   * @param {boolean} [isExtra=false]
   * @return {boolean}
   */
  pushChunk (chunk, isExtra = false) {
    let result = true
    if (chunk === null) {
      return this.push(null)
    }

    if (!this.chunkSize) {
      return this.push(chunk)
    }

    if (isExtra) {
      this.extraBytes += chunk.length
    }

    // If we have enough bytes in this chunk to get buffer up to chunkSize,
    // fill in buffer, push it, and reset its offset.
    // Otherwise, just copy the entire chunk into buffer.
    const bytesNeeded = (this.chunkSize - this.bufferOffset)
    if (chunk.length >= bytesNeeded) {
      chunk.copy(this.buffer, this.bufferOffset, 0, bytesNeeded)
      // Push a copy: this.buffer is reused for the next chunk, so pushing it
      // directly would let later writes mutate data already in the stream
      result = this.push(Buffer.from(this.buffer))
      this.bufferOffset = 0
      // Handle leftovers: the bytes of the chunk after the ones just consumed
      const leftovers = chunk.slice(bytesNeeded)
      let offset = 0
      while (leftovers.length - offset >= this.chunkSize) {
        result = this.push(leftovers.slice(offset, offset + this.chunkSize))
        offset += this.chunkSize
      }
      // If anything is still left, stash it in the buffer for the next call
      leftovers.copy(this.buffer, 0, offset)
      this.bufferOffset = leftovers.length - offset
    } else {
      chunk.copy(this.buffer, this.bufferOffset)
      this.bufferOffset += chunk.length
    }

    return result
  }

  pushFile (file, callback) {
    this.pushChunk(leading(file.headers, this._boundary), true)

    let content = file.content || Buffer.alloc(0)

    if (Buffer.isBuffer(content)) {
      this.pushChunk(content)
      this.pushChunk(NEW_LINE_BUFFER, true)
      return callback()
    }

    if (isSource(content)) {
      // Normalize pull-streams to Node streams
      content = toStream.source(content)
    }
    this.source = content

    // From now on we assume content is a stream
    content.on('data', (data) => {
      if (!this.pushChunk(data)) {
        // Backpressure: stop reading until _read resumes us
        content.pause()
      }
    })
    content.once('error', this.emit.bind(this, 'error'))

    content.once('end', () => {
      this.pushChunk(NEW_LINE_BUFFER, true)
      callback()
    })
  }
}

module.exports = Multipart
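
To see the chunked framing in action, a small usage sketch (the require path and the Content-Disposition header value are illustrative assumptions; the file-object shape follows what pushFile() expects):

'use strict'

const Multipart = require('./src/add2/multipart2')

const mp = new Multipart({ chunkSize: 16 })

mp.on('data', (chunk) => {
  // With chunkSize set, every chunk except the final flush is exactly 16 bytes
  console.log(chunk.length, JSON.stringify(chunk.toString()))
})
mp.on('end', () => console.log('done'))

// One part: headers become the leading block, content becomes the body
mp.write({
  headers: { 'Content-Disposition': 'form-data; name="file"' },
  content: Buffer.from('hello multipart world')
})
mp.end()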