Skip to content

Commit

Permalink
Moving in new Ask & Reply (Partial)
Browse files Browse the repository at this point in the history
  • Loading branch information
Universal Web committed May 18, 2023
1 parent af71c70 commit 673a850
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 106 deletions.
6 changes: 3 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions todo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# TODO

Swap to simple binary IDs instead of UTF8 strings
Swap to simple binary Codes
Minify property names for packets
Minify property names for certificates
234 changes: 214 additions & 20 deletions udsp/ask.js
Original file line number Diff line number Diff line change
@@ -1,42 +1,236 @@
import { promise } from 'Acid';
import {
promise, assign, omit, eachArray, stringify, get, isBuffer, isPlainObject, isArray, isMap
} from 'Acid';
import { decode, encode } from 'msgpackr';
import {
failed, info, msgReceived, msgSent
} from '#logs';
const chunkSize = 700;
const dataEncodingTypesChunked = /stream|file|image|string/;
const dataEncodingTypesStructured = /json|msgpack|struct|/;
/**
* @todo Prepare Request into singular object.
* @todo Chunk body data while adding packit number to it.
* @todo Send all chunks (consider sending pkt1 twice).
* @todo
*/
export class Ask {
constructor(payload, thisClient) {
constructor(request, options) {
const thisAsk = this;
this.request = request;
const timeStamp = Date.now();
const {
client,
header,
footer
} = options;
const {
requestQueue,
packetIdGenerator
} = thisClient;
} = client;
// sid is a Stream ID
const sid = packetIdGenerator.get();
payload.sid = sid;
payload.t = timeStamp;
thisAsk.payload = payload;
request.sid = sid;
thisAsk.created = timeStamp;
if (options.dataEncoding) {
this.dataEncoding = options.dataEncoding;
}
this.client = function() {
return client;
};
const awaitingResult = promise((accept) => {
thisAsk.accept = accept;
});
requestQueue.set(sid, thisAsk);
thisClient.send(payload);
thisAsk.proccessRequest(request);
return awaitingResult;
}
/* `completedChunks = [];` is initializing an empty array called `completedChunks` as a property of
the `Ask` class. This property is likely used to store any completed chunks of data received from
the server during the request process. */
completedChunks = [];
/* `incompleteChunks = [];` is initializing an empty array called `incompleteChunks` as a property of
the `Ask` class. This property is likely used to store any incomplete chunks of data received from
the server during the request process. */
incompleteChunks = [];
request = {};
response = {};
headers = {};
options = {};
outgoingPackets = [];
incomingPackets = [];
incomingAks = [];
incomingNacks = [];
outgoingAcks = [];
outgoingNacks = [];
totalOutgoingPackets = 0;
totalOutgoingPayloadSize = 0;
// Must be checked for uniqueness
totalSentConfirmedPackets = 0;
totalIncomingPackets = 0;
totalIncomingPayloadSize = 0;
// Must be checked for uniqueness
totalReceivedPackets = 0;
/* `state = 0;` is initializing the `state` property of the `Ask` class to `0`. This property is used
to keep track of the state of the request, where `0` represents an unsent request, `1` represents a
request that is currently being sent, and `2` represents a completed request. */
state = 0;
recieve() {
flushOut() {
this.outgoingPayload = {};
this.outgoingPackets = [];
this.outgoingChunks = [];
this.totalOutgoingPackets = 0;
this.totalOutgoingPayloadSize = 0;
}
// Flush all body
flush() {
this.flushOut();
this.flushAsk();
}
// Flush All body and remove this reply from the map
destroy() {
this.flush();
this.client().requestQueue.delete(this.sid);
}
async sendPacket(request) {
const client = this.client();
const options = this.options;
const headers = this.headers;
const footer = this.footer;
if (this.options) {
console.log(`Sending msg with options var`, options);
}
if (this.headers) {
console.log(`Sending msg with headers var`, headers);
}
if (this.footer) {
console.log(`Sending msg with footer var`, footer);
}
console.log('Handover to Server Reply Packet to Send', request, headers, options);
client.send(request, headers, footer, options);
}
callback(response, headers) {
this.accept({
response,
headers,
async chunk(body) {
const chunks = [];
const packetLength = body.length;
for (let index = 0; index < packetLength;index += chunkSize) {
const chunk = body.slice(index, index + chunkSize);
chunks.push(chunk);
}
return chunks;
}
async buildRequestPackets() {
const thisReply = this;
const { request } = thisReply;
const { sid } = request;
console.log(request.body.length);
if (request.body && request.body.length > chunkSize) {
const chunks = await thisReply.chunk(request.body);
const packetLength = chunks.length;
thisReply.totalOutgoingPackets = packetLength;
eachArray(chunks, (item, pid) => {
const outgoingPacket = {
pid,
sid
};
if (pid === 0) {
outgoingPacket.pt = packetLength;
assign(outgoingPacket, omit(request, ['body']));
}
outgoingPacket.body = item;
thisReply.outgoingPackets[pid] = outgoingPacket;
});
} else {
request.pt = 0;
thisReply.outgoingPackets[0] = request;
}
}
async buildRequest() {
const dataEncoding = this.dataEncoding;
const request = this.request;
const thisReply = this;
if (request.body) {
if (!isBuffer(request.body)) {
request.body = encode(request.body);
}
}
await this.buildRequestPackets(request);
thisReply.sendAll();
}
sendIDs(packetIDs) {
const thisReply = this;
const server = this.server();
const client = this.client();
eachArray(packetIDs, (id) => {
thisReply.sendPacket(thisReply.outgoingPackets[id]);
});
}
async sendAll() {
const thisReply = this;
const client = this.client();
console.log('Ask.sendAll', thisReply.outgoingPackets);
eachArray(thisReply.outgoingPackets, (packet) => {
thisReply.sendPacket(packet);
});
}
received(message) {
const thisReply = this;
const {
body,
head,
sid,
pid,
act,
pt,
te,
cmplt,
finale,
ack,
nack
} = message;
if (cmplt) {
return thisReply.destroy();
}
if (pt) {
thisReply.totalIncomingPackets = pt;
}
if (te) {
thisReply.dataEncoding = te;
}
if (pid) {
if (!thisReply.incomingPackets[pid]) {
thisReply.incomingPackets[pid] = message;
thisReply.totalReceivedPackets++;
}
} else {
thisReply.incomingPackets[0] = message;
thisReply.totalReceivedPackets = 1;
thisReply.totalIncomingPackets = 1;
}
if (thisReply.totalIncomingPackets === thisReply.totalReceivedPackets) {
thisReply.state = 2;
}
if (thisReply.state === 2) {
thisReply.assemble();
}
}
assemble() {
const thisReply = this;
const { dataEncoding } = thisReply;
if (thisReply.totalIncomingPackets === 1) {
thisReply.request = thisReply.incomingPackets[0];
return thisReply.process();
}
const packet = thisReply.incomingPackets[0];
eachArray(thisReply.incomingPackets, (item) => {
if (item.body) {
Buffer.concat([packet.body, item.body]);
}
});
if (dataEncoding === 'struct' || !dataEncoding) {
msgReceived(thisReply.request);
if (thisReply.request.body) {
thisReply.request.body = decode(thisReply.request.body);
}
}
thisReply.flushOut();
}
async proccessRequest() {
await this.buildRequest();
}
callback(results) {
console.log('Request Results', results);
this.accept(results);
}
}
3 changes: 3 additions & 0 deletions udsp/client/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ export async function connect(payload = {}) {
const thisClient = this;
// opn stands for open meaning connect to a server
payload.act = 'opn';
payload.body = {
intro: 'Hello World!'
};
const result = await thisClient.request(payload);
console.log(result);
const {
Expand Down
5 changes: 3 additions & 2 deletions udsp/client/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/*
* Client Module
* UDSP - Universal Data Stream Protocol
* UWS Universal Web client
* Establishes a UDP based bi-directional real-time client between a client and end service.
* UW Universal Web
* UWP Universal Web Protocol
* Establishes a UDP based bi-directional real-time connection between client and server.
*/
// Default System imports
import {
Expand Down
13 changes: 3 additions & 10 deletions udsp/client/processMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,16 @@ export async function processMessage(data) {
info(`Stream ID: ${sid} ${stringify(message)}`);
const askObject = requestQueue.get(sid);
if (askObject) {
const messageBody = await askObject.callback(message, headers);
if (messageBody) {
thisContext.send(messageBody, {
sid
});
}
const messageBody = await askObject.accept(packet);
if (askObject.state === 2) {
requestQueue.delete(sid);
askObject.delete();
}
} else {
return failed(`Invalid Stream Id given. ${stringify(message)}`);
}
} else if (message.watcher) {
console.log('WATCHER', message);
}
} else {
console.log('NO MESSAGE OBJECT', message);
console.log('NO MESSAGE OBJECT', packet);
}
}

40 changes: 23 additions & 17 deletions udsp/client/send.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,40 @@ import {
import { promise } from 'Acid';
import { encodePacket } from '#udsp/encodePacket';
imported('Client Send');
export async function send(message, priority) {
export async function send(message, headers, footer, options) {
info(`Send to server`);
const client = this;
const {
server,
destination,
ephemeralPublic,
id,
ip,
port,
keypair,
nonce,
state,
ephemeralPublic,
port,
profile,
transmitKey,
server,
serverId,
keypair,
id,
destination
} = this;
state,
transmitKey
} = client;
const packet = await encodePacket({
nonce,
transmitKey,
client,
destination,
ephemeralPublic,
footer,
headers,
id,
state,
isClient: true,
keypair,
message,
ephemeralPublic,
nonce,
options,
profile,
keypair,
isClient: true,
destination
state,
transmitKey
});
msgSent(`Packet Size ${packet.length}`);
return promise((accept, reject) => {
server.send(packet, port, ip, (error) => {
if (error) {
Expand Down
Loading

0 comments on commit 673a850

Please sign in to comment.