Skip to content

Commit

Permalink
storage: createReadStream: accept start/end offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Dec 23, 2014
1 parent b7ac20e commit cae3cb6
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 14 deletions.
42 changes: 40 additions & 2 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,14 @@ File.prototype.copy = function(destination, callback) {
* hash wasn't returned from the API. CRC32c will provide better performance
* with less reliability. You may also choose to skip validation completely,
* however this is **not recommended**.
* @param {number} options.start - A byte offset to begin the file's download
* from. NOTE: Byte ranges are inclusive; that is, `options.start = 0` and
* `options.end = 999` represent the first 1000 bytes in a file or object.
* NOTE: when specifying a byte range, data integrity is not available.
* @param {number} options.end - A byte offset to stop reading the file at.
* NOTE: Byte ranges are inclusive; that is, `options.start = 0` and
* `options.end = 999` represent the first 1000 bytes in a file or object.
* NOTE: when specifying a byte range, data integrity is not available.
*
* @example
* //-
Expand All @@ -238,12 +246,25 @@ File.prototype.copy = function(destination, callback) {
* image.createReadStream()
* .pipe(fs.createWriteStream('/Users/stephen/Photos/image.png'))
* .on('error', function(err) {});
*
* //-
* // To limit the downloaded data to only a byte range, pass an options object.
* //-
* var logFile = myBucket.file('access_log');
* logFile.createReadStream({
* start: 10000,
* end: 20000
* })
* .pipe(fs.createWriteStream('/Users/stephen/logfile.txt'))
* .on('error', function(err) {});
*/
File.prototype.createReadStream = function(options) {
options = options || {};

var that = this;
var throughStream = through();
var rangeRequest =
util.is(options.start, 'number') || util.is(options.end, 'number');
var throughStream = streamEvents(through());

var validations = ['crc32c', 'md5'];
var validation;
Expand All @@ -262,6 +283,10 @@ File.prototype.createReadStream = function(options) {
validation = 'all';
}

if (rangeRequest) {
validation = false;
}

var crc32c = validation === 'crc32c' || validation === 'all';
var md5 = validation === 'md5' || validation === 'all';

Expand All @@ -288,6 +313,12 @@ File.prototype.createReadStream = function(options) {
uri: uri
};

if (rangeRequest) {
reqOpts.headers = {
Range: 'bytes=' + [options.start || '', options.end || ''].join('-')
};
}

that.bucket.storage.makeAuthorizedRequest_(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
Expand Down Expand Up @@ -318,6 +349,13 @@ File.prototype.createReadStream = function(options) {
})

.on('complete', function(res) {
if (rangeRequest) {
// Range requests can't receive data integrity checks.
throughStream.emit('complete', res);
throughStream.end();
return;
}

var failed = false;
var crcFail = true;
var md5Fail = true;
Expand Down Expand Up @@ -356,7 +394,7 @@ File.prototype.createReadStream = function(options) {

throughStream.emit('error', error);
} else {
throughStream.emit('complete');
throughStream.emit('complete', res);
}

throughStream.end();
Expand Down
24 changes: 24 additions & 0 deletions regression/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,30 @@ describe('storage', function() {
});
});

it('should read a byte range from a file', function(done) {
bucket.upload(files.big.path, function(err, file) {
assert.ifError(err);

var fileSize = file.metadata.size;
var byteRange = {
start: Math.floor(fileSize * 1/3),
end: Math.floor(fileSize * 2/3)
};
var expectedContentSize = byteRange.start + 1;

var sizeStreamed = 0;
file.createReadStream(byteRange)
.on('data', function (chunk) {
sizeStreamed += chunk.length;
})
.on('error', done)
.on('complete', function() {
assert.equal(sizeStreamed, expectedContentSize);
file.delete(done);
});
});
});

describe('stream write', function() {
it('should stream write, then remove file (3mb)', function(done) {
var file = bucket.file('LargeFile');
Expand Down
77 changes: 65 additions & 12 deletions test/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ nodeutil.inherits(FakeDuplexify, stream.Duplex);
var makeWritableStream_Override;
var fakeUtil = extend({}, util, {
makeWritableStream: function() {
var args = [].slice.call(arguments);
var args = util.toArray(arguments);
(makeWritableStream_Override || util.makeWritableStream).apply(null, args);
}
});
Expand All @@ -62,7 +62,7 @@ var request_Cached = request;
var request_Override;

function fakeRequest() {
var args = [].slice.apply(arguments);
var args = util.toArray(arguments);
var results = (request_Override || request_Cached).apply(null, args);
return results;
}
Expand All @@ -85,14 +85,9 @@ function FakeConfigStore() {
describe('File', function() {
var File;
var FILE_NAME = 'file-name.png';
var options = {
makeAuthorizedRequest_: function(req, callback) {
(callback.onAuthorized || callback)(null, req);
}
};
var bucket = new Bucket(options, 'bucket-name');
var file;
var directoryFile;
var bucket;

before(function() {
mockery.registerMock('configstore', FakeConfigStore);
Expand All @@ -112,14 +107,21 @@ describe('File', function() {
});

beforeEach(function() {
makeWritableStream_Override = null;
request_Override = null;
var options = {
makeAuthorizedRequest_: function(req, callback) {
(callback.onAuthorized || callback)(null, req);
}
};
bucket = new Bucket(options, 'bucket-name');

file = new File(bucket, FILE_NAME);
file.makeReq_ = util.noop;

directoryFile = new File(bucket, 'directory/file.jpg');
directoryFile.makeReq_ = util.noop;

makeWritableStream_Override = null;
request_Override = null;
});

describe('initialization', function() {
Expand Down Expand Up @@ -387,7 +389,9 @@ describe('File', function() {

file.createReadStream({ validation: 'crc32c' })
.on('error', done)
.on('complete', done);
.on('complete', function () {
done();
});
});

it('should emit an error if crc32c validation fails', function(done) {
Expand All @@ -405,7 +409,9 @@ describe('File', function() {

file.createReadStream({ validation: 'md5' })
.on('error', done)
.on('complete', done);
.on('complete', function () {
done();
});
});

it('should emit an error if md5 validation fails', function(done) {
Expand All @@ -430,6 +436,53 @@ describe('File', function() {
});
});
});

it('should accept a start range', function(done) {
var startOffset = 100;

request_Override = function(opts) {
setImmediate(function () {
assert.equal(opts.headers.Range, 'bytes=' + startOffset + '-');
done();
});
return duplexify();
};

file.metadata = metadata;
file.createReadStream({ start: startOffset });
});

it('should accept an end range', function(done) {
var endOffset = 100;

request_Override = function(opts) {
setImmediate(function () {
assert.equal(opts.headers.Range, 'bytes=-' + endOffset);
done();
});
return duplexify();
};

file.metadata = metadata;
file.createReadStream({ end: endOffset });
});

it('should accept both a start and end range', function(done) {
var startOffset = 100;
var endOffset = 101;

request_Override = function(opts) {
setImmediate(function () {
var expectedRange = 'bytes=' + startOffset + '-' + endOffset;
assert.equal(opts.headers.Range, expectedRange);
done();
});
return duplexify();
};

file.metadata = metadata;
file.createReadStream({ start: startOffset, end: endOffset });
});
});

describe('createWriteStream', function() {
Expand Down

0 comments on commit cae3cb6

Please sign in to comment.