This repository has been archived by the owner on Feb 4, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 106
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(OP_MSG): adding OP_MSG implementation
Adding implementation of OP_MSG and 3.6 wire protocol. This is currrently untested and not hooked in to the overall core driver. Fixes NODE-1085
- Loading branch information
1 parent
0b9243d
commit c5adfa3
Showing
8 changed files
with
682 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
'use strict'; | ||
|
||
// Implementation of OP_MSG spec: | ||
// https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst | ||
// | ||
// struct Section { | ||
// uint8 payloadType; | ||
// union payload { | ||
// document document; // payloadType == 0 | ||
// struct sequence { // payloadType == 1 | ||
// int32 size; | ||
// cstring identifier; | ||
// document* documents; | ||
// }; | ||
// }; | ||
// }; | ||
|
||
// struct OP_MSG { | ||
// struct MsgHeader { | ||
// int32 messageLength; | ||
// int32 requestID; | ||
// int32 responseTo; | ||
// int32 opCode = 2013; | ||
// }; | ||
// uint32 flagBits; | ||
// Section+ sections; | ||
// [uint32 checksum;] | ||
// }; | ||
|
||
const opcodes = require('../wireprotocol/shared').opcodes; | ||
|
||
// Incrementing request id | ||
let _requestId = 0; | ||
|
||
// Msg Flags | ||
const OPTS_CHECKSUM_PRESENT = 1; | ||
const OPTS_MORE_TO_COME = 2; | ||
const OPTS_EXHAUST_ALLOWED = 1 >> 16; | ||
|
||
class Msg { | ||
constructor(bson, query, options) { | ||
// Basic options needed to be passed in | ||
if (query == null) throw new Error('query must be specified for query'); | ||
|
||
// Basic options | ||
this.bson = bson; | ||
this.query = Array.isArray(query) ? query : [query]; | ||
|
||
// Ensure empty options | ||
this.options = options || {}; | ||
|
||
// Additional options | ||
this.requestId = Msg.getRequestId(); | ||
|
||
// Serialization option | ||
this.serializeFunctions = | ||
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false; | ||
this.ignoreUndefined = | ||
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false; | ||
this.checkKeys = typeof options.checkKeys === 'boolean' ? options.checkKeys : false; | ||
this.maxBsonSize = options.maxBsonSize || 1024 * 1024 * 16; | ||
|
||
// flags | ||
this.checksumPresent = false; | ||
this.moreToCome = options.moreToCome || false; | ||
this.exhaustAllowed = false; | ||
} | ||
|
||
toBin() { | ||
const buffers = []; | ||
let flags = 0; | ||
|
||
if (this.checksumPresent) { | ||
flags |= OPTS_CHECKSUM_PRESENT; | ||
} | ||
|
||
if (this.moreToCome) { | ||
flags |= OPTS_MORE_TO_COME; | ||
} | ||
|
||
if (this.exhaustAllowed) { | ||
flags |= OPTS_EXHAUST_ALLOWED; | ||
} | ||
|
||
const header = new Buffer( | ||
4 * 4 + // Header | ||
4 // Flags | ||
); | ||
|
||
buffers.push(header); | ||
|
||
let totalLength = header.length; | ||
|
||
for (let i = 0; i < this.query.length; ++i) { | ||
const query = this.query[i]; | ||
|
||
const nameArgumentPair = getValidSegmentListNamePairs(query); | ||
if (nameArgumentPair) { | ||
// TODO: Add support for payload type 1 | ||
const argument = nameArgumentPair.argument; | ||
|
||
// Add initial type 0 segment with arguments pulled up | ||
const clonedQuery = Object.assign({}, query); | ||
delete clonedQuery[argument]; | ||
totalLength += this.makeDocumentSegment(buffers, clonedQuery); | ||
|
||
// Create type 1 query | ||
totalLength += this.makeSequenceSegment(buffers, argument, query[argument]); | ||
} else { | ||
totalLength += this.makeDocumentSegment(buffers, query); | ||
} | ||
} | ||
|
||
writeInt32ListToUint8Buffer(header, [totalLength, this.requestId, 0, opcodes.OP_MSG, flags]); | ||
|
||
return buffers; | ||
} | ||
|
||
makeDocumentSegment(buffers, document) { | ||
const payloadTypeBuffer = new Buffer(1); | ||
payloadTypeBuffer[0] = 0; | ||
|
||
const documentBuffer = this.serializeBson(document); | ||
|
||
buffers.push(payloadTypeBuffer); | ||
buffers.push(documentBuffer); | ||
|
||
return payloadTypeBuffer.length + documentBuffer.length; | ||
} | ||
|
||
makeSequenceSegment(buffers, argument, documents) { | ||
const metaBuffer = new Buffer( | ||
1 + // payloadType, | ||
4 + // Size of sequence | ||
argument.length + // Argument length | ||
1 //C string null terminator | ||
); | ||
|
||
let segmentLength = metaBuffer.length - 1; | ||
|
||
buffers.push(metaBuffer); | ||
documents.forEach(document => { | ||
const documentBuffer = this.serializeBson(document); | ||
segmentLength += documentBuffer.length; | ||
buffers.push(documentBuffer); | ||
}); | ||
|
||
metaBuffer[0] = 1 & 0x1; | ||
metaBuffer[1] = segmentLength & 0xff; | ||
metaBuffer[2] = (segmentLength >> 8) & 0xff; | ||
metaBuffer[3] = (segmentLength >> 16) & 0xff; | ||
metaBuffer[4] = (segmentLength >> 24) & 0xff; | ||
metaBuffer.write(argument, 5, 'utf8'); | ||
metaBuffer[metaBuffer.length - 1] = 0; | ||
|
||
return segmentLength + 1; | ||
} | ||
|
||
serializeBson(document) { | ||
return this.bson.serialize(document, { | ||
checkKeys: this.checkKeys, | ||
serializeFunctions: this.serializeFunctions, | ||
ignoreUndefined: this.ignoreUndefined | ||
}); | ||
} | ||
} | ||
|
||
Msg.getRequestId = function() { | ||
return ++_requestId; | ||
}; | ||
|
||
function writeInt32ListToUint8Buffer(buffer, int32List, start) { | ||
let index = start || 0; | ||
|
||
int32List.forEach(int32 => { | ||
buffer[index] = int32 & 0xff; | ||
buffer[index + 1] = (int32 >> 8) & 0xff; | ||
buffer[index + 2] = (int32 >> 16) & 0xff; | ||
buffer[index + 3] = (int32 >> 24) & 0xff; | ||
index += 4; | ||
}); | ||
|
||
return index; | ||
} | ||
|
||
const VALID_NAME_ARGUMENT_MAPS = { | ||
insert: 'documents', | ||
update: 'updates', | ||
delete: 'deletes' | ||
}; | ||
|
||
function getValidSegmentListNamePairs(query) { | ||
for (let name in VALID_NAME_ARGUMENT_MAPS) { | ||
if (name in query) { | ||
const argument = VALID_NAME_ARGUMENT_MAPS[name]; | ||
if (query[argument] && query[argument].length > 1) { | ||
return { name, argument }; | ||
} | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
module.exports = { Msg }; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
'use strict'; | ||
|
||
const getReadPreference = require('../shared').getReadPreference; | ||
const Msg = require('../../connection/msg').Msg; | ||
|
||
function executeFind(bson, ns, cmd, cursorState, topology, options) { | ||
// Ensure we have at least some options | ||
options = options || {}; | ||
// Get the readPreference | ||
const readPreference = getReadPreference(cmd, options); | ||
// Set the optional batchSize | ||
cursorState.batchSize = cmd.batchSize || cursorState.batchSize; | ||
|
||
// Get name of database | ||
const parts = ns.split(/\./); | ||
const $db = parts.shift(); | ||
|
||
// Build actual find command | ||
let findCmd = { $db, find: parts.join('.') }; | ||
|
||
// I we provided a filter | ||
if (cmd.query) { | ||
findCmd.filter = cmd.query['$query'] || cmd.query; | ||
} | ||
|
||
[ | ||
'fields', | ||
'hint', | ||
'skip', | ||
'limit', | ||
'comment', | ||
'maxScan', | ||
'maxTimeMS', | ||
'min', | ||
'max', | ||
'returnKey', | ||
'showDiskLoc', | ||
'snapshot', | ||
'tailable', | ||
'oplogReplay', | ||
'noCursorTimeout', | ||
'collation' | ||
].forEach(key => { | ||
if (cmd[key]) { | ||
findCmd[key] = cmd[key]; | ||
} | ||
}); | ||
|
||
const sort = parseSortField(cmd.sort); | ||
|
||
// Add sort to command | ||
if (sort) findCmd.sort = sort; | ||
|
||
// If we have awaitData set | ||
if (cmd.awaitData) findCmd.awaitData = cmd.awaitData; | ||
if (cmd.awaitdata) findCmd.awaitData = cmd.awaitdata; | ||
|
||
// If we have explain, we need to rewrite the find command | ||
// to wrap it in the explain command | ||
if (cmd.explain) { | ||
findCmd = { | ||
explain: findCmd | ||
}; | ||
} | ||
|
||
// Did we provide a readConcern | ||
if (cmd.readConcern) findCmd.readConcern = cmd.readConcern; | ||
|
||
// Set up the serialize and ignoreUndefined fields | ||
const serializeFunctions = | ||
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false; | ||
const ignoreUndefined = | ||
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false; | ||
|
||
// We have a Mongos topology, check if we need to add a readPreference | ||
if (topology.type === 'mongos' && readPreference && readPreference.preference !== 'primary') { | ||
findCmd = { | ||
$query: findCmd, | ||
$readPreference: readPreference.toJSON() | ||
}; | ||
} | ||
|
||
return new Msg(bson, findCmd, { serializeFunctions, ignoreUndefined, checkKeys: false }); | ||
} | ||
|
||
function parseSortField(sort) { | ||
if (!Array.isArray(sort)) { | ||
return sort; | ||
} | ||
|
||
// Handle issue of sort being an Array | ||
const sortObject = {}; | ||
|
||
if (sort.length > 0 && !Array.isArray(sort[0])) { | ||
var sortDirection = sort[1]; | ||
// Translate the sort order text | ||
if (sortDirection === 'asc') { | ||
sortDirection = 1; | ||
} else if (sortDirection === 'desc') { | ||
sortDirection = -1; | ||
} | ||
|
||
// Set the sort order | ||
sortObject[sort[0]] = sortDirection; | ||
} else { | ||
for (var i = 0; i < sort.length; i++) { | ||
sortDirection = sort[i][1]; | ||
// Translate the sort order text | ||
if (sortDirection === 'asc') { | ||
sortDirection = 1; | ||
} else if (sortDirection === 'desc') { | ||
sortDirection = -1; | ||
} | ||
|
||
// Set the sort order | ||
sortObject[sort[i][0]] = sortDirection; | ||
} | ||
} | ||
|
||
return sortObject; | ||
} | ||
|
||
module.exports = executeFind; |
Oops, something went wrong.