Skip to content

Commit

Permalink
onPacket patched for replies
Browse files Browse the repository at this point in the history
Setup Completed
Head Completed
Data in Progress
  • Loading branch information
Universal Web committed Aug 6, 2023
1 parent 695ce9b commit 4692fbe
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 22 deletions.
1 change: 0 additions & 1 deletion udsp/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,5 @@ export class UDSP {
heapSize = 0;
throttle = false;
debounce = false;
requestQueue = construct(Map);
randomId = randomBuffer(8);
}
1 change: 1 addition & 0 deletions udsp/client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ export class Client extends UDSP {
connect = connect;
static connections = new Map();
certChunks = [];
requestQueue = construct(Map);
}
export async function client(configuration) {
console.log('Create Client');
Expand Down
11 changes: 8 additions & 3 deletions udsp/request/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export class Base {
}
const head = this.head;
const { missingHeadPackets } = this;
console.log(this.incomingHeadPackets);
eachArray(this.incomingHeadPackets, (item, index) => {
if (!item) {
if (!missingHeadPackets.has(index)) {
Expand All @@ -95,14 +96,14 @@ export class Base {
let lastKnownEndIndex = 0;
eachArray(this.incomingDataPackets, (item, index) => {
if (item) {
lastKnownEndIndex = item.dataIndex;
lastKnownEndIndex = item.index;
} else if (missingDataPackets.has(index)) {
missingDataPackets.set(index, true);
}
});
if (missingDataPackets.size !== 0) {
console.log('Missing packets: ', missingDataPackets);
console.log('Last known dataIndex: ', lastKnownEndIndex);
console.log('Last known index: ', lastKnownEndIndex);
} else if (this.head.dataSize === this.currentIncomingDataSize) {
this.complete();
}
Expand Down Expand Up @@ -199,7 +200,7 @@ export class Base {
const safeEndIndex = endIndex > headSize ? headSize : endIndex;
message.head = this.outgoingHead.subarray(currentBytePosition, safeEndIndex);
outgoingHeadPackets[packetId] = message;
message.dataIndex = safeEndIndex;
message.index = safeEndIndex;
if (safeEndIndex === headSize) {
message.last = true;
break;
Expand Down Expand Up @@ -317,6 +318,9 @@ export class Base {
sendPacket(message, headers, footer) {
this.source().send(message, headers, footer);
}
flushOutgoing = flushOutgoing;
flushIncoming = flushIncoming;
flush = flush;
on = on;
outgoingHead;
outgoingData;
Expand Down Expand Up @@ -355,6 +359,7 @@ export class Base {
totalIncomingPayloadSize = 0;
// Must be checked for uniqueness
totalReceivedPackets = 0;
totalReceivedUniqueHeadPackets = 0;
/* `state = 0;` is initializing the `state` property of the `Ask` class to `0`. This property is used
to keep track of the state of the request, where `0` represents an unsent request, `1` represents a
request that is currently being sent, and `2` represents a completed request. */
Expand Down
2 changes: 1 addition & 1 deletion udsp/request/dataPacketization.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export async function dataPacketization(source) {
const data = outgoingData.subarray(currentBytePosition, safeEndIndex);
console.log('chunksize', data.length, currentBytePosition, endIndex);
message.pid = packetId;
message.dataIndex = safeEndIndex;
message.index = safeEndIndex;
message.data = data;
outgoingDataPackets[packetId] = message;
if (safeEndIndex === dataSize) {
Expand Down
8 changes: 6 additions & 2 deletions udsp/request/destory.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ export async function destroy(err) {
if (err) {
this.err = err;
}
console.log(`Destroying ${this.type} ${this.id}`);
console.log(`Destroying ${this.type} ID:${this.id} ->`, err);
this.flush();
this.source().queue.delete(this.id);
if (this.isAsk) {
this.source().requestQueue.delete(this.id);
} else {
this.source().replyQueue.delete(this.id);
}
}
21 changes: 12 additions & 9 deletions udsp/request/onPacket.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { hasValue } from '@universalweb/acid';
import { hasValue, isFalse } from '@universalweb/acid';
import { destroy } from './destory.js';
import { processEvent } from '#udsp/processEvent';
export async function onPacket(packet) {
const source = this;
this.lastPacketTime = Date.now();
this.lastActive = Date.now();
const { message } = packet;
if (!message) {
return this.destroy('No Message in Packet');
Expand Down Expand Up @@ -38,11 +38,11 @@ export async function onPacket(packet) {
last
} = message;
console.log(`onPacket Stream Id ${streamId}`);
this.totalIncomingPackets++;
if (this.ok) {
return;
}
this.totalReceivedPackets++;
if (hasValue(packetId)) {
if (!this.receivedSetupPacket) {
return this.destroy('Setup packet not received');
}
source.lastActive = Date.now();
if (head && !this.incomingHeadPackets[packetId]) {
this.totalReceivedUniquePackets++;
Expand All @@ -55,7 +55,8 @@ export async function onPacket(packet) {
if (this.onHead) {
await this.onHead(message);
}
if (this.totalIncomingUniqueHeadPackets === this.totalReceivedUniqueHeadPackets) {
console.log(this, this.currentIncomingHeadSize);
if (this.totalIncomingHeadSize === this.currentIncomingHeadSize) {
this.assembleHead();
}
}
Expand All @@ -77,9 +78,11 @@ export async function onPacket(packet) {
}
} else if (setup) {
this.receivedSetupPacket = true;
console.log('Setup Packet Received');
console.log('Setup Packet Received', headerSize);
this.incomingSetupPacket = message;
if (hasValue(headerSize)) {
this.totalIncomingHeadSize = headerSize;
this.headerSize = headerSize;
}
if (method) {
this.method = method;
Expand All @@ -105,5 +108,5 @@ export async function onPacket(packet) {
} else if (err) {
return this.destroy(err);
}
console.log('On Packet event', this.id);
console.log('On Packet event', this.id, message);
}
7 changes: 4 additions & 3 deletions udsp/request/reply.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@ import { Base } from './base.js';
export class Reply extends Base {
constructor(request, source) {
super(request, source);
console.log('Setting up new reply');
const thisReply = this;
const { message } = request;
const { sid } = message;
// console.log(source);
// // console.log(message);
const { requestQueue, } = source;
const { replyQueue, } = source;
this.sid = sid;
this.id = sid;
this.response.sid = sid;
requestQueue.set(sid, this);
this.onPacket(request);
replyQueue.set(sid, this);
}
type = 'reply';
isReply = true;
async complete() {
this.state = 1;
Expand Down
16 changes: 14 additions & 2 deletions udsp/server/clients/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ import {
promise,
isFalse,
isUndefined,
isFalsy
isFalsy,
hasValue
} from '@universalweb/acid';
import {
toBase64,
randomBuffer
} from '#crypto';
import { encodePacket } from '#udsp/encodePacket';
import { sendPacket } from '#udsp/sendPacket';
import { reply } from '#udsp/request/reply';
export class Client {
constructor(config) {
const { server } = config;
Expand Down Expand Up @@ -145,7 +147,17 @@ export class Client {
encryptConnectionId = false;
randomId = randomBuffer(8);
privateData = {};
requestQueue = construct(Map);
replyQueue = construct(Map);
reply(packet) {
const { message: { sid } } = packet;
console.log('Reply Client', this.replyQueue, this.replyQueue.has(sid), packet);
if (hasValue(sid)) {
if (this.replyQueue.has(sid)) {
return this.replyQueue.get(sid).onPacket(packet);
}
}
reply(packet, this).onPacket(packet);
}
}
export async function createClient(config) {
const client = await construct(Client, [config]);
Expand Down
2 changes: 1 addition & 1 deletion udsp/server/onPacket.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export async function onPacket(packet, connection) {
message
} = config.packetDecoded;
if (hasValue(message?.sid)) {
reply(config.packetDecoded, client);
client.reply(config.packetDecoded);
} else {
client.proccessProtocolPacket(message, header);
}
Expand Down

0 comments on commit 4692fbe

Please sign in to comment.