Skip to content

Commit

Permalink
Merge pull request #14348 from Automattic/vkarpov15/gh-14331
Browse files Browse the repository at this point in the history
fix(cursor): make aggregation cursor support `transform` option to match query cursor
  • Loading branch information
vkarpov15 authored Feb 14, 2024
2 parents 1f340d4 + a6790a1 commit b9e1f75
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
8 changes: 8 additions & 0 deletions lib/cursor/aggregationCursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,20 @@ util.inherits(AggregationCursor, Readable);
function _init(model, c, agg) {
if (!model.collection.buffer) {
model.hooks.execPre('aggregate', agg, function() {
if (typeof agg.options?.cursor?.transform === 'function') {
c._transforms.push(agg.options.cursor.transform);
}

c.cursor = model.collection.aggregate(agg._pipeline, agg.options || {});
c.emit('cursor', c.cursor);
});
} else {
model.collection.emitter.once('queue', function() {
model.hooks.execPre('aggregate', agg, function() {
if (typeof agg.options?.cursor?.transform === 'function') {
c._transforms.push(agg.options.cursor.transform);
}

c.cursor = model.collection.aggregate(agg._pipeline, agg.options || {});
c.emit('cursor', c.cursor);
});
Expand Down
27 changes: 27 additions & 0 deletions test/aggregate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
const start = require('./common');

const assert = require('assert');
const stream = require('stream');

const Aggregate = require('../lib/aggregate');

Expand Down Expand Up @@ -1215,6 +1216,32 @@ describe('aggregate: ', function() {
assert.equal(res[1].test, 'a test');
});

it('cursor supports transform option (gh-14331)', async function() {
const mySchema = new Schema({ name: String });
const Test = db.model('Test', mySchema);

await Test.deleteMany({});
await Test.create([{ name: 'Apple' }, { name: 'Apple' }]);

let resolve;
const waitForStream = new Promise(innerResolve => {
resolve = innerResolve;
});
const otherStream = new stream.Writable({
write(chunk, encoding, callback) {
resolve(chunk.toString());
callback();
}
});

await Test.
aggregate([{ $match: { name: 'Apple' } }]).
cursor({ transform: JSON.stringify }).
pipe(otherStream);
const streamValue = await waitForStream;
assert.ok(streamValue.includes('"name":"Apple"'), streamValue);
});

describe('Mongo 3.6 options', function() {
before(async function() {
await onlyTestAtOrAbove('3.6', this);
Expand Down

0 comments on commit b9e1f75

Please sign in to comment.