From a459704184390f7e591be3e62e9f00533a20d907 Mon Sep 17 00:00:00 2001 From: Warren James Date: Mon, 28 Jun 2021 15:32:57 -0400 Subject: [PATCH 01/12] test: Add test to check that change streams can be used as iterators or emitters, but not both concurrently --- test/functional/change_stream.test.js | 61 +++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 1ee60286e8..8d25aef856 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1792,6 +1792,67 @@ describe('Change Streams', function () { } }); + // FIXME: NODE-1797 + describe('should error when used as iterator and emitter concurrently', function () { + let client, coll, changeStream; + + beforeEach(function () { + client = this.configuration.newClient(); + return client.connect().then(_client => { + client = _client; + coll = client.db(this.configuration.db).collection('tester'); + changeStream = coll.watch(); + }); + }); + + afterEach(function () { + return Promise.resolve() + .then(() => { + if (changeStream && !changeStream.closed) { + return changeStream.close(); + } + }) + .then(() => { + if (client) { + return client.close(); + } + }) + .then(() => { + coll = undefined; + changeStream = undefined; + client = undefined; + }); + }); + + // TODO: Better Errors + it('should throw MongoDriverError when set as an emitter and used as an iterator', { + metadata: { requires: { topology: 'single', mongodb: '>=3.6' } }, + test: function () { + changeStream.on('change', nextVal => { + console.log(nextVal); + }); + + expect(() => { + changeStream.next().then(); + }).to.throw(); + } + }); + + it('should throw MongoDriverError when set as an iterator and used as an emitter', { + metadata: { requires: { topology: 'single', mongodb: '>=3.6' } }, + test: function () { + function readIter() { + return changeStream.next(); + } + readIter().then(); + expect(() => { + changeStream.on('change', nextVal => { + console.log(nextVal); + }); + }).to.throw(); + } + }); + }); describe('should properly handle a changeStream event being processed mid-close', function () { let client, coll, changeStream; From c16731885a58763baa0c9733c1c586ea84a44220 Mon Sep 17 00:00:00 2001 From: Warren James Date: Mon, 28 Jun 2021 15:37:28 -0400 Subject: [PATCH 02/12] fix: (wip) start fix to ensure change streams can be used as iterators or emitters, but not both at once --- src/change_stream.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/change_stream.ts b/src/change_stream.ts index 2e89355359..33ba294d5b 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -204,6 +204,9 @@ export class ChangeStream extends TypedEventEmitter extends TypedEventEmitter extends TypedEventEmitter + ): ChangeStream { + if (this.isIterator) { + throw new MongoDriverError( + 'Cannot use ChangeStream as EventEmitter if it has already been used as an iterator' + ); + } + return super.on(event, callback); + } } /** @internal */ From 1e8e112c0fe9d067a942e82ab1ac9d55344c1ca3 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 30 Jun 2021 14:06:48 -0400 Subject: [PATCH 03/12] fix: (wip) Fixing tests --- src/change_stream.ts | 37 +++++--- test/functional/change_stream.test.js | 116 +++++++++++++++++++++----- 2 files changed, 119 insertions(+), 34 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 33ba294d5b..bbe3ae0b8b 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -278,6 +278,13 @@ export class ChangeStream extends TypedEventEmitter { + if (eventName !== 'removeListener') { + if (this.isIterator) + throw new MongoDriverError( + 'Cannot use ChangeStream as emitter after using as an iterator' + ); + this.isEmitter = true; + } if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { streamEvents(this, this.cursor); } @@ -302,6 +309,12 @@ export class ChangeStream extends TypedEventEmitter | void { + if (this.isEmitter) { + throw new MongoDriverError( + 'Cannot use ChangeStream as iterator after using as an EventEmitter' + ); + } + this.isIterator = true; return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -316,6 +329,12 @@ export class ChangeStream extends TypedEventEmitter> ): Promise> | void { + if (this.isEmitter) { + throw new MongoDriverError( + 'Cannot use ChangeStream as iterator after using as an EventEmitter' + ); + } + this.isIterator = true; return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -370,6 +389,12 @@ export class ChangeStream extends TypedEventEmitter; tryNext(callback: Callback): void; tryNext(callback?: Callback): Promise | void { + if (this.isEmitter) { + throw new MongoDriverError( + 'Cannot use ChangeStream as iterator after using as an EventEmitter' + ); + } + this.isIterator = true; return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -377,18 +402,6 @@ export class ChangeStream extends TypedEventEmitter - ): ChangeStream { - if (this.isIterator) { - throw new MongoDriverError( - 'Cannot use ChangeStream as EventEmitter if it has already been used as an iterator' - ); - } - return super.on(event, callback); - } } /** @internal */ diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 8d25aef856..2294e9a640 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1,7 +1,7 @@ 'use strict'; const assert = require('assert'); const { Transform, PassThrough } = require('stream'); -const { MongoNetworkError } = require('../../src/error'); +const { MongoNetworkError, MongoDriverError } = require('../../src/error'); const { delay, setupDatabase, withClient, withCursor } = require('./shared'); const co = require('co'); const mock = require('../tools/mock'); @@ -1825,34 +1825,106 @@ describe('Change Streams', function () { }); // TODO: Better Errors - it('should throw MongoDriverError when set as an emitter and used as an iterator', { - metadata: { requires: { topology: 'single', mongodb: '>=3.6' } }, - test: function () { - changeStream.on('change', nextVal => { - console.log(nextVal); - }); - - expect(() => { - changeStream.next().then(); - }).to.throw(); + it( + 'should throw MongoDriverError when set as an emitter and used as an iterator using promises', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: function (done) { + changeStream.on('change', console.log); + coll.insertOne({ c: 10 }); + try { + changeStream + .hasNext() + .then() + .catch(err => expect.fail(err ? err.message : '')); + } catch (error) { + expect(error).to.be.instanceof(MongoDriverError); + done(); + return; + } + expect.fail('Should not reach here'); + } } - }); - - it('should throw MongoDriverError when set as an iterator and used as an emitter', { - metadata: { requires: { topology: 'single', mongodb: '>=3.6' } }, - test: function () { - function readIter() { - return changeStream.next(); + ); + + // FIXME: + it( + 'should throw MongoDriverError when set as an iterator and used as an emitter using promises', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: function (done) { + coll.insertOne({ c: 10 }); + let promise = changeStream.hasNext(); + promise + .then(() => { + try { + changeStream.on('change', console.log); + } catch (error) { + expect(error).to.be.instanceof(MongoDriverError); + done(); + return; + } + expect.fail('Should not reach here'); + }) + .catch(err => { + expect.fail(err.message); + }); } - readIter().then(); - expect(() => { + } + ); + + it( + 'should throw MongoDriverError when set as an emitter and used as an iterator using callbacks', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: function (done) { changeStream.on('change', nextVal => { console.log(nextVal); }); - }).to.throw(); + coll.insertOne({ c: 10 }); + + try { + changeStream.hasNext(console.log); + } catch (error) { + console.log('Hello there'); + expect(error).to.be.instanceof(MongoDriverError); + done(); + return; + } + expect.fail('Should not reach here'); + } } - }); + ); + + // FIXME: + it( + 'should throw MongoDriverError when set as an iterator and used as an emitter using callbacks', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: function (done) { + coll.insertOne({ c: 10 }).then(() => { + console.log('About to call hasNext'); + changeStream.next((err, res) => { + console.log(res); + console.log('Inside hasNext callback'); + try { + changeStream.on('change', console.log); + console.log('Here now'); + } catch (error) { + expect(error).to.be.instanceof(MongoDriverError); + console.log("now I'm here"); + done(); + return; + } + console.log('oopsie'); + expect.fail('Should not reach here'); + }); + }); + } + } + ); }); + describe('should properly handle a changeStream event being processed mid-close', function () { let client, coll, changeStream; From ed6cb45e1f058ecc12f7d4ed79f2107a7c9ccff7 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 30 Jun 2021 17:14:33 -0400 Subject: [PATCH 04/12] test: Fix tests for 'on' and 'hasNext' --- test/functional/change_stream.test.js | 137 ++++++++++---------------- 1 file changed, 53 insertions(+), 84 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 2294e9a640..fda9197c35 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1794,101 +1794,80 @@ describe('Change Streams', function () { // FIXME: NODE-1797 describe('should error when used as iterator and emitter concurrently', function () { - let client, coll, changeStream; + let client, coll, changeStream, repeatInsert, val; - beforeEach(function () { + beforeEach(async function () { + val = 0; client = this.configuration.newClient(); - return client.connect().then(_client => { - client = _client; - coll = client.db(this.configuration.db).collection('tester'); - changeStream = coll.watch(); - }); + await client.connect().catch(() => expect.fail('Failed to connect to client')); + + coll = client.db(this.configuration.db).collection('tester'); + changeStream = coll.watch(); + + repeatInsert = setInterval(async function () { + await coll.insertOne({ c: val++ }).catch('Failed to insert document'); + }, 100); }); - afterEach(function () { - return Promise.resolve() - .then(() => { - if (changeStream && !changeStream.closed) { - return changeStream.close(); - } - }) - .then(() => { - if (client) { - return client.close(); - } - }) - .then(() => { - coll = undefined; - changeStream = undefined; - client = undefined; - }); + afterEach(async function () { + if (changeStream && !changeStream.closed) { + await changeStream.close(); + } + + if (client && !client.closed) { + await client.close(); + } + + clearInterval(repeatInsert); }); // TODO: Better Errors it( - 'should throw MongoDriverError when set as an emitter and used as an iterator using promises', + 'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext" using promises', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function (done) { + test: async function () { changeStream.on('change', console.log); - coll.insertOne({ c: 10 }); try { - changeStream - .hasNext() - .then() - .catch(err => expect.fail(err ? err.message : '')); + await changeStream.hasNext().catch(err => { + expect.fail(err.message); + }); } catch (error) { expect(error).to.be.instanceof(MongoDriverError); - done(); - return; } - expect.fail('Should not reach here'); } } ); - // FIXME: it( - 'should throw MongoDriverError when set as an iterator and used as an emitter using promises', + 'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext" using callbacks', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function (done) { - coll.insertOne({ c: 10 }); - let promise = changeStream.hasNext(); - promise - .then(() => { - try { - changeStream.on('change', console.log); - } catch (error) { - expect(error).to.be.instanceof(MongoDriverError); - done(); - return; - } - expect.fail('Should not reach here'); - }) - .catch(err => { - expect.fail(err.message); - }); + test: async function () { + changeStream.on('change', console.log); + + try { + changeStream.hasNext(console.log); + } catch (error) { + expect(error).to.be.instanceof(MongoDriverError); + return; + } + expect.fail('Should not reach here'); } } ); - it( - 'should throw MongoDriverError when set as an emitter and used as an iterator using callbacks', + 'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on" using promises', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function (done) { - changeStream.on('change', nextVal => { - console.log(nextVal); - }); - coll.insertOne({ c: 10 }); - + test: async function () { + await changeStream + .hasNext() + .catch(() => expect.fail('Failed to set changeStream to iterator')); try { - changeStream.hasNext(console.log); + changeStream.on('change', console.log); } catch (error) { - console.log('Hello there'); expect(error).to.be.instanceof(MongoDriverError); - done(); return; } expect.fail('Should not reach here'); @@ -1896,29 +1875,19 @@ describe('Change Streams', function () { } ); - // FIXME: it( - 'should throw MongoDriverError when set as an iterator and used as an emitter using callbacks', + 'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on" using callbacks', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: function (done) { - coll.insertOne({ c: 10 }).then(() => { - console.log('About to call hasNext'); - changeStream.next((err, res) => { - console.log(res); - console.log('Inside hasNext callback'); - try { - changeStream.on('change', console.log); - console.log('Here now'); - } catch (error) { - expect(error).to.be.instanceof(MongoDriverError); - console.log("now I'm here"); - done(); - return; - } - console.log('oopsie'); - expect.fail('Should not reach here'); - }); + test: async function () { + changeStream.hasNext(() => { + try { + changeStream.on('change', console.log); + } catch (error) { + expect(error).to.be.instanceof(MongoDriverError); + return; + } + expect.fail('Should not reach here'); }); } } From f3d2af0b403aee6d8d8c933df7d4bcb8bce1d526 Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 2 Jul 2021 09:42:33 -0400 Subject: [PATCH 05/12] fix: Throw error MongoDriverError only when attempting to watch changes --- src/change_stream.ts | 54 ++++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index bbe3ae0b8b..d0d1cf127d 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -278,13 +278,6 @@ export class ChangeStream extends TypedEventEmitter { - if (eventName !== 'removeListener') { - if (this.isIterator) - throw new MongoDriverError( - 'Cannot use ChangeStream as emitter after using as an iterator' - ); - this.isEmitter = true; - } if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { streamEvents(this, this.cursor); } @@ -309,12 +302,7 @@ export class ChangeStream extends TypedEventEmitter | void { - if (this.isEmitter) { - throw new MongoDriverError( - 'Cannot use ChangeStream as iterator after using as an EventEmitter' - ); - } - this.isIterator = true; + this._setIsIterator(); return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -329,12 +317,7 @@ export class ChangeStream extends TypedEventEmitter> ): Promise> | void { - if (this.isEmitter) { - throw new MongoDriverError( - 'Cannot use ChangeStream as iterator after using as an EventEmitter' - ); - } - this.isIterator = true; + this._setIsIterator(); return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -389,12 +372,7 @@ export class ChangeStream extends TypedEventEmitter; tryNext(callback: Callback): void; tryNext(callback?: Callback): Promise | void { - if (this.isEmitter) { - throw new MongoDriverError( - 'Cannot use ChangeStream as iterator after using as an EventEmitter' - ); - } - this.isIterator = true; + this._setIsIterator(); return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -402,6 +380,16 @@ export class ChangeStream extends TypedEventEmitter( changeStream: ChangeStream, cursor: ChangeStreamCursor ): void { + changeStream._setIsEmitter(); const stream = changeStream[kCursorStream] || cursor.stream(); changeStream[kCursorStream] = stream; stream.on('data', change => processNewChange(changeStream, change)); @@ -767,6 +756,21 @@ function processError( return closeWithError(changeStream, error, callback); } +function errorIfIsEmmiter(changeStream: ChangeStream) { + if (changeStream.isEmitter) { + throw new MongoDriverError( + 'Cannot use ChangeStream as iterator after using as an EventEmitter' + ); + } +} + +function errorIfIsIterator(changeStream: ChangeStream) { + if (changeStream.isIterator) { + throw new MongoDriverError( + 'Cannot use ChangeStream as an EventEmitter after using as an iterator' + ); + } +} /** * Safely provides a cursor across resume attempts * From 2bf784ae02de40a29ef07266fdf4f65a1d0fb85c Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 2 Jul 2021 09:42:59 -0400 Subject: [PATCH 06/12] test: Clean up test code --- test/functional/change_stream.test.js | 124 ++++++++++++++++++++------ 1 file changed, 97 insertions(+), 27 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index fda9197c35..6c9de055f0 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1795,9 +1795,9 @@ describe('Change Streams', function () { // FIXME: NODE-1797 describe('should error when used as iterator and emitter concurrently', function () { let client, coll, changeStream, repeatInsert, val; + val = 0; beforeEach(async function () { - val = 0; client = this.configuration.newClient(); await client.connect().catch(() => expect.fail('Failed to connect to client')); @@ -1805,20 +1805,25 @@ describe('Change Streams', function () { changeStream = coll.watch(); repeatInsert = setInterval(async function () { - await coll.insertOne({ c: val++ }).catch('Failed to insert document'); - }, 100); + await coll.insertOne({ c: val }).catch('Failed to insert document'); + val++; + }, 75); }); afterEach(async function () { + if (repeatInsert) { + clearInterval(repeatInsert); + } + if (changeStream && !changeStream.closed) { - await changeStream.close(); + await changeStream.close().catch(changeStream.close); } - if (client && !client.closed) { - await client.close(); + if (client) { + await client.close().catch(client.close); } - clearInterval(repeatInsert); + await mock.cleanup(); }); // TODO: Better Errors @@ -1827,14 +1832,15 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { - changeStream.on('change', console.log); + changeStream.on('change', () => {}); try { await changeStream.hasNext().catch(err => { expect.fail(err.message); }); } catch (error) { - expect(error).to.be.instanceof(MongoDriverError); + return expect(error).to.be.instanceof(MongoDriverError); } + return expect.fail('Should not reach here'); } } ); @@ -1844,15 +1850,14 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { - changeStream.on('change', console.log); + changeStream.on('change', () => {}); try { - changeStream.hasNext(console.log); + changeStream.hasNext(() => {}); } catch (error) { - expect(error).to.be.instanceof(MongoDriverError); - return; + return expect(error).to.be.instanceof(MongoDriverError); } - expect.fail('Should not reach here'); + return expect.fail('Should not reach here'); } } ); @@ -1865,12 +1870,11 @@ describe('Change Streams', function () { .hasNext() .catch(() => expect.fail('Failed to set changeStream to iterator')); try { - changeStream.on('change', console.log); + changeStream.on('change', () => {}); } catch (error) { - expect(error).to.be.instanceof(MongoDriverError); - return; + return expect(error).to.be.instanceof(MongoDriverError); } - expect.fail('Should not reach here'); + return expect.fail('Should not reach here'); } } ); @@ -1880,15 +1884,80 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { - changeStream.hasNext(() => { - try { - changeStream.on('change', console.log); - } catch (error) { - expect(error).to.be.instanceof(MongoDriverError); - return; - } - expect.fail('Should not reach here'); - }); + changeStream.hasNext(() => {}); + try { + changeStream.on('change', () => {}); + } catch (error) { + return expect(error).to.be.instanceof(MongoDriverError); + } + return expect.fail('Should not reach here'); + } + } + ); + it( + 'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next" using promises', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: async function () { + changeStream.once('change', () => {}); + try { + await changeStream.next().catch(err => { + expect.fail(err.message); + }); + } catch (error) { + return expect(error).to.be.instanceof(MongoDriverError); + } + return expect.fail('Should not reach here'); + } + } + ); + + it( + 'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next" using callbacks', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: async function () { + changeStream.once('change', () => {}); + + try { + changeStream.next(() => {}); + } catch (error) { + return expect(error).to.be.instanceof(MongoDriverError); + } + return expect.fail('Should not reach here'); + } + } + ); + it( + 'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on" using promises', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: async function () { + await changeStream + .tryNext() + .catch(() => expect.fail('Failed to set changeStream to iterator')); + try { + changeStream.on('change', () => {}); + } catch (error) { + return expect(error).to.be.instanceof(MongoDriverError); + } + return expect.fail('Should not reach here'); + } + } + ); + + it( + 'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on" using callbacks', + { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: async function () { + changeStream.tryNext(() => {}); + try { + changeStream.on('change', () => {}); + } catch (error) { + return expect(error).to.be.instanceof(MongoDriverError); + } + return expect.fail('Should not reach here'); } } ); @@ -2077,6 +2146,7 @@ describe('Change Streams', function () { this.changeStream.on('resumeTokenChanged', resumeToken => { this.resumeTokenChangedEvents.push({ resumeToken }); }); + this.changeStream.isEmitter = false; return this.changeStream; } From a585b4b751d55213bb5bca1c4b7216233dbda71d Mon Sep 17 00:00:00 2001 From: Warren James Date: Fri, 2 Jul 2021 10:32:24 -0400 Subject: [PATCH 07/12] test: Ensure that callbacks execute before test cleanup --- test/functional/change_stream.test.js | 29 +++++++++++++-------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 6c9de055f0..da831eb867 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1814,13 +1814,12 @@ describe('Change Streams', function () { if (repeatInsert) { clearInterval(repeatInsert); } - if (changeStream && !changeStream.closed) { - await changeStream.close().catch(changeStream.close); + await changeStream.close(); } if (client) { - await client.close().catch(client.close); + await client.close(); } await mock.cleanup(); @@ -1832,7 +1831,7 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { - changeStream.on('change', () => {}); + await new Promise(resolve => changeStream.on('change', resolve)); try { await changeStream.hasNext().catch(err => { expect.fail(err.message); @@ -1850,10 +1849,10 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { - changeStream.on('change', () => {}); + await new Promise(resolve => changeStream.on('change', resolve)); try { - changeStream.hasNext(() => {}); + await new Promise(resolve => changeStream.hasNext(resolve)); } catch (error) { return expect(error).to.be.instanceof(MongoDriverError); } @@ -1870,7 +1869,7 @@ describe('Change Streams', function () { .hasNext() .catch(() => expect.fail('Failed to set changeStream to iterator')); try { - changeStream.on('change', () => {}); + await new Promise(resolve => changeStream.on('change', resolve)); } catch (error) { return expect(error).to.be.instanceof(MongoDriverError); } @@ -1884,9 +1883,9 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { - changeStream.hasNext(() => {}); + await new Promise(resolve => changeStream.hasNext(resolve)); try { - changeStream.on('change', () => {}); + await new Promise(resolve => changeStream.on('change', resolve)); } catch (error) { return expect(error).to.be.instanceof(MongoDriverError); } @@ -1899,7 +1898,7 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { - changeStream.once('change', () => {}); + await new Promise(resolve => changeStream.once('change', resolve)); try { await changeStream.next().catch(err => { expect.fail(err.message); @@ -1917,10 +1916,10 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { - changeStream.once('change', () => {}); + await new Promise(resolve => changeStream.once('change', resolve)); try { - changeStream.next(() => {}); + await new Promise(resolve => changeStream.next(resolve)); } catch (error) { return expect(error).to.be.instanceof(MongoDriverError); } @@ -1937,7 +1936,7 @@ describe('Change Streams', function () { .tryNext() .catch(() => expect.fail('Failed to set changeStream to iterator')); try { - changeStream.on('change', () => {}); + await new Promise(resolve => changeStream.on('change', resolve)); } catch (error) { return expect(error).to.be.instanceof(MongoDriverError); } @@ -1951,9 +1950,9 @@ describe('Change Streams', function () { { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { - changeStream.tryNext(() => {}); + await new Promise(resolve => changeStream.tryNext(resolve)); try { - changeStream.on('change', () => {}); + await new Promise(resolve => changeStream.on('change', resolve)); } catch (error) { return expect(error).to.be.instanceof(MongoDriverError); } From b929dd5a75a2bf3d7ef2f86d86fa705711a7687e Mon Sep 17 00:00:00 2001 From: Warren James Date: Tue, 6 Jul 2021 12:29:21 -0400 Subject: [PATCH 08/12] refactor: remove errorIfIsX functions and use Symbol for mode property --- src/change_stream.ts | 38 +++++++++++++++----------------------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index d0d1cf127d..e0cc3824dd 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -34,6 +34,8 @@ const kResumeQueue = Symbol('resumeQueue'); const kCursorStream = Symbol('cursorStream'); /** @internal */ const kClosed = Symbol('closed'); +/** @internal */ +const kMode = Symbol('mode'); const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( @@ -205,8 +207,7 @@ export class ChangeStream extends TypedEventEmitter extends TypedEventEmitter extends TypedEventEmitter( return closeWithError(changeStream, error, callback); } -function errorIfIsEmmiter(changeStream: ChangeStream) { - if (changeStream.isEmitter) { - throw new MongoDriverError( - 'Cannot use ChangeStream as iterator after using as an EventEmitter' - ); - } -} - -function errorIfIsIterator(changeStream: ChangeStream) { - if (changeStream.isIterator) { - throw new MongoDriverError( - 'Cannot use ChangeStream as an EventEmitter after using as an iterator' - ); - } -} /** * Safely provides a cursor across resume attempts * From 0d9d213a9fb7fb65943c185291b1569ce4b4cbe0 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 7 Jul 2021 10:25:55 -0400 Subject: [PATCH 09/12] refactor: change _setIsEmitter and _setIsIterator methods to helper functions --- src/change_stream.ts | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index e0cc3824dd..cd794f6188 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -301,7 +301,7 @@ export class ChangeStream extends TypedEventEmitter | void { - this._setIsIterator(); + setIsIterator(this); return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -316,7 +316,7 @@ export class ChangeStream extends TypedEventEmitter> ): Promise> | void { - this._setIsIterator(); + setIsIterator(this); return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -371,7 +371,7 @@ export class ChangeStream extends TypedEventEmitter; tryNext(callback: Callback): void; tryNext(callback?: Callback): Promise | void { - this._setIsIterator(); + setIsIterator(this); return maybePromise(callback, cb => { getCursor(this, (err, cursor) => { if (err || !cursor) return cb(err); // failed to resume, raise an error @@ -379,24 +379,6 @@ export class ChangeStream extends TypedEventEmitter(changeStream: ChangeStream): void { + if (changeStream[kMode] === 'iterator') { + throw new MongoDriverError( + 'Cannot use ChangeStream as an EventEmitter after using as an iterator' + ); + } + changeStream[kMode] = 'emitter'; +} + +function setIsIterator(changeStream: ChangeStream): void { + if (changeStream[kMode] === 'emitter') { + throw new MongoDriverError( + 'Cannot use ChangeStream as iterator after using as an EventEmitter' + ); + } + changeStream[kMode] = 'iterator'; +} /** * Create a new change stream cursor based on self's configuration * @internal @@ -653,7 +652,7 @@ function streamEvents( changeStream: ChangeStream, cursor: ChangeStreamCursor ): void { - changeStream._setIsEmitter(); + setIsEmitter(changeStream); const stream = changeStream[kCursorStream] || cursor.stream(); changeStream[kCursorStream] = stream; stream.on('data', change => processNewChange(changeStream, change)); From 5d0250769cf3dc163482e452d98238923a6aa595 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 7 Jul 2021 10:27:30 -0400 Subject: [PATCH 10/12] fix: Initialize changeStream mode to false in constructor --- src/change_stream.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/change_stream.ts b/src/change_stream.ts index cd794f6188..d3d83439b3 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -274,6 +274,7 @@ export class ChangeStream extends TypedEventEmitter { From df93e3cc2209e4703003e75be62ae54a2ac2f1d4 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 7 Jul 2021 12:13:49 -0400 Subject: [PATCH 11/12] test: clean up tests --- test/functional/change_stream.test.js | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index da831eb867..6cd7ccb465 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1814,18 +1814,16 @@ describe('Change Streams', function () { if (repeatInsert) { clearInterval(repeatInsert); } - if (changeStream && !changeStream.closed) { + if (changeStream) { await changeStream.close(); } + await mock.cleanup(); if (client) { await client.close(); } - - await mock.cleanup(); }); - // TODO: Better Errors it( 'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext" using promises', { @@ -2145,7 +2143,6 @@ describe('Change Streams', function () { this.changeStream.on('resumeTokenChanged', resumeToken => { this.resumeTokenChangedEvents.push({ resumeToken }); }); - this.changeStream.isEmitter = false; return this.changeStream; } From ce2cbe7c560088ae0643b863f4b583daa942c660 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 7 Jul 2021 14:58:39 -0400 Subject: [PATCH 12/12] test: Remove unneeded tests --- test/functional/change_stream.test.js | 71 ++------------------------- 1 file changed, 4 insertions(+), 67 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 6cd7ccb465..accb54c65c 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1825,7 +1825,7 @@ describe('Change Streams', function () { }); it( - 'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext" using promises', + 'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext"', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { @@ -1843,23 +1843,7 @@ describe('Change Streams', function () { ); it( - 'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext" using callbacks', - { - metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: async function () { - await new Promise(resolve => changeStream.on('change', resolve)); - - try { - await new Promise(resolve => changeStream.hasNext(resolve)); - } catch (error) { - return expect(error).to.be.instanceof(MongoDriverError); - } - return expect.fail('Should not reach here'); - } - } - ); - it( - 'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on" using promises', + 'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on"', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { @@ -1877,22 +1861,7 @@ describe('Change Streams', function () { ); it( - 'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on" using callbacks', - { - metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: async function () { - await new Promise(resolve => changeStream.hasNext(resolve)); - try { - await new Promise(resolve => changeStream.on('change', resolve)); - } catch (error) { - return expect(error).to.be.instanceof(MongoDriverError); - } - return expect.fail('Should not reach here'); - } - } - ); - it( - 'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next" using promises', + 'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next"', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { @@ -1910,23 +1879,7 @@ describe('Change Streams', function () { ); it( - 'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next" using callbacks', - { - metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: async function () { - await new Promise(resolve => changeStream.once('change', resolve)); - - try { - await new Promise(resolve => changeStream.next(resolve)); - } catch (error) { - return expect(error).to.be.instanceof(MongoDriverError); - } - return expect.fail('Should not reach here'); - } - } - ); - it( - 'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on" using promises', + 'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on"', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: async function () { @@ -1942,22 +1895,6 @@ describe('Change Streams', function () { } } ); - - it( - 'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on" using callbacks', - { - metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: async function () { - await new Promise(resolve => changeStream.tryNext(resolve)); - try { - await new Promise(resolve => changeStream.on('change', resolve)); - } catch (error) { - return expect(error).to.be.instanceof(MongoDriverError); - } - return expect.fail('Should not reach here'); - } - } - ); }); describe('should properly handle a changeStream event being processed mid-close', function () {