From 4692fbe534fe8f4f08c1181e0964e71f7bcd97b1 Mon Sep 17 00:00:00 2001 From: Universal Web Date: Sun, 6 Aug 2023 06:45:58 -0400 Subject: [PATCH] onPacket patched for replies Setup Completed Head Completed Data in Progress --- udsp/base.js | 1 - udsp/client/index.js | 1 + udsp/request/base.js | 11 ++++++++--- udsp/request/dataPacketization.js | 2 +- udsp/request/destory.js | 8 ++++++-- udsp/request/onPacket.js | 21 ++++++++++++--------- udsp/request/reply.js | 7 ++++--- udsp/server/clients/index.js | 16 ++++++++++++++-- udsp/server/onPacket.js | 2 +- 9 files changed, 47 insertions(+), 22 deletions(-) diff --git a/udsp/base.js b/udsp/base.js index 3f426ca6..b56292b6 100644 --- a/udsp/base.js +++ b/udsp/base.js @@ -80,6 +80,5 @@ export class UDSP { heapSize = 0; throttle = false; debounce = false; - requestQueue = construct(Map); randomId = randomBuffer(8); } diff --git a/udsp/client/index.js b/udsp/client/index.js index f8e7d88d..d2e69250 100644 --- a/udsp/client/index.js +++ b/udsp/client/index.js @@ -333,6 +333,7 @@ export class Client extends UDSP { connect = connect; static connections = new Map(); certChunks = []; + requestQueue = construct(Map); } export async function client(configuration) { console.log('Create Client'); diff --git a/udsp/request/base.js b/udsp/request/base.js index b3efd97a..d0e16b77 100644 --- a/udsp/request/base.js +++ b/udsp/request/base.js @@ -72,6 +72,7 @@ export class Base { } const head = this.head; const { missingHeadPackets } = this; + console.log(this.incomingHeadPackets); eachArray(this.incomingHeadPackets, (item, index) => { if (!item) { if (!missingHeadPackets.has(index)) { @@ -95,14 +96,14 @@ export class Base { let lastKnownEndIndex = 0; eachArray(this.incomingDataPackets, (item, index) => { if (item) { - lastKnownEndIndex = item.dataIndex; + lastKnownEndIndex = item.index; } else if (missingDataPackets.has(index)) { missingDataPackets.set(index, true); } }); if (missingDataPackets.size !== 0) { console.log('Missing packets: ', missingDataPackets); - console.log('Last known dataIndex: ', lastKnownEndIndex); + console.log('Last known index: ', lastKnownEndIndex); } else if (this.head.dataSize === this.currentIncomingDataSize) { this.complete(); } @@ -199,7 +200,7 @@ export class Base { const safeEndIndex = endIndex > headSize ? headSize : endIndex; message.head = this.outgoingHead.subarray(currentBytePosition, safeEndIndex); outgoingHeadPackets[packetId] = message; - message.dataIndex = safeEndIndex; + message.index = safeEndIndex; if (safeEndIndex === headSize) { message.last = true; break; @@ -317,6 +318,9 @@ export class Base { sendPacket(message, headers, footer) { this.source().send(message, headers, footer); } + flushOutgoing = flushOutgoing; + flushIncoming = flushIncoming; + flush = flush; on = on; outgoingHead; outgoingData; @@ -355,6 +359,7 @@ export class Base { totalIncomingPayloadSize = 0; // Must be checked for uniqueness totalReceivedPackets = 0; + totalReceivedUniqueHeadPackets = 0; /* `state = 0;` is initializing the `state` property of the `Ask` class to `0`. This property is used to keep track of the state of the request, where `0` represents an unsent request, `1` represents a request that is currently being sent, and `2` represents a completed request. */ diff --git a/udsp/request/dataPacketization.js b/udsp/request/dataPacketization.js index 5203b349..d62d3d88 100644 --- a/udsp/request/dataPacketization.js +++ b/udsp/request/dataPacketization.js @@ -21,7 +21,7 @@ export async function dataPacketization(source) { const data = outgoingData.subarray(currentBytePosition, safeEndIndex); console.log('chunksize', data.length, currentBytePosition, endIndex); message.pid = packetId; - message.dataIndex = safeEndIndex; + message.index = safeEndIndex; message.data = data; outgoingDataPackets[packetId] = message; if (safeEndIndex === dataSize) { diff --git a/udsp/request/destory.js b/udsp/request/destory.js index c5fdf721..9bb6c7b8 100644 --- a/udsp/request/destory.js +++ b/udsp/request/destory.js @@ -3,7 +3,11 @@ export async function destroy(err) { if (err) { this.err = err; } - console.log(`Destroying ${this.type} ${this.id}`); + console.log(`Destroying ${this.type} ID:${this.id} ->`, err); this.flush(); - this.source().queue.delete(this.id); + if (this.isAsk) { + this.source().requestQueue.delete(this.id); + } else { + this.source().replyQueue.delete(this.id); + } } diff --git a/udsp/request/onPacket.js b/udsp/request/onPacket.js index a9648b37..df951c5b 100644 --- a/udsp/request/onPacket.js +++ b/udsp/request/onPacket.js @@ -1,9 +1,9 @@ -import { hasValue } from '@universalweb/acid'; +import { hasValue, isFalse } from '@universalweb/acid'; import { destroy } from './destory.js'; import { processEvent } from '#udsp/processEvent'; export async function onPacket(packet) { const source = this; - this.lastPacketTime = Date.now(); + this.lastActive = Date.now(); const { message } = packet; if (!message) { return this.destroy('No Message in Packet'); @@ -38,11 +38,11 @@ export async function onPacket(packet) { last } = message; console.log(`onPacket Stream Id ${streamId}`); - this.totalIncomingPackets++; - if (this.ok) { - return; - } + this.totalReceivedPackets++; if (hasValue(packetId)) { + if (!this.receivedSetupPacket) { + return this.destroy('Setup packet not received'); + } source.lastActive = Date.now(); if (head && !this.incomingHeadPackets[packetId]) { this.totalReceivedUniquePackets++; @@ -55,7 +55,8 @@ export async function onPacket(packet) { if (this.onHead) { await this.onHead(message); } - if (this.totalIncomingUniqueHeadPackets === this.totalReceivedUniqueHeadPackets) { + console.log(this, this.currentIncomingHeadSize); + if (this.totalIncomingHeadSize === this.currentIncomingHeadSize) { this.assembleHead(); } } @@ -77,9 +78,11 @@ export async function onPacket(packet) { } } else if (setup) { this.receivedSetupPacket = true; - console.log('Setup Packet Received'); + console.log('Setup Packet Received', headerSize); + this.incomingSetupPacket = message; if (hasValue(headerSize)) { this.totalIncomingHeadSize = headerSize; + this.headerSize = headerSize; } if (method) { this.method = method; @@ -105,5 +108,5 @@ export async function onPacket(packet) { } else if (err) { return this.destroy(err); } - console.log('On Packet event', this.id); + console.log('On Packet event', this.id, message); } diff --git a/udsp/request/reply.js b/udsp/request/reply.js index 9542ebd8..b2e09afb 100644 --- a/udsp/request/reply.js +++ b/udsp/request/reply.js @@ -13,18 +13,19 @@ import { Base } from './base.js'; export class Reply extends Base { constructor(request, source) { super(request, source); + console.log('Setting up new reply'); const thisReply = this; const { message } = request; const { sid } = message; // console.log(source); // // console.log(message); - const { requestQueue, } = source; + const { replyQueue, } = source; this.sid = sid; this.id = sid; this.response.sid = sid; - requestQueue.set(sid, this); - this.onPacket(request); + replyQueue.set(sid, this); } + type = 'reply'; isReply = true; async complete() { this.state = 1; diff --git a/udsp/server/clients/index.js b/udsp/server/clients/index.js index 01352397..4556cd2b 100644 --- a/udsp/server/clients/index.js +++ b/udsp/server/clients/index.js @@ -18,7 +18,8 @@ import { promise, isFalse, isUndefined, - isFalsy + isFalsy, + hasValue } from '@universalweb/acid'; import { toBase64, @@ -26,6 +27,7 @@ import { } from '#crypto'; import { encodePacket } from '#udsp/encodePacket'; import { sendPacket } from '#udsp/sendPacket'; +import { reply } from '#udsp/request/reply'; export class Client { constructor(config) { const { server } = config; @@ -145,7 +147,17 @@ export class Client { encryptConnectionId = false; randomId = randomBuffer(8); privateData = {}; - requestQueue = construct(Map); + replyQueue = construct(Map); + reply(packet) { + const { message: { sid } } = packet; + console.log('Reply Client', this.replyQueue, this.replyQueue.has(sid), packet); + if (hasValue(sid)) { + if (this.replyQueue.has(sid)) { + return this.replyQueue.get(sid).onPacket(packet); + } + } + reply(packet, this).onPacket(packet); + } } export async function createClient(config) { const client = await construct(Client, [config]); diff --git a/udsp/server/onPacket.js b/udsp/server/onPacket.js index fcf0924a..69a5d035 100644 --- a/udsp/server/onPacket.js +++ b/udsp/server/onPacket.js @@ -56,7 +56,7 @@ export async function onPacket(packet, connection) { message } = config.packetDecoded; if (hasValue(message?.sid)) { - reply(config.packetDecoded, client); + client.reply(config.packetDecoded); } else { client.proccessProtocolPacket(message, header); }