Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(OP_MSG): adding class for translating OP_MSG from binary
Browse files Browse the repository at this point in the history
This will be used for parsing OP_MSG replies from the server
  • Loading branch information
daprahamian authored and mbroadst committed Feb 12, 2019
1 parent c5adfa3 commit 11e4132
Showing 1 changed file with 80 additions and 2 deletions.
82 changes: 80 additions & 2 deletions lib/connection/msg.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,86 @@ Msg.getRequestId = function() {
return ++_requestId;
};

class BinMsg {
constructor(bson, message, msgHeader, msgBody, opts) {
opts = opts || { promoteLongs: true, promoteValues: true, promoteBuffers: false };
this.parsed = false;
this.raw = message;
this.data = msgBody;
this.bson = bson;
this.opts = opts;

// Read the message header
this.length = msgHeader.length;
this.requestId = msgHeader.requestId;
this.responseTo = msgHeader.responseTo;
this.opCode = msgHeader.opCode;

// Read response flags
this.responseFlags = msgBody.readInt32LE(0);
this.checksumPresent = (this.responseFlags & Msg.flags.CHECKSUM_PRESENT) !== 0;
this.moreToCome = (this.responseFlags & Msg.flags.MORE_TO_COME) !== 0;
this.exhaustAllowed = (this.responseFlags & Msg.flags.EXHAUST_ALLOWED) !== 0;
this.promoteLongs = typeof opts.promoteLongs === 'boolean' ? opts.promoteLongs : true;
this.promoteValues = typeof opts.promoteValues === 'boolean' ? opts.promoteValues : true;
this.promoteBuffers = typeof opts.promoteBuffers === 'boolean' ? opts.promoteBuffers : false;

this.documents = [];
}

isParsed() {
return this.parsed;
}

parse(options) {
// Don't parse again if not needed
if (this.parsed) return;
options = options || {};

this.index = 4;
// Allow the return of raw documents instead of parsing
const raw = options.raw || false;
const promoteLongs =
typeof options.promoteLongs === 'boolean' ? options.promoteLongs : this.opts.promoteLongs;
const promoteValues =
typeof options.promoteValues === 'boolean' ? options.promoteValues : this.opts.promoteValues;
const promoteBuffers =
typeof options.promoteBuffers === 'boolean'
? options.promoteBuffers
: this.opts.promoteBuffers;

// Set up the options
const _options = {
promoteLongs: promoteLongs,
promoteValues: promoteValues,
promoteBuffers: promoteBuffers
};

while (this.index < this.data.length) {
const payloadType = this.data.readUInt8(this.index++);
if (payloadType === 1) {
console.error('TYPE 1');
} else if (payloadType === 0) {
const bsonSize = this.data.readUInt32LE(this.index);
const bin = this.data.slice(this.index, this.index + bsonSize);
this.documents.push(raw ? bin : this.bson.deserialize(bin, _options));

this.index += bsonSize;
}
}

this.parsed = true;
}
}

Msg.flags = {
CHECKSUM_PRESENT: 1,
MORE_TO_COME: 2,
EXHAUST_ALLOWED: 1 << 16
};

function writeInt32ListToUint8Buffer(buffer, int32List, start) {
let index = start || 0;

int32List.forEach(int32 => {
buffer[index] = int32 & 0xff;
buffer[index + 1] = (int32 >> 8) & 0xff;
Expand All @@ -198,7 +275,8 @@ function getValidSegmentListNamePairs(query) {
}
}
}

return false;
}

module.exports = { Msg };
module.exports = { Msg, BinMsg };

0 comments on commit 11e4132

Please sign in to comment.