Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
fix: simplify write command
Browse files Browse the repository at this point in the history
Constructs a pull-stream out of the parts of the file we want to write/overwrite
and uses the unixfs importer to rewrite & rebalance dag nodes on writing.

Deletes lots of code and fixes a problem where large rewritten DAGs can become
corrupt under heavy load.

License: MIT
Signed-off-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
achingbrain committed Sep 28, 2018
1 parent fedd9af commit 710a2d6
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 519 deletions.
12 changes: 6 additions & 6 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ const writeOperations = {
}

// These operations are asynchronous and manage their own locking
const upwrappedOperations = {
const unwrappedOperations = {
write: require('./write'),
read: require('./read')
}

// These operations are synchronous and manage their own locking
const upwrappedSynchronousOperations = {
const unwrappedSynchronousOperations = {
readPullStream: require('./read-pull-stream'),
readReadableStream: require('./read-readable-stream')
}
Expand Down Expand Up @@ -68,12 +68,12 @@ module.exports = (ipfs, options) => {
ipfs, mfs, operations: writeOperations, lock: writeLock
})

Object.keys(upwrappedOperations).forEach(key => {
mfs[key] = promisify(upwrappedOperations[key](ipfs))
Object.keys(unwrappedOperations).forEach(key => {
mfs[key] = promisify(unwrappedOperations[key](ipfs))
})

Object.keys(upwrappedSynchronousOperations).forEach(key => {
mfs[key] = upwrappedSynchronousOperations[key](ipfs)
Object.keys(unwrappedSynchronousOperations).forEach(key => {
mfs[key] = unwrappedSynchronousOperations[key](ipfs)
})

return mfs
Expand Down
2 changes: 2 additions & 0 deletions src/core/utils/count-stream-bytes.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const countStreamBytes = (callback) => {

return through((buffer) => {
bytesRead += buffer.length

return buffer
}, () => {
callback(bytesRead)
})
Expand Down
2 changes: 1 addition & 1 deletion src/core/utils/create-lock.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module.exports = (repoOwner) => {
mutex[`${type}Lock`](() => {
return new Promise((resolve, reject) => {
args.push((error, result) => {
log(`${type} operation callback invoked${error ? ' with error: ' + error.message : ''}`)
log(`${type.substring(0, 1).toUpperCase()}${type.substring(1)} operation callback invoked${error ? ' with error: ' + error.message : ''}`)

if (error) {
return reject(error)
Expand Down
9 changes: 0 additions & 9 deletions src/core/utils/end-pull-stream.js

This file was deleted.

2 changes: 1 addition & 1 deletion src/core/utils/limit-stream-bytes.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const limitStreamBytes = (limit) => {

return asyncMap((buffer, cb) => {
if (bytesRead > limit) {
cb(true) // eslint-disable-line standard/no-callback-literal
return cb(true) // eslint-disable-line standard/no-callback-literal
}

// If we only need to return part of this buffer, slice it to make it smaller
Expand Down
4 changes: 2 additions & 2 deletions src/core/utils/traverse-to.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ const traverseToMfsObject = (ipfs, path, options, callback) => {
log(`index ${index} pathSegments.length ${pathSegments.length} pathSegment ${pathSegment} lastComponent ${lastComponent}`, options)

if (lastComponent && !options.createLastComponent) {
log(`Last segment of ${path} did not exist`)
log(`Last segment of ${path.path} did not exist`)
return done(new NonFatalError('file does not exist'))
} else if (!lastComponent && !options.parents) {
log(`Cannot traverse to ${path} - ${pathSegment} did not exist`)
log(`Cannot traverse to ${path.path} - ${pathSegment} did not exist`)
return done(new NonFatalError('file does not exist'))
}

Expand Down
33 changes: 0 additions & 33 deletions src/core/write/import-node.js

This file was deleted.

57 changes: 17 additions & 40 deletions src/core/write/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const promisify = require('promisify-es6')
const CID = require('cids')
const waterfall = require('async/waterfall')
const parallel = require('async/parallel')
const {
Expand All @@ -11,15 +10,11 @@ const {
traverseTo,
addLink,
updateTree,
limitStreamBytes,
toPullSource
toPullSource,
loadNode
} = require('../utils')
const values = require('pull-stream/sources/values')
const log = require('debug')('ipfs:mfs:write')
const importNode = require('./import-node')
const updateNode = require('./update-node')
const cat = require('pull-cat')
const pull = require('pull-stream/pull')

const write = require('./write')

const defaultOptions = {
offset: 0, // the offset in the file to begin writing
Expand Down Expand Up @@ -55,11 +50,7 @@ module.exports = function mfsWrite (ipfs) {
return callback(new Error('cannot have negative byte count'))
}

if (options.length === 0 && !options.truncate) {
return callback()
}

if (!options.length) {
if (!options.length && options.length !== 0) {
options.length = Infinity
}

Expand Down Expand Up @@ -111,34 +102,18 @@ const updateOrImport = (ipfs, options, path, source, containingFolder, callback)
}, null)

if (existingChild) {
const cid = new CID(existingChild.multihash)
log(`Updating linked DAGNode ${cid.toBaseEncodedString()}`)

// overwrite the existing file or part of it, possibly truncating what's left
updateNode(ipfs, cid, source, options, next)
} else {
if (!options.create) {
return next(new Error('file does not exist'))
}

if (options.offset) {
options.length += options.offset
return loadNode(ipfs, existingChild, next)
}

// pad the start of the stream with a buffer full of zeros
source = cat([
values([Buffer.alloc(options.offset, 0)]),
source
])
}
if (!options.create) {
return next(new Error('file does not exist'))
}

source = pull(
source,
limitStreamBytes(options.length)
)
next(null, null)
},

log('Importing file', path.name)
importNode(ipfs, source, options, next)
}
(existingChild, next) => {
write(ipfs, existingChild, source, options, next)
},

// The slow bit is done, now add or replace the DAGLink in the containing directory
Expand Down Expand Up @@ -176,7 +151,9 @@ const updateOrImport = (ipfs, options, path, source, containingFolder, callback)

// Update the MFS record with the new CID for the root of the tree
(newRoot, next) => updateMfsRoot(ipfs, newRoot.node.multihash, next)
], callback)
], (error, result) => {
callback(error, result)
})
})
})(next)
}], callback)
Expand Down
46 changes: 0 additions & 46 deletions src/core/write/truncate-node.js

This file was deleted.

Loading

0 comments on commit 710a2d6

Please sign in to comment.