Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(OP_MSG): adding OP_MSG implementation
Browse files Browse the repository at this point in the history
Adding implementation of OP_MSG and 3.6 wire protocol.
This is currrently untested and not hooked in to the overall core
driver.

Fixes NODE-1085
  • Loading branch information
daprahamian authored and mbroadst committed Feb 12, 2019
1 parent 0b9243d commit c5adfa3
Show file tree
Hide file tree
Showing 8 changed files with 682 additions and 1 deletion.
204 changes: 204 additions & 0 deletions lib/connection/msg.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
'use strict';

// Implementation of OP_MSG spec:
// https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst
//
// struct Section {
// uint8 payloadType;
// union payload {
// document document; // payloadType == 0
// struct sequence { // payloadType == 1
// int32 size;
// cstring identifier;
// document* documents;
// };
// };
// };

// struct OP_MSG {
// struct MsgHeader {
// int32 messageLength;
// int32 requestID;
// int32 responseTo;
// int32 opCode = 2013;
// };
// uint32 flagBits;
// Section+ sections;
// [uint32 checksum;]
// };

const opcodes = require('../wireprotocol/shared').opcodes;

// Incrementing request id
let _requestId = 0;

// Msg Flags
const OPTS_CHECKSUM_PRESENT = 1;
const OPTS_MORE_TO_COME = 2;
const OPTS_EXHAUST_ALLOWED = 1 >> 16;

class Msg {
constructor(bson, query, options) {
// Basic options needed to be passed in
if (query == null) throw new Error('query must be specified for query');

// Basic options
this.bson = bson;
this.query = Array.isArray(query) ? query : [query];

// Ensure empty options
this.options = options || {};

// Additional options
this.requestId = Msg.getRequestId();

// Serialization option
this.serializeFunctions =
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false;
this.ignoreUndefined =
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false;
this.checkKeys = typeof options.checkKeys === 'boolean' ? options.checkKeys : false;
this.maxBsonSize = options.maxBsonSize || 1024 * 1024 * 16;

// flags
this.checksumPresent = false;
this.moreToCome = options.moreToCome || false;
this.exhaustAllowed = false;
}

toBin() {
const buffers = [];
let flags = 0;

if (this.checksumPresent) {
flags |= OPTS_CHECKSUM_PRESENT;
}

if (this.moreToCome) {
flags |= OPTS_MORE_TO_COME;
}

if (this.exhaustAllowed) {
flags |= OPTS_EXHAUST_ALLOWED;
}

const header = new Buffer(
4 * 4 + // Header
4 // Flags
);

buffers.push(header);

let totalLength = header.length;

for (let i = 0; i < this.query.length; ++i) {
const query = this.query[i];

const nameArgumentPair = getValidSegmentListNamePairs(query);
if (nameArgumentPair) {
// TODO: Add support for payload type 1
const argument = nameArgumentPair.argument;

// Add initial type 0 segment with arguments pulled up
const clonedQuery = Object.assign({}, query);
delete clonedQuery[argument];
totalLength += this.makeDocumentSegment(buffers, clonedQuery);

// Create type 1 query
totalLength += this.makeSequenceSegment(buffers, argument, query[argument]);
} else {
totalLength += this.makeDocumentSegment(buffers, query);
}
}

writeInt32ListToUint8Buffer(header, [totalLength, this.requestId, 0, opcodes.OP_MSG, flags]);

return buffers;
}

makeDocumentSegment(buffers, document) {
const payloadTypeBuffer = new Buffer(1);
payloadTypeBuffer[0] = 0;

const documentBuffer = this.serializeBson(document);

buffers.push(payloadTypeBuffer);
buffers.push(documentBuffer);

return payloadTypeBuffer.length + documentBuffer.length;
}

makeSequenceSegment(buffers, argument, documents) {
const metaBuffer = new Buffer(
1 + // payloadType,
4 + // Size of sequence
argument.length + // Argument length
1 //C string null terminator
);

let segmentLength = metaBuffer.length - 1;

buffers.push(metaBuffer);
documents.forEach(document => {
const documentBuffer = this.serializeBson(document);
segmentLength += documentBuffer.length;
buffers.push(documentBuffer);
});

metaBuffer[0] = 1 & 0x1;
metaBuffer[1] = segmentLength & 0xff;
metaBuffer[2] = (segmentLength >> 8) & 0xff;
metaBuffer[3] = (segmentLength >> 16) & 0xff;
metaBuffer[4] = (segmentLength >> 24) & 0xff;
metaBuffer.write(argument, 5, 'utf8');
metaBuffer[metaBuffer.length - 1] = 0;

return segmentLength + 1;
}

serializeBson(document) {
return this.bson.serialize(document, {
checkKeys: this.checkKeys,
serializeFunctions: this.serializeFunctions,
ignoreUndefined: this.ignoreUndefined
});
}
}

Msg.getRequestId = function() {
return ++_requestId;
};

function writeInt32ListToUint8Buffer(buffer, int32List, start) {
let index = start || 0;

int32List.forEach(int32 => {
buffer[index] = int32 & 0xff;
buffer[index + 1] = (int32 >> 8) & 0xff;
buffer[index + 2] = (int32 >> 16) & 0xff;
buffer[index + 3] = (int32 >> 24) & 0xff;
index += 4;
});

return index;
}

const VALID_NAME_ARGUMENT_MAPS = {
insert: 'documents',
update: 'updates',
delete: 'deletes'
};

function getValidSegmentListNamePairs(query) {
for (let name in VALID_NAME_ARGUMENT_MAPS) {
if (name in query) {
const argument = VALID_NAME_ARGUMENT_MAPS[name];
if (query[argument] && query[argument].length > 1) {
return { name, argument };
}
}
}
return false;
}

module.exports = { Msg };
123 changes: 123 additions & 0 deletions lib/wireprotocol/3_6_support/execute_find.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
'use strict';

const getReadPreference = require('../shared').getReadPreference;
const Msg = require('../../connection/msg').Msg;

function executeFind(bson, ns, cmd, cursorState, topology, options) {
// Ensure we have at least some options
options = options || {};
// Get the readPreference
const readPreference = getReadPreference(cmd, options);
// Set the optional batchSize
cursorState.batchSize = cmd.batchSize || cursorState.batchSize;

// Get name of database
const parts = ns.split(/\./);
const $db = parts.shift();

// Build actual find command
let findCmd = { $db, find: parts.join('.') };

// I we provided a filter
if (cmd.query) {
findCmd.filter = cmd.query['$query'] || cmd.query;
}

[
'fields',
'hint',
'skip',
'limit',
'comment',
'maxScan',
'maxTimeMS',
'min',
'max',
'returnKey',
'showDiskLoc',
'snapshot',
'tailable',
'oplogReplay',
'noCursorTimeout',
'collation'
].forEach(key => {
if (cmd[key]) {
findCmd[key] = cmd[key];
}
});

const sort = parseSortField(cmd.sort);

// Add sort to command
if (sort) findCmd.sort = sort;

// If we have awaitData set
if (cmd.awaitData) findCmd.awaitData = cmd.awaitData;
if (cmd.awaitdata) findCmd.awaitData = cmd.awaitdata;

// If we have explain, we need to rewrite the find command
// to wrap it in the explain command
if (cmd.explain) {
findCmd = {
explain: findCmd
};
}

// Did we provide a readConcern
if (cmd.readConcern) findCmd.readConcern = cmd.readConcern;

// Set up the serialize and ignoreUndefined fields
const serializeFunctions =
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false;
const ignoreUndefined =
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false;

// We have a Mongos topology, check if we need to add a readPreference
if (topology.type === 'mongos' && readPreference && readPreference.preference !== 'primary') {
findCmd = {
$query: findCmd,
$readPreference: readPreference.toJSON()
};
}

return new Msg(bson, findCmd, { serializeFunctions, ignoreUndefined, checkKeys: false });
}

function parseSortField(sort) {
if (!Array.isArray(sort)) {
return sort;
}

// Handle issue of sort being an Array
const sortObject = {};

if (sort.length > 0 && !Array.isArray(sort[0])) {
var sortDirection = sort[1];
// Translate the sort order text
if (sortDirection === 'asc') {
sortDirection = 1;
} else if (sortDirection === 'desc') {
sortDirection = -1;
}

// Set the sort order
sortObject[sort[0]] = sortDirection;
} else {
for (var i = 0; i < sort.length; i++) {
sortDirection = sort[i][1];
// Translate the sort order text
if (sortDirection === 'asc') {
sortDirection = 1;
} else if (sortDirection === 'desc') {
sortDirection = -1;
}

// Set the sort order
sortObject[sort[i][0]] = sortDirection;
}
}

return sortObject;
}

module.exports = executeFind;
Loading

0 comments on commit c5adfa3

Please sign in to comment.