From 5052f6b88ec1c3de29a4bd392791df8f7d419227 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 19 May 2022 14:51:54 +0200 Subject: [PATCH 1/7] feat(NODE-4139): streaming protocol message changes: When the monitor detects it is using the streaming protocol we now tell the message stream to only ever emit the last message in the buffer. --- src/cmap/connection.ts | 10 ++++++++ src/cmap/message_stream.ts | 35 +++++++++++++++++++++------ src/sdam/monitor.ts | 4 +++ test/unit/cmap/message_stream.test.js | 28 +++++++++++++++++++++ 4 files changed, 70 insertions(+), 7 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 670fcd4994..608e55d08a 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -293,6 +293,16 @@ export class Connection extends TypedEventEmitter { this[kHello] = response; } + // Set the whether the message stream is for a monitoring connection using the + // streaming protocol. + set isStreamingProtocol(value: boolean) { + this[kMessageStream].isStreamingProtocol = value; + } + + get isStreamingProtocol(): boolean { + return this[kMessageStream].isStreamingProtocol; + } + get serviceId(): ObjectId | undefined { return this.hello?.serviceId; } diff --git a/src/cmap/message_stream.ts b/src/cmap/message_stream.ts index 55ed847e95..43df3dc0b8 100644 --- a/src/cmap/message_stream.ts +++ b/src/cmap/message_stream.ts @@ -53,6 +53,8 @@ export class MessageStream extends Duplex { maxBsonMessageSize: number; /** @internal */ [kBuffer]: BufferPool; + /** @internal */ + isStreamingProtocol = false; constructor(options: MessageStreamOptions = {}) { super(options); @@ -60,6 +62,10 @@ export class MessageStream extends Duplex { this[kBuffer] = new BufferPool(); } + get buffer(): BufferPool { + return this[kBuffer]; + } + override _write(chunk: Buffer, _: unknown, callback: Callback): void { this[kBuffer].append(chunk); processIncomingData(this, callback); @@ -165,12 +171,20 @@ function processIncomingData(stream: MessageStream, callback: Callback) let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response; if (messageHeader.opCode !== OP_COMPRESSED) { const messageBody = message.slice(MESSAGE_HEADER_SIZE); - stream.emit('message', new ResponseType(message, messageHeader, messageBody)); - if (buffer.length >= 4) { + // If we are a monitoring message stream using the streaming protocol and + // there is more in the buffer that can be read, skip processing since we + // want the last hello command response that is in the buffer. + if (stream.isStreamingProtocol && buffer.length >= 4) { processIncomingData(stream, callback); } else { - callback(); + stream.emit('message', new ResponseType(message, messageHeader, messageBody)); + + if (buffer.length >= 4) { + processIncomingData(stream, callback); + } else { + callback(); + } } return; @@ -198,12 +212,19 @@ function processIncomingData(stream: MessageStream, callback: Callback) return; } - stream.emit('message', new ResponseType(message, messageHeader, messageBody)); - - if (buffer.length >= 4) { + // If we are a monitoring message stream using the streaming protocol and + // there is more in the buffer that can be read, skip processing since we + // want the last hello command response that is in the buffer. + if (stream.isStreamingProtocol && buffer.length >= 4) { processIncomingData(stream, callback); } else { - callback(); + stream.emit('message', new ResponseType(message, messageHeader, messageBody)); + + if (buffer.length >= 4) { + processIncomingData(stream, callback); + } else { + callback(); + } } }); } diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 92d626bd91..972f26fc0e 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -279,6 +279,10 @@ function checkServer(monitor: Monitor, callback: Callback) { // if we are using the streaming protocol then we immediately issue another `started` // event, otherwise the "check" is complete and return to the main monitor loop if (isAwaitable && hello.topologyVersion) { + // Tell the connection that we are using the streaming protocol so that the + // connection's message stream will only read the last hello on the buffer. + connection.isStreamingProtocol = true; + monitor.emit( Server.SERVER_HEARTBEAT_STARTED, new ServerHeartbeatStartedEvent(monitor.address) diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index ca8ab447f4..ff7f45f9b5 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -19,6 +19,34 @@ function bufferToStream(buffer) { } describe('Message Stream', function () { + context('when the stream uses the streaming protocol', function () { + const data = Buffer.from( + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', + 'hex' + ); + + it('only reads the last message in the buffer', function (done) { + const inputStream = bufferToStream(data); + const messageStream = new MessageStream(); + messageStream.isStreamingProtocol = true; + + messageStream.once('message', msg => { + msg.parse(); + expect(msg) + .to.have.property('documents') + .that.deep.equals([{ [LEGACY_HELLO_COMMAND]: 1 }]); + // Make sure there is nothing left in the buffer. + expect(messageStream.buffer.length).to.equal(0); + done(); + }); + + inputStream.pipe(messageStream); + }); + }); + describe('reading', function () { [ { From 6dc0025cc20f65d577c3d224746d8387f31900e8 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 19 May 2022 21:58:36 +0200 Subject: [PATCH 2/7] test(NODE-4139): refactor message stream tests --- test/tools/utils.ts | 21 ++++ test/unit/cmap/message_stream.test.js | 167 ++++++++++---------------- 2 files changed, 84 insertions(+), 104 deletions(-) diff --git a/test/tools/utils.ts b/test/tools/utils.ts index de5addfebc..95b61773d3 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -1,7 +1,10 @@ import { EJSON } from 'bson'; +import * as BSON from 'bson'; import { expect } from 'chai'; import { inspect, promisify } from 'util'; +import { OP_MSG } from '../../src/cmap/wire_protocol/constants'; +import { Document } from '../../src/index'; import { Logger } from '../../src/logger'; import { deprecateOptions, DeprecateOptionsConfig } from '../../src/utils'; import { runUnifiedSuite } from './unified-spec-runner/runner'; @@ -343,6 +346,24 @@ export class TestBuilder { } } +export function generateOpMsgBuffer(document: Document): Buffer { + const header = Buffer.alloc(4 * 4 + 4); + + const typeBuffer = Buffer.alloc(1); + typeBuffer[0] = 0; + + const docBuffer = BSON.serialize(document); + + const totalLength = header.length + typeBuffer.length + docBuffer.length; + + header.writeInt32LE(totalLength, 0); + header.writeInt32LE(0, 4); + header.writeInt32LE(0, 8); + header.writeInt32LE(OP_MSG, 12); + header.writeUInt32LE(0, 16); + return Buffer.concat([header, typeBuffer, docBuffer]); +} + export class UnifiedTestSuiteBuilder { private _description = 'Default Description'; private _schemaVersion = '1.0'; diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index ff7f45f9b5..dc474b1c53 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -5,6 +5,7 @@ const { MessageStream } = require('../../../src/cmap/message_stream'); const { Msg } = require('../../../src/cmap/commands'); const expect = require('chai').expect; const { LEGACY_HELLO_COMMAND } = require('../../../src/constants'); +const { generateOpMsgBuffer } = require('../../tools/utils'); function bufferToStream(buffer) { const stream = new Readable(); @@ -18,26 +19,27 @@ function bufferToStream(buffer) { return stream; } -describe('Message Stream', function () { +describe('MessageStream', function () { context('when the stream uses the streaming protocol', function () { - const data = Buffer.from( - '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + - '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + - '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + - '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', - 'hex' - ); + const response = { isWritablePrimary: true }; + let firstHello; + let secondHello; + let thirdHello; + + beforeEach(function () { + firstHello = generateOpMsgBuffer(response); + secondHello = generateOpMsgBuffer(response); + thirdHello = generateOpMsgBuffer(response); + }); it('only reads the last message in the buffer', function (done) { - const inputStream = bufferToStream(data); + const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello])); const messageStream = new MessageStream(); messageStream.isStreamingProtocol = true; messageStream.once('message', msg => { msg.parse(); - expect(msg) - .to.have.property('documents') - .that.deep.equals([{ [LEGACY_HELLO_COMMAND]: 1 }]); + expect(msg).to.have.property('documents').that.deep.equals([response]); // Make sure there is nothing left in the buffer. expect(messageStream.buffer.length).to.equal(0); done(); @@ -47,116 +49,73 @@ describe('Message Stream', function () { }); }); - describe('reading', function () { - [ - { - description: 'valid OP_REPLY', - data: Buffer.from( - '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', - 'hex' - ), - documents: [{ [LEGACY_HELLO_COMMAND]: 1 }] - }, - { - description: 'valid multiple OP_REPLY', - expectedMessageCount: 4, - data: Buffer.from( - '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + - '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + - '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + - '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', - 'hex' - ), - documents: [{ [LEGACY_HELLO_COMMAND]: 1 }] - }, - { - description: 'valid OP_REPLY (partial)', - data: [ - Buffer.from('37', 'hex'), - Buffer.from('0000', 'hex'), - Buffer.from( - '000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', - 'hex' - ) - ], - documents: [{ [LEGACY_HELLO_COMMAND]: 1 }] - }, - - { - description: 'valid OP_MSG', - data: Buffer.from( - '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000', - 'hex' - ), - documents: [{ $db: 'admin', [LEGACY_HELLO_COMMAND]: 1 }] - }, - { - description: 'valid multiple OP_MSG', - expectedMessageCount: 4, - data: Buffer.from( - '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + - '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + - '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + - '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000', - 'hex' - ), - documents: [{ $db: 'admin', [LEGACY_HELLO_COMMAND]: 1 }] - }, - - { - description: 'Invalid message size (negative)', - data: Buffer.from('ffffffff', 'hex'), - error: 'Invalid message size: -1' - }, - { - description: 'Invalid message size (exceeds maximum)', - data: Buffer.from('01000004', 'hex'), - error: 'Invalid message size: 67108865, max allowed: 67108864' - } - ].forEach(test => { - it(test.description, function (done) { - const error = test.error; - const expectedMessageCount = test.expectedMessageCount || 1; - const inputStream = bufferToStream(test.data); + context('when the stream is not using the streaming protocol', function () { + context('when the messages are valid', function () { + const response = { isWritablePrimary: true }; + let firstHello; + let secondHello; + let thirdHello; + let messageCount = 0; + + beforeEach(function () { + firstHello = generateOpMsgBuffer(response); + secondHello = generateOpMsgBuffer(response); + thirdHello = generateOpMsgBuffer(response); + }); + + it('reads all messages in the buffer', function (done) { + const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello])); const messageStream = new MessageStream(); - let messageCount = 0; messageStream.on('message', msg => { messageCount++; - if (error) { - done(new Error(`expected error: ${error}`)); - return; + msg.parse(); + expect(msg).to.have.property('documents').that.deep.equals([response]); + // Test will not complete until 3 messages processed. + if (messageCount === 3) { + done(); } + }); - msg.parse(); + inputStream.pipe(messageStream); + }); + }); - if (test.documents) { - expect(msg).to.have.property('documents').that.deep.equals(test.documents); - } + context('when the messages are invalid', function () { + context('when the message size is negative', function () { + it('emits an error', function (done) { + const inputStream = bufferToStream(Buffer.from('ffffffff', 'hex')); + const messageStream = new MessageStream(); - if (messageCount === expectedMessageCount) { + messageStream.on('error', err => { + expect(err).to.have.property('message').that.equals('Invalid message size: -1'); done(); - } + }); + + inputStream.pipe(messageStream); }); + }); - messageStream.on('error', err => { - if (error == null) { - done(err); - return; - } + context('when the message size exceeds the bson maximum', function () { + it('emits an error', function (done) { + const inputStream = bufferToStream(Buffer.from('01000004', 'hex')); + const messageStream = new MessageStream(); - expect(err).to.have.property('message').that.equals(error); + messageStream.on('error', err => { + expect(err) + .to.have.property('message') + .that.equals('Invalid message size: 67108865, max allowed: 67108864'); + done(); + }); - done(); + inputStream.pipe(messageStream); }); - - inputStream.pipe(messageStream); }); }); }); - describe('writing', function () { - it('should write a message to the stream', function (done) { + context('when writing to the message stream', function () { + it('pushes the message', function (done) { const readableStream = new Readable({ read() {} }); const writeableStream = new Writable({ write: (chunk, _, callback) => { From 562b63ab73e5dd0e4e28bea3a1f46ecbc33aa3ca Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Fri, 20 May 2022 15:10:32 +0200 Subject: [PATCH 3/7] test(NODE-4139): refactor to async/await --- test/unit/cmap/message_stream.test.js | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index dc474b1c53..c763227191 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -1,6 +1,7 @@ 'use strict'; -const Readable = require('stream').Readable; -const Writable = require('stream').Writable; +const { once } = require('events'); +const { Readable, Writable } = require('stream'); + const { MessageStream } = require('../../../src/cmap/message_stream'); const { Msg } = require('../../../src/cmap/commands'); const expect = require('chai').expect; @@ -32,20 +33,18 @@ describe('MessageStream', function () { thirdHello = generateOpMsgBuffer(response); }); - it('only reads the last message in the buffer', function (done) { + it('only reads the last message in the buffer', async function () { const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello])); const messageStream = new MessageStream(); messageStream.isStreamingProtocol = true; - messageStream.once('message', msg => { - msg.parse(); - expect(msg).to.have.property('documents').that.deep.equals([response]); - // Make sure there is nothing left in the buffer. - expect(messageStream.buffer.length).to.equal(0); - done(); - }); - inputStream.pipe(messageStream); + const messages = await once(messageStream, 'message'); + const msg = messages[0]; + msg.parse(); + expect(msg).to.have.property('documents').that.deep.equals([response]); + // Make sure there is nothing left in the buffer. + expect(messageStream.buffer.length).to.equal(0); }); }); From ca6e0aac5aafd8b8e3b6aa0d8ddfcb8b49ca51c9 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 23 May 2022 15:11:48 +0200 Subject: [PATCH 4/7] fix(NODE-41439): only process when on last hello --- src/cmap/message_stream.ts | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/cmap/message_stream.ts b/src/cmap/message_stream.ts index 43df3dc0b8..7c16573c5d 100644 --- a/src/cmap/message_stream.ts +++ b/src/cmap/message_stream.ts @@ -168,6 +168,20 @@ function processIncomingData(stream: MessageStream, callback: Callback) opCode: message.readInt32LE(12) }; + const streamingProtocolHasAnotherHello = () => { + if (stream.isStreamingProtocol) { + // Can we read the next message size? + if (buffer.length >= 4) { + const sizeOfMessage = buffer.peek(4).readInt32LE(); + if (sizeOfMessage < buffer.length) { + return true; + } + return false; + } + } + return false; + } + let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response; if (messageHeader.opCode !== OP_COMPRESSED) { const messageBody = message.slice(MESSAGE_HEADER_SIZE); @@ -175,7 +189,7 @@ function processIncomingData(stream: MessageStream, callback: Callback) // If we are a monitoring message stream using the streaming protocol and // there is more in the buffer that can be read, skip processing since we // want the last hello command response that is in the buffer. - if (stream.isStreamingProtocol && buffer.length >= 4) { + if (streamingProtocolHasAnotherHello()) { processIncomingData(stream, callback); } else { stream.emit('message', new ResponseType(message, messageHeader, messageBody)); @@ -215,7 +229,7 @@ function processIncomingData(stream: MessageStream, callback: Callback) // If we are a monitoring message stream using the streaming protocol and // there is more in the buffer that can be read, skip processing since we // want the last hello command response that is in the buffer. - if (stream.isStreamingProtocol && buffer.length >= 4) { + if (streamingProtocolHasAnotherHello()) { processIncomingData(stream, callback); } else { stream.emit('message', new ResponseType(message, messageHeader, messageBody)); From 859d9e0641532c624056e4f825a84531ea8708cb Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 23 May 2022 15:24:21 +0200 Subject: [PATCH 5/7] test(NODE-4139): make all tests async --- src/cmap/message_stream.ts | 2 +- test/unit/cmap/message_stream.test.js | 57 +++++++++++++++++---------- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/cmap/message_stream.ts b/src/cmap/message_stream.ts index 7c16573c5d..e9892fabbf 100644 --- a/src/cmap/message_stream.ts +++ b/src/cmap/message_stream.ts @@ -180,7 +180,7 @@ function processIncomingData(stream: MessageStream, callback: Callback) } } return false; - } + }; let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response; if (messageHeader.opCode !== OP_COMPRESSED) { diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index c763227191..40e5567fb6 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -1,5 +1,5 @@ 'use strict'; -const { once } = require('events'); +const { on, once } = require('events'); const { Readable, Writable } = require('stream'); const { MessageStream } = require('../../../src/cmap/message_stream'); @@ -26,11 +26,14 @@ describe('MessageStream', function () { let firstHello; let secondHello; let thirdHello; + let partial; beforeEach(function () { firstHello = generateOpMsgBuffer(response); secondHello = generateOpMsgBuffer(response); thirdHello = generateOpMsgBuffer(response); + partial = Buffer.alloc(5); + partial.writeInt32LE(100, 0); }); it('only reads the last message in the buffer', async function () { @@ -46,6 +49,22 @@ describe('MessageStream', function () { // Make sure there is nothing left in the buffer. expect(messageStream.buffer.length).to.equal(0); }); + + it('does not read partial messages', async function () { + const inputStream = bufferToStream( + Buffer.concat([firstHello, secondHello, thirdHello, partial]) + ); + const messageStream = new MessageStream(); + messageStream.isStreamingProtocol = true; + + inputStream.pipe(messageStream); + const messages = await once(messageStream, 'message'); + const msg = messages[0]; + msg.parse(); + expect(msg).to.have.property('documents').that.deep.equals([response]); + // Make sure the buffer wasn't read to the end. + expect(messageStream.buffer.length).to.equal(5); + }); }); context('when the stream is not using the streaming protocol', function () { @@ -62,52 +81,48 @@ describe('MessageStream', function () { thirdHello = generateOpMsgBuffer(response); }); - it('reads all messages in the buffer', function (done) { + it('reads all messages in the buffer', async function () { const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello])); const messageStream = new MessageStream(); - messageStream.on('message', msg => { + inputStream.pipe(messageStream); + for await (const messages of on(messageStream, 'message')) { messageCount++; + const msg = messages[0]; msg.parse(); expect(msg).to.have.property('documents').that.deep.equals([response]); // Test will not complete until 3 messages processed. if (messageCount === 3) { - done(); + return; } - }); - - inputStream.pipe(messageStream); + } }); }); context('when the messages are invalid', function () { context('when the message size is negative', function () { - it('emits an error', function (done) { + it('emits an error', async function () { const inputStream = bufferToStream(Buffer.from('ffffffff', 'hex')); const messageStream = new MessageStream(); - messageStream.on('error', err => { - expect(err).to.have.property('message').that.equals('Invalid message size: -1'); - done(); - }); - inputStream.pipe(messageStream); + const errors = await once(messageStream, 'error'); + const err = errors[0]; + expect(err).to.have.property('message').that.equals('Invalid message size: -1'); }); }); context('when the message size exceeds the bson maximum', function () { - it('emits an error', function (done) { + it('emits an error', async function () { const inputStream = bufferToStream(Buffer.from('01000004', 'hex')); const messageStream = new MessageStream(); - messageStream.on('error', err => { - expect(err) - .to.have.property('message') - .that.equals('Invalid message size: 67108865, max allowed: 67108864'); - done(); - }); - inputStream.pipe(messageStream); + const errors = await once(messageStream, 'error'); + const err = errors[0]; + expect(err) + .to.have.property('message') + .that.equals('Invalid message size: 67108865, max allowed: 67108864'); }); }); }); From 96211293f72439fceaaacb383d6a646b25e4b2af Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 23 May 2022 16:11:49 +0200 Subject: [PATCH 6/7] fix(NODE-4139): set flag on all monitor connections --- src/cmap/connection.ts | 11 +++++------ src/cmap/message_stream.ts | 15 +++++++-------- src/sdam/monitor.ts | 12 ++++++++---- test/unit/cmap/message_stream.test.js | 8 ++++---- test/unit/sdam/monitor.test.js | 5 ++++- 5 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 608e55d08a..881105548e 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -293,14 +293,13 @@ export class Connection extends TypedEventEmitter { this[kHello] = response; } - // Set the whether the message stream is for a monitoring connection using the - // streaming protocol. - set isStreamingProtocol(value: boolean) { - this[kMessageStream].isStreamingProtocol = value; + // Set the whether the message stream is for a monitoring connection. + set isMonitoringConnection(value: boolean) { + this[kMessageStream].isMonitoringConnection = value; } - get isStreamingProtocol(): boolean { - return this[kMessageStream].isStreamingProtocol; + get isMonitoringConnection(): boolean { + return this[kMessageStream].isMonitoringConnection; } get serviceId(): ObjectId | undefined { diff --git a/src/cmap/message_stream.ts b/src/cmap/message_stream.ts index e9892fabbf..8b90aa726e 100644 --- a/src/cmap/message_stream.ts +++ b/src/cmap/message_stream.ts @@ -54,7 +54,7 @@ export class MessageStream extends Duplex { /** @internal */ [kBuffer]: BufferPool; /** @internal */ - isStreamingProtocol = false; + isMonitoringConnection = false; constructor(options: MessageStreamOptions = {}) { super(options); @@ -168,15 +168,14 @@ function processIncomingData(stream: MessageStream, callback: Callback) opCode: message.readInt32LE(12) }; - const streamingProtocolHasAnotherHello = () => { - if (stream.isStreamingProtocol) { + const monitorHasAnotherHello = () => { + if (stream.isMonitoringConnection) { // Can we read the next message size? if (buffer.length >= 4) { const sizeOfMessage = buffer.peek(4).readInt32LE(); if (sizeOfMessage < buffer.length) { return true; } - return false; } } return false; @@ -186,10 +185,10 @@ function processIncomingData(stream: MessageStream, callback: Callback) if (messageHeader.opCode !== OP_COMPRESSED) { const messageBody = message.slice(MESSAGE_HEADER_SIZE); - // If we are a monitoring message stream using the streaming protocol and + // If we are a monitoring connection message stream and // there is more in the buffer that can be read, skip processing since we // want the last hello command response that is in the buffer. - if (streamingProtocolHasAnotherHello()) { + if (monitorHasAnotherHello()) { processIncomingData(stream, callback); } else { stream.emit('message', new ResponseType(message, messageHeader, messageBody)); @@ -226,10 +225,10 @@ function processIncomingData(stream: MessageStream, callback: Callback) return; } - // If we are a monitoring message stream using the streaming protocol and + // If we are a monitoring connection message stream and // there is more in the buffer that can be read, skip processing since we // want the last hello command response that is in the buffer. - if (streamingProtocolHasAnotherHello()) { + if (monitorHasAnotherHello()) { processIncomingData(stream, callback); } else { stream.emit('message', new ResponseType(message, messageHeader, messageBody)); diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 972f26fc0e..1cab4b5d16 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -88,6 +88,10 @@ export class Monitor extends TypedEventEmitter { [kMonitorId]?: InterruptibleAsyncInterval; [kRTTPinger]?: RTTPinger; + get connection(): Connection | undefined { + return this[kConnection]; + } + constructor(server: Server, options: MonitorOptions) { super(); @@ -279,10 +283,6 @@ function checkServer(monitor: Monitor, callback: Callback) { // if we are using the streaming protocol then we immediately issue another `started` // event, otherwise the "check" is complete and return to the main monitor loop if (isAwaitable && hello.topologyVersion) { - // Tell the connection that we are using the streaming protocol so that the - // connection's message stream will only read the last hello on the buffer. - connection.isStreamingProtocol = true; - monitor.emit( Server.SERVER_HEARTBEAT_STARTED, new ServerHeartbeatStartedEvent(monitor.address) @@ -314,6 +314,10 @@ function checkServer(monitor: Monitor, callback: Callback) { } if (conn) { + // Tell the connection that we are using the streaming protocol so that the + // connection's message stream will only read the last hello on the buffer. + conn.isMonitoringConnection = true; + if (isInCloseState(monitor)) { conn.destroy({ force: true }); return; diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index 40e5567fb6..87c82640c3 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -21,7 +21,7 @@ function bufferToStream(buffer) { } describe('MessageStream', function () { - context('when the stream uses the streaming protocol', function () { + context('when the stream is for a monitoring connection', function () { const response = { isWritablePrimary: true }; let firstHello; let secondHello; @@ -39,7 +39,7 @@ describe('MessageStream', function () { it('only reads the last message in the buffer', async function () { const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello])); const messageStream = new MessageStream(); - messageStream.isStreamingProtocol = true; + messageStream.isMonitoringConnection = true; inputStream.pipe(messageStream); const messages = await once(messageStream, 'message'); @@ -55,7 +55,7 @@ describe('MessageStream', function () { Buffer.concat([firstHello, secondHello, thirdHello, partial]) ); const messageStream = new MessageStream(); - messageStream.isStreamingProtocol = true; + messageStream.isMonitoringConnection = true; inputStream.pipe(messageStream); const messages = await once(messageStream, 'message'); @@ -67,7 +67,7 @@ describe('MessageStream', function () { }); }); - context('when the stream is not using the streaming protocol', function () { + context('when the stream is not for a monitoring connection', function () { context('when the messages are valid', function () { const response = { isWritablePrimary: true }; let firstHello; diff --git a/test/unit/sdam/monitor.test.js b/test/unit/sdam/monitor.test.js index 2d7b786e1c..a161202dbe 100644 --- a/test/unit/sdam/monitor.test.js +++ b/test/unit/sdam/monitor.test.js @@ -115,7 +115,10 @@ describe('monitoring', function () { monitor = new Monitor(server, {}); monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure'))); - monitor.on('serverHeartbeatSucceeded', () => done()); + monitor.on('serverHeartbeatSucceeded', () => { + expect(monitor.connection.isMonitoringConnection).to.be.true; + done(); + }); monitor.connect(); }); From 71fd458585c90c098e9690394ac572400b9c0c28 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Mon, 23 May 2022 18:51:02 +0200 Subject: [PATCH 7/7] test(NODE-4139): test for different last doc --- src/cmap/message_stream.ts | 2 +- test/unit/cmap/message_stream.test.js | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/cmap/message_stream.ts b/src/cmap/message_stream.ts index 8b90aa726e..542e35cf73 100644 --- a/src/cmap/message_stream.ts +++ b/src/cmap/message_stream.ts @@ -173,7 +173,7 @@ function processIncomingData(stream: MessageStream, callback: Callback) // Can we read the next message size? if (buffer.length >= 4) { const sizeOfMessage = buffer.peek(4).readInt32LE(); - if (sizeOfMessage < buffer.length) { + if (sizeOfMessage <= buffer.length) { return true; } } diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index 87c82640c3..3158a9144a 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -23,6 +23,7 @@ function bufferToStream(buffer) { describe('MessageStream', function () { context('when the stream is for a monitoring connection', function () { const response = { isWritablePrimary: true }; + const lastResponse = { ok: 1 }; let firstHello; let secondHello; let thirdHello; @@ -31,7 +32,7 @@ describe('MessageStream', function () { beforeEach(function () { firstHello = generateOpMsgBuffer(response); secondHello = generateOpMsgBuffer(response); - thirdHello = generateOpMsgBuffer(response); + thirdHello = generateOpMsgBuffer(lastResponse); partial = Buffer.alloc(5); partial.writeInt32LE(100, 0); }); @@ -45,7 +46,7 @@ describe('MessageStream', function () { const messages = await once(messageStream, 'message'); const msg = messages[0]; msg.parse(); - expect(msg).to.have.property('documents').that.deep.equals([response]); + expect(msg).to.have.property('documents').that.deep.equals([lastResponse]); // Make sure there is nothing left in the buffer. expect(messageStream.buffer.length).to.equal(0); }); @@ -61,7 +62,7 @@ describe('MessageStream', function () { const messages = await once(messageStream, 'message'); const msg = messages[0]; msg.parse(); - expect(msg).to.have.property('documents').that.deep.equals([response]); + expect(msg).to.have.property('documents').that.deep.equals([lastResponse]); // Make sure the buffer wasn't read to the end. expect(messageStream.buffer.length).to.equal(5); });