Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Greatly speed up 'advanced' ipc receiving with big messages #42931

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 52 additions & 20 deletions lib/internal/child_process/serialization.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const {
JSONParse,
JSONStringify,
StringPrototypeSplit,
ArrayPrototypePush,
Symbol,
TypedArrayPrototypeSubarray,
} = primordials;
Expand All @@ -15,6 +16,7 @@ const assert = require('internal/assert');
const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap');

const kMessageBuffer = Symbol('kMessageBuffer');
const kMessageBufferSize = Symbol('kMessageBufferSize');
const kJSONBuffer = Symbol('kJSONBuffer');
const kStringDecoder = Symbol('kStringDecoder');

Expand Down Expand Up @@ -51,47 +53,77 @@ class ChildProcessDeserializer extends v8.DefaultDeserializer {
// (aka 'advanced')
const advanced = {
initMessageChannel(channel) {
channel[kMessageBuffer] = Buffer.alloc(0);
channel[kMessageBuffer] = [];
channel[kMessageBufferSize] = 0;
channel.buffering = false;
},

*parseChannelMessages(channel, readData) {
if (readData.length === 0) return;

let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]);
while (messageBuffer.length > 4) {
const size = messageBuffer.readUInt32BE();
if (messageBuffer.length < 4 + size) {
break;
}
ArrayPrototypePush(channel[kMessageBuffer], readData);
channel[kMessageBufferSize] += readData.length;

// Index 0 should always be present because we just pushed data into it.
let messageBufferHead = channel[kMessageBuffer][0];
while (messageBufferHead.length >= 4) {
// We call `readUInt32BE` manually here, because this is faster than first converting
// it to a buffer and using `readUInt32BE` on that.
const fullMessageSize = (
messageBufferHead[0] << 24 |
messageBufferHead[1] << 16 |
messageBufferHead[2] << 8 |
messageBufferHead[3]
) + 4;

if (channel[kMessageBufferSize] < fullMessageSize) break;

const concatenatedBuffer = channel[kMessageBuffer].length === 1 ?
targos marked this conversation as resolved.
Show resolved Hide resolved
channel[kMessageBuffer][0] :
Buffer.concat(
channel[kMessageBuffer],
channel[kMessageBufferSize]
);

const deserializer = new ChildProcessDeserializer(
TypedArrayPrototypeSubarray(messageBuffer, 4, 4 + size));
messageBuffer = TypedArrayPrototypeSubarray(messageBuffer, 4 + size);
TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize)
);

messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize);
channel[kMessageBufferSize] = messageBufferHead.length;
channel[kMessageBuffer] =
channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : [];

deserializer.readHeader();
yield deserializer.readValue();
}
channel[kMessageBuffer] = messageBuffer;
channel.buffering = messageBuffer.length > 0;

channel.buffering = channel[kMessageBufferSize] > 0;
},

writeChannelMessage(channel, req, message, handle) {
const ser = new ChildProcessSerializer();
// Add 4 bytes, to later populate with message length
ser.writeRawBytes(Buffer.allocUnsafe(4));
ser.writeHeader();
ser.writeValue(message);

const serializedMessage = ser.releaseBuffer();
const sizeBuffer = Buffer.allocUnsafe(4);
sizeBuffer.writeUInt32BE(serializedMessage.length);

const buffer = Buffer.concat([
sizeBuffer,
serializedMessage,
]);
const result = channel.writeBuffer(req, buffer, handle);
const serializedMessageLength = serializedMessage.length - 4;

serializedMessage.set([
serializedMessageLength >> 24 & 0xFF,
serializedMessageLength >> 16 & 0xFF,
serializedMessageLength >> 8 & 0xFF,
serializedMessageLength & 0xFF,
], 0);

const result = channel.writeBuffer(req, serializedMessage, handle);

// Mirror what stream_base_commons.js does for Buffer retention.
if (streamBaseState[kLastWriteWasAsync])
req.buffer = buffer;
req.buffer = serializedMessage;

return result;
},
};
Expand Down