From f7f7274daf185f0dd079956ccea32352ab9415b8 Mon Sep 17 00:00:00 2001 From: Universal Web Date: Sat, 8 Jul 2023 23:30:54 -0400 Subject: [PATCH] Reduce Head Memory Usage Reduce Data Memory Usage Converting to new comms system Several Patches --- udsp/ask.js | 12 +- udsp/reply.js | 49 ++----- udsp/request/base.js | 210 ++++++++++++++++++++---------- udsp/request/dataPacketization.js | 19 ++- udsp/request/onData.js | 16 +-- udsp/request/onPacket.js | 83 +++++++----- udsp/request/sendAll.js | 10 -- udsp/request/sendEnd.js | 11 -- udsp/request/sendPacketsById.js | 6 +- udsp/server/actions/connect.js | 10 +- 10 files changed, 237 insertions(+), 189 deletions(-) delete mode 100644 udsp/request/sendAll.js delete mode 100644 udsp/request/sendEnd.js diff --git a/udsp/ask.js b/udsp/ask.js index c0cf432e..19db613e 100644 --- a/udsp/ask.js +++ b/udsp/ask.js @@ -29,8 +29,8 @@ export class Ask extends Base { const streamId = packetIdGenerator.get(); this.request.sid = streamId; this.packetTemplate.sid = streamId; - this.outgoingSetupPacket.sid = streamId; - this.outgoingSetupPacket.method = method; + this.request.method = method; + this.method = method; this.id = streamId; if (data) { this.request.data = data; @@ -38,15 +38,13 @@ export class Ask extends Base { if (head) { this.request.head = head; } - if (method) { - this.request.method = method; - } else { - this.request.method = 'get'; - } queue.set(streamId, this); } complete() { console.log('Ask complete', this); + if (this.state === 3) { + this.state = 4; + } this.accept(this); } isAsk = true; diff --git a/udsp/reply.js b/udsp/reply.js index 6a85e471..8463c62f 100644 --- a/udsp/reply.js +++ b/udsp/reply.js @@ -16,52 +16,27 @@ export class Reply extends Base { const thisReply = this; const { message } = request; const { sid } = message; - console.log(source); + // console.log(source); + // // console.log(message); const { queue, packetIdGenerator } = source; - thisReply.source = function() { - return source; - }; const server = source.server(); - thisReply.server = function() { + this.server = function() { return server; }; - thisReply.packetMaxPayload = server.packetMaxPayload; - thisReply.packetMaxPayloadSafeEstimate = server.packetMaxPayloadSafeEstimate; - thisReply.sid = sid; - thisReply.responsePacketTemplate.sid = sid; - thisReply.response.sid = sid; - queue.set(sid, thisReply); - thisReply.sendPacket = function(config) { - return source.send(config); - }; - if (source.lastActive) { - source.lastActive = Date.now(); - } - thisReply.received(message); - return thisReply; + this.sid = sid; + this.id = sid; + this.packetTemplate.sid = sid; + this.response.sid = sid; + source.lastActive = Date.now(); + queue.set(sid, this); + this.onPacket(message); } isReply = true; - async assemble() { - const thisReply = this; - const { serialization } = thisReply; - if (thisReply.totalIncomingPackets === 1) { - thisReply.request = thisReply.incomingPackets[0]; - } - const packet = thisReply.incomingPackets[0]; - eachArray(thisReply.incomingPackets, (item) => { - if (item.data) { - Buffer.concat([packet.data, item.data]); - } - }); - if (serialization === 'struct' || !serialization) { - msgReceived(thisReply.request); - if (thisReply.request.data) { - thisReply.request.data = decode(thisReply.request.data); - } - } + async complete() { + this.state = 1; await processEvent(this); } } diff --git a/udsp/request/base.js b/udsp/request/base.js index 044c4e47..782173e5 100644 --- a/udsp/request/base.js +++ b/udsp/request/base.js @@ -1,7 +1,6 @@ import { sendPacket } from './sendPacket.js'; import { destroy } from './destory.js'; import { dataPacketization } from './dataPacketization.js'; -import { sendEnd } from './sendEnd.js'; import { on } from './on.js'; import { flushOutgoing, flushIncoming, flush } from './flush.js'; import { sendPacketsById } from './sendPacketsById.js'; @@ -9,12 +8,11 @@ import { sendAll } from './sendAll.js'; import { onPacket } from './onPacket.js'; import { isBuffer, isPlainObject, isString, promise, assign, - objectSize, eachArray, jsonParse, construct, isArray + objectSize, eachArray, jsonParse, construct, isArray, clear } from '@universalweb/acid'; import { encode, decode } from 'msgpackr'; import { request } from '#udsp/request'; import { assembleData } from './assembleData.js'; -const singlePacketMethods = /^(connect|open)$/i; export class Base { constructor(options = {}, source) { const { events, } = options; @@ -48,49 +46,75 @@ export class Base { } assign(source.head, target); } - writeHeader(headerName, headerValue) { + setHeader(headerName, headerValue) { const source = (this.isAsk) ? this.request : this.response; if (!source.head) { source.head = {}; } source.head[headerName] = headerValue; } + setHead(headArg) { + const headBuffer = Buffer.concat(headArg); + clear(headArg); + const head = decode(headBuffer); + headBuffer.fill(0); + this.head = head; + this.headAssembled = true; + } async assembleHead() { - const incomingPackets = this.incomingPackets; - const head = []; - eachArray(incomingPackets, (item, index) => { + if (this.headAssembled) { + return; + } + const head = this.head; + const { missingHeadPackets } = this; + eachArray(this.incomingHeadPackets, (item, index) => { if (!item) { - this.missingHeadPackets.set(index); + if (!missingHeadPackets.has(index)) { + missingHeadPackets.set(index, true); + } } - if (item.head) { - head.push(item.head); + if (item.head && !head[index]) { + head[index] = item.head; } }); - this.setHead(head); - } - setHead(headArg) { - let head = (isArray(headArg)) ? Buffer.concat(headArg) : headArg; - head = (isBuffer(head)) ? decode(head) : head; - this.head = head; + if (this.totalIncomingHeadSize === this.currentIncomingHeadSize) { + this.setHead(head); + this.sendDataReady(); + } } - async assembleData() { - const incomingPackets = this.incomingPackets; - const data = []; - eachArray(incomingPackets, (item, index) => { - if (!item) { - this.missingDataPackets.set(index); - } - if (item.data) { - data.push(item.data); + async checkData() { + const { missingDataPackets } = this; + if (this.compiledData) { + return; + } + let lastKnownEndIndex = 0; + eachArray(this.incomingDataPackets, (item, index) => { + if (item) { + lastKnownEndIndex = item.endIndex; + } else if (missingDataPackets.has(index)) { + missingDataPackets.set(index, true); } }); - this.data = Buffer.concat(data); + if (missingDataPackets.size !== 0) { + console.log('Missing packets: ', missingDataPackets); + console.log('Last known EndIndex: ', lastKnownEndIndex); + } else if (this.head.dataSize === this.currentIncomingDataSize) { + this.ok = true; + this.complete(); + } + } + get data() { + if (this.compiledData) { + return this.compiledData; + } + const data = Buffer.concat(this.incomingDataPackets); if (this.head.serialization === 'struct') { this.data = decode(this.data); } else if (this.head.serialization === 'json') { this.data = jsonParse(this.data); } - this.complete(); + this.compiledData = data; + return data; } toString() { return this.data.toString(); @@ -102,7 +126,34 @@ export class Base { return decode(this.data); } sendSetup() { - this.send(this.outgoingSetupPacket); + if (this.state === 0) { + this.state = 1; + } + const packet = this.getPacketTemplate(); + packet.setup = true; + packet.headerSize = this.outgoingHeadSize; + this.sendPacket(packet); + } + sendHeadReady() { + if (this.state === 1) { + this.state = 2; + } + const packet = this.getPacketTemplate(); + packet.headReady = true; + this.sendPacket(packet); + } + sendDataReady() { + if (this.state === 2) { + this.state = 3; + } + const packet = this.getPacketTemplate(); + packet.dataReady = true; + this.sendPacket(packet); + } + async sendEnd() { + const packet = this.getPacketTemplate(); + packet.end = true; + this.sendPacket(packet); } get headers() { return this.head; @@ -110,18 +161,14 @@ export class Base { get body() { return this.data; } - buildSetupPacket() { + buildPacket() { const { - packetTemplate, - maxPacketSize, - sid, isAsk, isReply } = this; const message = (this.isAsk) ? this.request : this.response; this.outgoingHead = encode(message.head); this.outgoingHeadSize = this.outgoingHead.length; - this.outgoingSetupPacket.headerSize = this.outgoingHeadSize; } async headPacketization() { const { @@ -135,56 +182,48 @@ export class Base { let packetId = 0; const headSize = this.outgoingHeadSize; while (currentBytePosition < this.outgoingHeadSize) { - const packet = assign({}, this.packetTemplate); + const packet = this.getPacketTemplate(); packet.sid = sid; packet.pid = packetId; const endIndex = currentBytePosition + maxHeadSize; const safeEndIndex = endIndex > headSize ? headSize : endIndex; packet.head = this.outgoingHead.subarray(currentBytePosition, safeEndIndex); packet.headSize = packet.head.length; + outgoingHeadPackets[packetId] = packet; + if (safeEndIndex === headSize) { + packet.last = true; + break; + } packetId++; currentBytePosition += maxHeadSize; - outgoingHeadPackets[packetId] = packet; } } async dataPacketization() { const { - packetMaxPayloadSafeEstimate, - packetTemplate, - maxDataSize, - sid, isAsk, isReply } = this; const message = (isAsk) ? this.request : this.response; if (message.data) { - this.outgoingBody = message.data; + this.outgoingData = message.data; if (!isBuffer(message.data)) { - this.writeHeader('serialization', 'struct'); - this.outgoingBody = encode(message.data); + this.setHeader('serialization', 'struct'); + this.outgoingData = encode(message.data); } - this.writeHeader('dataSize', this.outgoingBody.length); + this.outgoingDataSize = this.outgoingData.length; + this.setHeader('dataSize', this.outgoingData.length); await dataPacketization(this); } } async packetization() { const message = (this.isAsk) ? this.request : this.response; - if (singlePacketMethods.test(message.method)) { - message.end = true; - message.pid = 0; - this.outgoingPackets[0] = message; - } else { - await this.buildSetupPacket(); - await this.headPacketization(); - await this.dataPacketization(); - } + await this.buildPacket(); + await this.dataPacketization(); + await this.headPacketization(); } async send() { const thisSource = this; const { - packetTemplate, - maxPacketSize, - sid, isAsk, isReply, } = this; @@ -198,10 +237,53 @@ export class Base { this.sendSetup(); return awaitingResult; } + async sendHead() { + const thisReply = this; + console.log('outgoingHeadPackets', this.outgoingHeadPackets); + this.sendPackets(this.outgoingHeadPackets); + } + async sendData() { + const thisReply = this; + console.log('outgoingDataPackets', this.outgoingDataPackets); + this.sendPackets(this.outgoingDataPackets); + } + sendPackets(packetArray) { + const thisReply = this; + eachArray(packetArray, (packet) => { + thisReply.sendPacket({ + message: packet + }); + }); + } + toObject() { + const { + head, + data, + sid, + method, + } = this; + const target = { + head: this.head, + data: this.data, + sid: this.id, + method: this.method + }; + return target; + } + sendHeadPacketsById(id) { + return sendPacketsById(this.outgoingHeadPackets, id); + } + sendDataPacketsById(id) { + return sendPacketsById(this.outgoingDataPackets, id); + } + getPacketTemplate() { + const { sid, } = this; + const packet = { + sid + }; + return packet; + } destroy = destroy; - sendEnd = sendEnd; - sendPacketsById = sendPacketsById; - sendAll = sendAll; onPacket = onPacket; sendPacket = sendPacket; on = on; @@ -221,26 +303,22 @@ export class Base { response = { head: {} }; - // this is the data in order may have missing packets at times but will remain in order - data = []; - // This is as the data came in over the wire out of order + head = []; + dataOrdered = []; stream = []; missingHeadPackets = construct(Map); missingDataPackets = construct(Map); events = {}; header = {}; options = {}; - outgoingSetupPacket = { - pid: 0, - setup: true, - }; setupConfirmationPacket = { pid: 0, authorize: true, }; outgoingDataPackets = []; outgoingHeadPackets = []; - incomingPackets = []; + incomingHeadPackets = []; + incomingDataPackets = []; incomingAks = []; incomingNacks = []; outgoingAcks = []; diff --git a/udsp/request/dataPacketization.js b/udsp/request/dataPacketization.js index fffb991e..7cba9336 100644 --- a/udsp/request/dataPacketization.js +++ b/udsp/request/dataPacketization.js @@ -6,19 +6,18 @@ export async function dataPacketization(source) { maxDataSize, id: sid, isAsk, - outgoingDataPackets + outgoingDataPackets, + outgoingData } = source; - const message = (isAsk) ? source.request : source.response; - const data = message.data; - const dataSize = data?.length; + const dataSize = outgoingData?.length; let currentBytePosition = 0; let packetId = outgoingDataPackets.length; if (dataSize > maxDataSize) { - console.log('data size', data.length); + console.log('data size', outgoingData.length); while (currentBytePosition < dataSize) { const endIndex = currentBytePosition + maxDataSize; const safeEndIndex = endIndex > dataSize ? dataSize : endIndex; - const chunk = data.subarray(currentBytePosition, safeEndIndex); + const chunk = outgoingData.subarray(currentBytePosition, safeEndIndex); console.log('chunksize', chunk.length, currentBytePosition, endIndex); const packet = { pid: packetId, @@ -27,18 +26,18 @@ export async function dataPacketization(source) { }; packet.data = chunk; outgoingDataPackets[packetId] = outgoingDataPackets; - if (endIndex >= dataSize) { - packet.end = true; + if (safeEndIndex === dataSize) { + packet.last = true; break; } - currentBytePosition = currentBytePosition + maxDataSize; + currentBytePosition += maxDataSize; packetId++; } } else { const packet = { pid: 0, end: true, - data + data: outgoingData }; console.log(source); outgoingDataPackets[0] = packet; diff --git a/udsp/request/onData.js b/udsp/request/onData.js index ee861abd..62ea9158 100644 --- a/udsp/request/onData.js +++ b/udsp/request/onData.js @@ -1,18 +1,12 @@ export async function onData(message) { console.log('On Data event'); - const { - pid, - data - } = message; - this.data[pid] = data; - this.currentPayloadSize += data.length; - if (this.totalIncomingPayloadSize) { - if (this.currentPayloadSize > 0) { - this.progress = (this.currentPayloadSize / this.totalIncomingPayloadSize) * 100; + if (this.totalIncomingDataSize) { + if (this.currentIncomingDataSize > 0) { + this.incomingProgress = (this.currentIncomingDataSize / this.totalIncomingDataSize) * 100; } - console.log('Progress', this.progress); + console.log('Incoming Progress', this.incomingProgress); } if (this.events.data) { - this.events.data(data, pid); + this.events.data(message.data, message.pid); } } diff --git a/udsp/request/onPacket.js b/udsp/request/onPacket.js index 06191ff2..6bae2406 100644 --- a/udsp/request/onPacket.js +++ b/udsp/request/onPacket.js @@ -1,11 +1,15 @@ import { hasValue } from '@universalweb/acid'; import { destroy } from './destory.js'; +import { processEvent } from '#udsp/processEvent'; +import { singlePacketMethods } from '#udsp/request/singlePacketMethods'; export async function onPacket(packet) { const source = this; this.lastPacketTime = Date.now(); const { message } = packet; const { + // main data payload data, + // header payload head, // Stream ID sid: streamId, @@ -14,7 +18,8 @@ export async function onPacket(packet) { // Action method, // Packet total - pt: totalIncomingUniquePackets, + hpt: totalIncomingUniqueHeadPackets, + dpt: totalIncomingUniqueDataPackets, headerSize, // Data payload size dataSize, @@ -24,59 +29,77 @@ export async function onPacket(packet) { nack, err, end, - setup + setup, + headReady, + dataReady, + last } = message; console.log(`Stream Id ${streamId}`); - if (hasValue(totalIncomingUniquePackets)) { - this.totalIncomingUniquePackets = totalIncomingUniquePackets; - } - if (hasValue(dataSize)) { - this.totalIncomingDataSize = dataSize; - } - if (hasValue(headerSize)) { - this.totalIncomingHeadSize = headerSize; - } this.totalIncomingPackets++; + if (this.ok) { + return; + } if (hasValue(packetId)) { if (head && !this.incomingHeadPackets[packetId]) { this.totalReceivedUniquePackets++; - this.incomingHeadPackets[packetId] = message; + this.incomingHeadPackets[packetId] = message.head; this.totalReceivedUniqueHeadPackets++; this.currentIncomingHeadSize += head.length; + if (this.missingHeadPackets.has(packetId)) { + this.missingHeadPackets.delete(packetId); + } if (this.onHead) { await this.onHead(message); } + if (this.totalIncomingUniqueHeadPackets === this.totalReceivedUniqueHeadPackets) { + this.assembleHead(); + } } if (data && !this.incomingDataPackets[packetId]) { this.totalReceivedUniquePackets++; - this.incomingDataPackets[packetId] = message; + this.incomingDataPackets[packetId] = message.data; this.totalReceivedUniqueDataPackets++; this.currentIncomingDataSize += data.length; + if (this.missingDataPackets.has(packetId)) { + this.missingDataPackets.delete(packetId); + } if (this.onData) { await this.onData(message); } + if (last) { + this.totalIncomingUniqueDataPackets = packetId; + this.checkData(); + } } - } - if (end) { - if (data) { - this.totalIncomingUniqueDataPackets = packetId; + } else if (setup) { + this.receivedSetupPacket = true; + console.log('Setup Packet Received'); + if (hasValue(headerSize)) { + this.totalIncomingHeadSize = headerSize; } - if (head) { - this.totalIncomingUniqueHeadPackets = packetId; + if (method) { + this.method = method; } - if (this.totalIncomingUniqueHeadPackets === this.totalReceivedUniqueHeadPackets) { - this.assembleHead(); + if (hasValue(totalIncomingUniqueHeadPackets)) { + this.totalIncomingUniqueHeadPackets = totalIncomingUniqueHeadPackets; } - if (this.totalIncomingUniqueDataPackets === this.totalReceivedUniqueDataPackets) { - this.assembleData(); + this.sendHeadReady(); + } else if (headReady) { + this.receivedHeadReadyPacket = true; + console.log('Head Ready Packet Received'); + if (hasValue(totalIncomingUniqueDataPackets)) { + this.totalIncomingUniqueDataPackets = totalIncomingUniqueDataPackets; } - } - if (err) { + this.sendHead(); + } else if (dataReady) { + this.receivedDataReadyPacket = true; + console.log('Data Ready Packet Received'); + this.sendData(); + } else if (end) { + console.log('End Packet Received'); + // this.check(); && this.cleanup(); + } else if (err) { return this.destroy(err); } - if (setup) { - console.log('Setup Packet Received'); - this.send(this.setupConfirmationPacket); - } - console.log('On Packet event', this); + console.log('On Packet event', this.id); } diff --git a/udsp/request/sendAll.js b/udsp/request/sendAll.js deleted file mode 100644 index 0c930d3f..00000000 --- a/udsp/request/sendAll.js +++ /dev/null @@ -1,10 +0,0 @@ -import { eachArray } from '@universalweb/acid'; -export async function sendAll() { - const thisReply = this; - console.log('outgoingPackets', thisReply.outgoingPackets[0]); - eachArray(thisReply.outgoingPackets, (packet) => { - thisReply.sendPacket({ - message: packet - }); - }); -} diff --git a/udsp/request/sendEnd.js b/udsp/request/sendEnd.js deleted file mode 100644 index 59d05cca..00000000 --- a/udsp/request/sendEnd.js +++ /dev/null @@ -1,11 +0,0 @@ -import { sendPacket } from './sendPacket.js'; -export async function sendEnd() { - const thisAsk = this; - const { id: sid } = thisAsk; - thisAsk.sendPacket({ - message: { - sid, - end: true - } - }); -} diff --git a/udsp/request/sendPacketsById.js b/udsp/request/sendPacketsById.js index 0450c056..127603fd 100644 --- a/udsp/request/sendPacketsById.js +++ b/udsp/request/sendPacketsById.js @@ -1,15 +1,15 @@ import { eachArray, isArray } from '@universalweb/acid'; -export async function sendPacketsById(indexes) { +export async function sendPacketsById(packetArray, indexes) { const thisReply = this; if (isArray(indexes)) { eachArray(indexes, (id) => { - const message = thisReply.outgoingPackets[id]; + const message = packetArray[id]; thisReply.sendPacket({ message }); }); } else { - const message = thisReply.outgoingPackets[indexes]; + const message = packetArray[indexes]; thisReply.sendPacket({ message }); diff --git a/udsp/server/actions/connect.js b/udsp/server/actions/connect.js index 2a529a9a..b125b71c 100644 --- a/udsp/server/actions/connect.js +++ b/udsp/server/actions/connect.js @@ -1,5 +1,5 @@ import { info } from '#logs'; -export async function opn(reply) { +export async function connect(reply) { const { resourceDirectory, cacheMaxAge, @@ -8,11 +8,12 @@ export async function opn(reply) { serverName, encoding, language, - onConnectResponse + onConnectResponse, + response } = this; const client = reply.client(); const server = reply.server(); - const response = reply.response; + const request = reply.data; info(`Server ID${client.idString}`, `Client ID${client.clientIdString}`, `Stream ID${response.sid}`); response.head = {}; response.data = { @@ -42,6 +43,7 @@ export async function opn(reply) { } // connection status - backwards compatibility response.state = 1; + reply.head.serialization = 'struct'; // REKEY THE CLIENT BEFORE SENDING BACK - reply.send('struct'); + reply.send(); }