Skip to content

Commit

Permalink
feat: introduce BufferPool to replace BufferList
Browse files Browse the repository at this point in the history
BufferList really helped simplify a lot of code in our message
stream processing, but ultimately is more powerful than we need it
to be. Additionally, depending on this package makes maintenance
of the driver more difficult over time. This introduces a new type
called BufferList which is tailored to our particular use case.

NODE-2930
  • Loading branch information
mbroadst committed Dec 8, 2020
1 parent a52767f commit ee28cf1
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 26 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
"bson-ext": "^2.0.0"
},
"dependencies": {
"bl": "^2.2.1",
"bson": "^4.0.4",
"denque": "^1.4.1",
"lodash": "^4.17.20"
Expand Down
21 changes: 8 additions & 13 deletions src/cmap/message_stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import BufferList = require('bl');
import { Duplex, DuplexOptions } from 'stream';
import { Response, Msg, BinMsg, Query, WriteProtocolMessageType, MessageHeader } from './commands';
import { MongoError, MongoParseError } from '../error';
Expand All @@ -11,7 +10,7 @@ import {
CompressorName
} from './wire_protocol/compression';
import type { Document, BSONSerializeOptions } from '../bson';
import type { Callback } from '../utils';
import { BufferPool, Callback } from '../utils';
import type { ClientSession } from '../sessions';

const MESSAGE_HEADER_SIZE = 16;
Expand Down Expand Up @@ -48,21 +47,19 @@ export interface OperationDescription extends BSONSerializeOptions {
* @internal
*/
export class MessageStream extends Duplex {
/** @internal */
maxBsonMessageSize: number;
[kBuffer]: BufferList;
/** @internal */
[kBuffer]: BufferPool;

constructor(options: MessageStreamOptions = {}) {
super(options);

this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;

this[kBuffer] = new BufferList();
this[kBuffer] = new BufferPool();
}

_write(chunk: Buffer, _: unknown, callback: Callback<Buffer>): void {
const buffer = this[kBuffer];
buffer.append(chunk);

this[kBuffer].append(chunk);
processIncomingData(this, callback);
}

Expand Down Expand Up @@ -135,7 +132,7 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
return;
}

const sizeOfMessage = buffer.readInt32LE(0);
const sizeOfMessage = buffer.peek(4).readInt32LE();
if (sizeOfMessage < 0) {
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
return;
Expand All @@ -155,9 +152,7 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
return;
}

const message = buffer.slice(0, sizeOfMessage);
buffer.consume(sizeOfMessage);

const message = buffer.read(sizeOfMessage);
const messageHeader: MessageHeader = {
length: message.readInt32LE(0),
requestId: message.readInt32LE(4),
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ export type {
ClientMetadata,
ClientMetadataOptions,
MongoDBNamespace,
InterruptableAsyncInterval
InterruptibleAsyncInterval,
BufferPool
} from './utils';
export type { WriteConcern, W, WriteConcernOptions } from './write_concern';
export type { ExecutionResult } from './operations/execute_operation';
Expand Down
10 changes: 5 additions & 5 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
now,
makeStateMachine,
calculateDurationInMs,
makeInterruptableAsyncInterval
makeInterruptibleAsyncInterval
} from '../utils';
import { EventEmitter } from 'events';
import { connect } from '../cmap/connect';
Expand All @@ -17,7 +17,7 @@ import {
} from './events';

import { Server } from './server';
import type { InterruptableAsyncInterval, Callback } from '../utils';
import type { InterruptibleAsyncInterval, Callback } from '../utils';
import type { TopologyVersion } from './server_description';
import type { ConnectionOptions } from '../cmap/connection';

Expand Down Expand Up @@ -65,7 +65,7 @@ export class Monitor extends EventEmitter {
[kConnection]?: Connection;
[kCancellationToken]: EventEmitter;
/** @internal */
[kMonitorId]?: InterruptableAsyncInterval;
[kMonitorId]?: InterruptibleAsyncInterval;
[kRTTPinger]?: RTTPinger;

constructor(server: Server, options?: Partial<MonitorOptions>) {
Expand Down Expand Up @@ -123,7 +123,7 @@ export class Monitor extends EventEmitter {
// start
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
interval: heartbeatFrequencyMS,
minInterval: minHeartbeatFrequencyMS,
immediate: true
Expand Down Expand Up @@ -153,7 +153,7 @@ export class Monitor extends EventEmitter {
// restart monitoring
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
interval: heartbeatFrequencyMS,
minInterval: minHeartbeatFrequencyMS
});
Expand Down
108 changes: 103 additions & 5 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ export function calculateDurationInMs(started: number): number {
return elapsed < 0 ? 0 : elapsed;
}

export interface InterruptableAsyncIntervalOptions {
export interface InterruptibleAsyncIntervalOptions {
/** The interval to execute a method on */
interval: number;
/** A minimum interval that must elapse before the method is called */
Expand All @@ -977,7 +977,7 @@ export interface InterruptableAsyncIntervalOptions {
}

/** @internal */
export interface InterruptableAsyncInterval {
export interface InterruptibleAsyncInterval {
wake(): void;
stop(): void;
}
Expand All @@ -991,10 +991,10 @@ export interface InterruptableAsyncInterval {
*
* @param fn - An async function to run on an interval, must accept a `callback` as its only parameter
*/
export function makeInterruptableAsyncInterval(
export function makeInterruptibleAsyncInterval(
fn: (callback: Callback) => void,
options?: Partial<InterruptableAsyncIntervalOptions>
): InterruptableAsyncInterval {
options?: Partial<InterruptibleAsyncIntervalOptions>
): InterruptibleAsyncInterval {
let timerId: NodeJS.Timeout | undefined;
let lastCallTime: number;
let lastWakeTime: number;
Expand Down Expand Up @@ -1126,3 +1126,101 @@ export function resolveOptions<T extends CommandOperationOptions>(

return result;
}

const kBuffers = Symbol('buffers');
const kLength = Symbol('length');

/** @internal */
export class BufferPool {
[kBuffers]: Buffer[];
[kLength]: number;

constructor() {
this[kBuffers] = [];
this[kLength] = 0;
}

get length(): number {
return this[kLength];
}

append(buffer: Buffer): void {
this[kBuffers].push(buffer);
this[kLength] += buffer.length;
}

peek(size: number): Buffer {
return this.read(size, false);
}

read(size: number, consume = true): Buffer {
if (typeof size !== 'number' || size < 0) {
throw new TypeError('Parameter size must be a non-negative number');
}

if (typeof size === 'number' && size < 0) {
size += this.length;
}

if (size > this[kLength]) {
return Buffer.alloc(0);
}

let result: Buffer;

// read the whole buffer
if (size === this.length) {
result = Buffer.concat(this[kBuffers]);

if (consume) {
this[kBuffers] = [];
this[kLength] = 0;
}
}

// size is within first buffer, no need to concat
else if (size <= this[kBuffers][0].length) {
result = this[kBuffers][0].slice(0, size);
if (consume) {
this[kBuffers][0] = this[kBuffers][0].slice(size);
this[kLength] -= size;
}
}

// size is beyond first buffer, need to track and copy
else {
result = Buffer.allocUnsafe(size);

let idx;
let offset = 0;
let bytesToCopy = size;
for (idx = 0; idx < this[kBuffers].length; ++idx) {
let bytesCopied;
if (bytesToCopy > this[kBuffers][idx].length) {
bytesCopied = this[kBuffers][idx].copy(result, offset, 0);
offset += bytesCopied;
} else {
bytesCopied = this[kBuffers][idx].copy(result, offset, 0, bytesToCopy);
if (consume) {
this[kBuffers][idx] = this[kBuffers][idx].slice(0, bytesCopied);
}
offset += bytesCopied;
break;
}

bytesToCopy -= bytesCopied;
}

// compact the internal buffer array
if (consume) {
this[kBuffers] = this[kBuffers].slice(idx);
}

if (result.length > offset) {
return result.slice(0, offset);
}
}

return result;
}
}
82 changes: 81 additions & 1 deletion test/unit/utils.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
const { eachAsync, now, makeInterruptableAsyncInterval } = require('../../src/utils');
const { eachAsync, now, makeInterruptableAsyncInterval, BufferPool } = require('../../src/utils');
const { expect } = require('chai');
const sinon = require('sinon');

Expand Down Expand Up @@ -161,4 +161,84 @@ describe('utils', function () {
this.clock.tick(250);
});
});

context('makeBufferPool', function () {
it('should report the correct length', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1]));
buffer.append(Buffer.from([2, 3]));
buffer.append(Buffer.from([2, 3]));
expect(buffer).property('length').to.equal(6);
});

it('return an empty buffer if too many bytes requested', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1, 2, 3]));
const data = buffer.read(6);
expect(data).to.have.length(0);
expect(buffer).property('length').to.equal(4);
});

context('peek', function () {
it('exact size', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1]));
const data = buffer.peek(2);
expect(data).to.eql(Buffer.from([0, 1]));
expect(buffer).property('length').to.equal(2);
});

it('within first buffer', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1, 2, 3]));
const data = buffer.peek(2);
expect(data).to.eql(Buffer.from([0, 1]));
expect(buffer).property('length').to.equal(4);
});

it('across multiple buffers', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1, 2, 3, 4, 5]));
const data = buffer.peek(6);
expect(data).to.eql(Buffer.from([0, 1, 2, 3, 4, 5]));
expect(buffer).property('length').to.equal(6);
});
});

context('read', function () {
it('should throw an error if a negative size is requested', function () {
const buffer = new BufferPool();
expect(() => buffer.read(-1)).to.throw(/Parameter size must be a non-negative number/);
});

it('should throw an error if a non-number size is requested', function () {
const buffer = new BufferPool();
expect(() => buffer.read('256')).to.throw(/Parameter size must be a non-negative number/);
});

it('exact size', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1]));
const data = buffer.read(2);
expect(data).to.eql(Buffer.from([0, 1]));
expect(buffer).property('length').to.equal(0);
});

it('within first buffer', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1, 2, 3]));
const data = buffer.read(2);
expect(data).to.eql(Buffer.from([0, 1]));
expect(buffer).property('length').to.equal(2);
});

it('across multiple buffers', function () {
const buffer = new BufferPool();
buffer.append(Buffer.from([0, 1, 2, 3, 4, 5]));
const data = buffer.read(6);
expect(data).to.eql(Buffer.from([0, 1, 2, 3, 4, 5]));
expect(buffer).property('length').to.equal(0);
});
});
});
});

0 comments on commit ee28cf1

Please sign in to comment.