diff --git a/package.json b/package.json index 1bb55a3c693..b3e8400e77e 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,6 @@ "bson-ext": "^2.0.0" }, "dependencies": { - "bl": "^2.2.1", "bson": "^4.0.4", "denque": "^1.4.1", "lodash": "^4.17.20" diff --git a/src/cmap/message_stream.ts b/src/cmap/message_stream.ts index 9878b2d907c..c8c8d3c1243 100644 --- a/src/cmap/message_stream.ts +++ b/src/cmap/message_stream.ts @@ -1,4 +1,3 @@ -import BufferList = require('bl'); import { Duplex, DuplexOptions } from 'stream'; import { Response, Msg, BinMsg, Query, WriteProtocolMessageType, MessageHeader } from './commands'; import { MongoError, MongoParseError } from '../error'; @@ -11,7 +10,7 @@ import { CompressorName } from './wire_protocol/compression'; import type { Document, BSONSerializeOptions } from '../bson'; -import type { Callback } from '../utils'; +import { BufferPool, Callback } from '../utils'; import type { ClientSession } from '../sessions'; const MESSAGE_HEADER_SIZE = 16; @@ -48,21 +47,19 @@ export interface OperationDescription extends BSONSerializeOptions { * @internal */ export class MessageStream extends Duplex { + /** @internal */ maxBsonMessageSize: number; - [kBuffer]: BufferList; + /** @internal */ + [kBuffer]: BufferPool; constructor(options: MessageStreamOptions = {}) { super(options); - this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize; - - this[kBuffer] = new BufferList(); + this[kBuffer] = new BufferPool(); } _write(chunk: Buffer, _: unknown, callback: Callback): void { - const buffer = this[kBuffer]; - buffer.append(chunk); - + this[kBuffer].append(chunk); processIncomingData(this, callback); } @@ -135,7 +132,7 @@ function processIncomingData(stream: MessageStream, callback: Callback) return; } - const sizeOfMessage = buffer.readInt32LE(0); + const sizeOfMessage = buffer.peek(4).readInt32LE(); if (sizeOfMessage < 0) { callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`)); return; @@ -155,9 +152,7 @@ function processIncomingData(stream: MessageStream, callback: Callback) return; } - const message = buffer.slice(0, sizeOfMessage); - buffer.consume(sizeOfMessage); - + const message = buffer.read(sizeOfMessage); const messageHeader: MessageHeader = { length: message.readInt32LE(0), requestId: message.readInt32LE(4), diff --git a/src/index.ts b/src/index.ts index b4254ef2a56..ef9f9222941 100644 --- a/src/index.ts +++ b/src/index.ts @@ -292,7 +292,8 @@ export type { ClientMetadata, ClientMetadataOptions, MongoDBNamespace, - InterruptableAsyncInterval + InterruptibleAsyncInterval, + BufferPool } from './utils'; export type { WriteConcern, W, WriteConcernOptions } from './write_concern'; export type { ExecutionResult } from './operations/execute_operation'; diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index d7cf62688e1..6d4d2b99dfd 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -3,7 +3,7 @@ import { now, makeStateMachine, calculateDurationInMs, - makeInterruptableAsyncInterval + makeInterruptibleAsyncInterval } from '../utils'; import { EventEmitter } from 'events'; import { connect } from '../cmap/connect'; @@ -17,7 +17,7 @@ import { } from './events'; import { Server } from './server'; -import type { InterruptableAsyncInterval, Callback } from '../utils'; +import type { InterruptibleAsyncInterval, Callback } from '../utils'; import type { TopologyVersion } from './server_description'; import type { ConnectionOptions } from '../cmap/connection'; @@ -65,7 +65,7 @@ export class Monitor extends EventEmitter { [kConnection]?: Connection; [kCancellationToken]: EventEmitter; /** @internal */ - [kMonitorId]?: InterruptableAsyncInterval; + [kMonitorId]?: InterruptibleAsyncInterval; [kRTTPinger]?: RTTPinger; constructor(server: Server, options?: Partial) { @@ -123,7 +123,7 @@ export class Monitor extends EventEmitter { // start const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS; const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS; - this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), { + this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), { interval: heartbeatFrequencyMS, minInterval: minHeartbeatFrequencyMS, immediate: true @@ -153,7 +153,7 @@ export class Monitor extends EventEmitter { // restart monitoring const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS; const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS; - this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), { + this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), { interval: heartbeatFrequencyMS, minInterval: minHeartbeatFrequencyMS }); diff --git a/src/utils.ts b/src/utils.ts index 719d40f5b3a..d579ddce3ec 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -961,7 +961,7 @@ export function calculateDurationInMs(started: number): number { return elapsed < 0 ? 0 : elapsed; } -export interface InterruptableAsyncIntervalOptions { +export interface InterruptibleAsyncIntervalOptions { /** The interval to execute a method on */ interval: number; /** A minimum interval that must elapse before the method is called */ @@ -977,7 +977,7 @@ export interface InterruptableAsyncIntervalOptions { } /** @internal */ -export interface InterruptableAsyncInterval { +export interface InterruptibleAsyncInterval { wake(): void; stop(): void; } @@ -991,10 +991,10 @@ export interface InterruptableAsyncInterval { * * @param fn - An async function to run on an interval, must accept a `callback` as its only parameter */ -export function makeInterruptableAsyncInterval( +export function makeInterruptibleAsyncInterval( fn: (callback: Callback) => void, - options?: Partial -): InterruptableAsyncInterval { + options?: Partial +): InterruptibleAsyncInterval { let timerId: NodeJS.Timeout | undefined; let lastCallTime: number; let lastWakeTime: number; @@ -1126,3 +1126,101 @@ export function resolveOptions( return result; } + +const kBuffers = Symbol('buffers'); +const kLength = Symbol('length'); + +/** @internal */ +export class BufferPool { + [kBuffers]: Buffer[]; + [kLength]: number; + + constructor() { + this[kBuffers] = []; + this[kLength] = 0; + } + + get length(): number { + return this[kLength]; + } + + append(buffer: Buffer): void { + this[kBuffers].push(buffer); + this[kLength] += buffer.length; + } + + peek(size: number): Buffer { + return this.read(size, false); + } + + read(size: number, consume = true): Buffer { + if (typeof size !== 'number' || size < 0) { + throw new TypeError('Parameter size must be a non-negative number'); + } + + if (typeof size === 'number' && size < 0) { + size += this.length; + } + + if (size > this[kLength]) { + return Buffer.alloc(0); + } + + let result: Buffer; + + // read the whole buffer + if (size === this.length) { + result = Buffer.concat(this[kBuffers]); + + if (consume) { + this[kBuffers] = []; + this[kLength] = 0; + } + } + + // size is within first buffer, no need to concat + else if (size <= this[kBuffers][0].length) { + result = this[kBuffers][0].slice(0, size); + if (consume) { + this[kBuffers][0] = this[kBuffers][0].slice(size); + this[kLength] -= size; + } + } + + // size is beyond first buffer, need to track and copy + else { + result = Buffer.allocUnsafe(size); + + let idx; + let offset = 0; + let bytesToCopy = size; + for (idx = 0; idx < this[kBuffers].length; ++idx) { + let bytesCopied; + if (bytesToCopy > this[kBuffers][idx].length) { + bytesCopied = this[kBuffers][idx].copy(result, offset, 0); + offset += bytesCopied; + } else { + bytesCopied = this[kBuffers][idx].copy(result, offset, 0, bytesToCopy); + if (consume) { + this[kBuffers][idx] = this[kBuffers][idx].slice(0, bytesCopied); + } + offset += bytesCopied; + break; + } + + bytesToCopy -= bytesCopied; + } + + // compact the internal buffer array + if (consume) { + this[kBuffers] = this[kBuffers].slice(idx); + } + + if (result.length > offset) { + return result.slice(0, offset); + } + } + + return result; + } +} diff --git a/test/unit/utils.test.js b/test/unit/utils.test.js index bae78fd89f4..e7bdf6677f7 100644 --- a/test/unit/utils.test.js +++ b/test/unit/utils.test.js @@ -1,5 +1,5 @@ 'use strict'; -const { eachAsync, now, makeInterruptableAsyncInterval } = require('../../src/utils'); +const { eachAsync, now, makeInterruptableAsyncInterval, BufferPool } = require('../../src/utils'); const { expect } = require('chai'); const sinon = require('sinon'); @@ -161,4 +161,84 @@ describe('utils', function () { this.clock.tick(250); }); }); + + context('makeBufferPool', function () { + it('should report the correct length', function () { + const buffer = new BufferPool(); + buffer.append(Buffer.from([0, 1])); + buffer.append(Buffer.from([2, 3])); + buffer.append(Buffer.from([2, 3])); + expect(buffer).property('length').to.equal(6); + }); + + it('return an empty buffer if too many bytes requested', function () { + const buffer = new BufferPool(); + buffer.append(Buffer.from([0, 1, 2, 3])); + const data = buffer.read(6); + expect(data).to.have.length(0); + expect(buffer).property('length').to.equal(4); + }); + + context('peek', function () { + it('exact size', function () { + const buffer = new BufferPool(); + buffer.append(Buffer.from([0, 1])); + const data = buffer.peek(2); + expect(data).to.eql(Buffer.from([0, 1])); + expect(buffer).property('length').to.equal(2); + }); + + it('within first buffer', function () { + const buffer = new BufferPool(); + buffer.append(Buffer.from([0, 1, 2, 3])); + const data = buffer.peek(2); + expect(data).to.eql(Buffer.from([0, 1])); + expect(buffer).property('length').to.equal(4); + }); + + it('across multiple buffers', function () { + const buffer = new BufferPool(); + buffer.append(Buffer.from([0, 1, 2, 3, 4, 5])); + const data = buffer.peek(6); + expect(data).to.eql(Buffer.from([0, 1, 2, 3, 4, 5])); + expect(buffer).property('length').to.equal(6); + }); + }); + + context('read', function () { + it('should throw an error if a negative size is requested', function () { + const buffer = new BufferPool(); + expect(() => buffer.read(-1)).to.throw(/Parameter size must be a non-negative number/); + }); + + it('should throw an error if a non-number size is requested', function () { + const buffer = new BufferPool(); + expect(() => buffer.read('256')).to.throw(/Parameter size must be a non-negative number/); + }); + + it('exact size', function () { + const buffer = new BufferPool(); + buffer.append(Buffer.from([0, 1])); + const data = buffer.read(2); + expect(data).to.eql(Buffer.from([0, 1])); + expect(buffer).property('length').to.equal(0); + }); + + it('within first buffer', function () { + const buffer = new BufferPool(); + buffer.append(Buffer.from([0, 1, 2, 3])); + const data = buffer.read(2); + expect(data).to.eql(Buffer.from([0, 1])); + expect(buffer).property('length').to.equal(2); + }); + + it('across multiple buffers', function () { + const buffer = new BufferPool(); + buffer.append(Buffer.from([0, 1, 2, 3, 4, 5])); + const data = buffer.read(6); + expect(data).to.eql(Buffer.from([0, 1, 2, 3, 4, 5])); + expect(buffer).property('length').to.equal(0); + }); + }); + }); });