diff --git a/package-lock.json b/package-lock.json index 2c69a8f8..6b27dd52 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3157,9 +3157,9 @@ } }, "node_modules/Acid": { - "version": "3.0.25", - "resolved": "https://registry.npmjs.org/Acid/-/Acid-3.0.25.tgz", - "integrity": "sha512-zsCp4ej9QtNS6Fe7IqVm3vg3NiRza9pnZWUPKEiLBqwjDctdFuHGPI9M+4I6yABD8Yecu36T9sd1Ff9G+qoEWA==", + "version": "3.0.27", + "resolved": "https://registry.npmjs.org/Acid/-/Acid-3.0.27.tgz", + "integrity": "sha512-mO6Pldzj+cbyea5x3Jp0lnJPG9n/odKNt2SXczIASDAHHgHR2SMwByh33bVWFmE+lX19BdSoy5EOO2WnwvwJ3Q==", "bin": { "Acid": "index.js" } diff --git a/todo.md b/todo.md new file mode 100644 index 00000000..308980df --- /dev/null +++ b/todo.md @@ -0,0 +1,6 @@ +# TODO + +Swap to simple binary IDs instead of UTF8 strings +Swap to simple binary Codes +Minify property names for packets +Minify property names for certificates diff --git a/udsp/ask.js b/udsp/ask.js index e8638d27..0e09c784 100644 --- a/udsp/ask.js +++ b/udsp/ask.js @@ -1,42 +1,236 @@ -import { promise } from 'Acid'; +import { + promise, assign, omit, eachArray, stringify, get, isBuffer, isPlainObject, isArray, isMap +} from 'Acid'; +import { decode, encode } from 'msgpackr'; +import { + failed, info, msgReceived, msgSent +} from '#logs'; +const chunkSize = 700; +const dataEncodingTypesChunked = /stream|file|image|string/; +const dataEncodingTypesStructured = /json|msgpack|struct|/; +/** + * @todo Prepare Request into singular object. + * @todo Chunk body data while adding packit number to it. + * @todo Send all chunks (consider sending pkt1 twice). + * @todo + */ export class Ask { - constructor(payload, thisClient) { + constructor(request, options) { const thisAsk = this; + this.request = request; const timeStamp = Date.now(); + const { + client, + header, + footer + } = options; const { requestQueue, packetIdGenerator - } = thisClient; + } = client; // sid is a Stream ID const sid = packetIdGenerator.get(); - payload.sid = sid; - payload.t = timeStamp; - thisAsk.payload = payload; + request.sid = sid; + thisAsk.created = timeStamp; + if (options.dataEncoding) { + this.dataEncoding = options.dataEncoding; + } + this.client = function() { + return client; + }; const awaitingResult = promise((accept) => { thisAsk.accept = accept; }); requestQueue.set(sid, thisAsk); - thisClient.send(payload); + thisAsk.proccessRequest(request); return awaitingResult; } - /* `completedChunks = [];` is initializing an empty array called `completedChunks` as a property of - the `Ask` class. This property is likely used to store any completed chunks of data received from - the server during the request process. */ - completedChunks = []; - /* `incompleteChunks = [];` is initializing an empty array called `incompleteChunks` as a property of - the `Ask` class. This property is likely used to store any incomplete chunks of data received from - the server during the request process. */ - incompleteChunks = []; + request = {}; + response = {}; + headers = {}; + options = {}; + outgoingPackets = []; + incomingPackets = []; + incomingAks = []; + incomingNacks = []; + outgoingAcks = []; + outgoingNacks = []; + totalOutgoingPackets = 0; + totalOutgoingPayloadSize = 0; + // Must be checked for uniqueness + totalSentConfirmedPackets = 0; + totalIncomingPackets = 0; + totalIncomingPayloadSize = 0; + // Must be checked for uniqueness + totalReceivedPackets = 0; /* `state = 0;` is initializing the `state` property of the `Ask` class to `0`. This property is used to keep track of the state of the request, where `0` represents an unsent request, `1` represents a request that is currently being sent, and `2` represents a completed request. */ state = 0; - recieve() { + flushOut() { + this.outgoingPayload = {}; + this.outgoingPackets = []; + this.outgoingChunks = []; + this.totalOutgoingPackets = 0; + this.totalOutgoingPayloadSize = 0; + } + // Flush all body + flush() { + this.flushOut(); + this.flushAsk(); + } + // Flush All body and remove this reply from the map + destroy() { + this.flush(); + this.client().requestQueue.delete(this.sid); + } + async sendPacket(request) { + const client = this.client(); + const options = this.options; + const headers = this.headers; + const footer = this.footer; + if (this.options) { + console.log(`Sending msg with options var`, options); + } + if (this.headers) { + console.log(`Sending msg with headers var`, headers); + } + if (this.footer) { + console.log(`Sending msg with footer var`, footer); + } + console.log('Handover to Server Reply Packet to Send', request, headers, options); + client.send(request, headers, footer, options); } - callback(response, headers) { - this.accept({ - response, - headers, + async chunk(body) { + const chunks = []; + const packetLength = body.length; + for (let index = 0; index < packetLength;index += chunkSize) { + const chunk = body.slice(index, index + chunkSize); + chunks.push(chunk); + } + return chunks; + } + async buildRequestPackets() { + const thisReply = this; + const { request } = thisReply; + const { sid } = request; + console.log(request.body.length); + if (request.body && request.body.length > chunkSize) { + const chunks = await thisReply.chunk(request.body); + const packetLength = chunks.length; + thisReply.totalOutgoingPackets = packetLength; + eachArray(chunks, (item, pid) => { + const outgoingPacket = { + pid, + sid + }; + if (pid === 0) { + outgoingPacket.pt = packetLength; + assign(outgoingPacket, omit(request, ['body'])); + } + outgoingPacket.body = item; + thisReply.outgoingPackets[pid] = outgoingPacket; + }); + } else { + request.pt = 0; + thisReply.outgoingPackets[0] = request; + } + } + async buildRequest() { + const dataEncoding = this.dataEncoding; + const request = this.request; + const thisReply = this; + if (request.body) { + if (!isBuffer(request.body)) { + request.body = encode(request.body); + } + } + await this.buildRequestPackets(request); + thisReply.sendAll(); + } + sendIDs(packetIDs) { + const thisReply = this; + const server = this.server(); + const client = this.client(); + eachArray(packetIDs, (id) => { + thisReply.sendPacket(thisReply.outgoingPackets[id]); }); } + async sendAll() { + const thisReply = this; + const client = this.client(); + console.log('Ask.sendAll', thisReply.outgoingPackets); + eachArray(thisReply.outgoingPackets, (packet) => { + thisReply.sendPacket(packet); + }); + } + received(message) { + const thisReply = this; + const { + body, + head, + sid, + pid, + act, + pt, + te, + cmplt, + finale, + ack, + nack + } = message; + if (cmplt) { + return thisReply.destroy(); + } + if (pt) { + thisReply.totalIncomingPackets = pt; + } + if (te) { + thisReply.dataEncoding = te; + } + if (pid) { + if (!thisReply.incomingPackets[pid]) { + thisReply.incomingPackets[pid] = message; + thisReply.totalReceivedPackets++; + } + } else { + thisReply.incomingPackets[0] = message; + thisReply.totalReceivedPackets = 1; + thisReply.totalIncomingPackets = 1; + } + if (thisReply.totalIncomingPackets === thisReply.totalReceivedPackets) { + thisReply.state = 2; + } + if (thisReply.state === 2) { + thisReply.assemble(); + } + } + assemble() { + const thisReply = this; + const { dataEncoding } = thisReply; + if (thisReply.totalIncomingPackets === 1) { + thisReply.request = thisReply.incomingPackets[0]; + return thisReply.process(); + } + const packet = thisReply.incomingPackets[0]; + eachArray(thisReply.incomingPackets, (item) => { + if (item.body) { + Buffer.concat([packet.body, item.body]); + } + }); + if (dataEncoding === 'struct' || !dataEncoding) { + msgReceived(thisReply.request); + if (thisReply.request.body) { + thisReply.request.body = decode(thisReply.request.body); + } + } + thisReply.flushOut(); + } + async proccessRequest() { + await this.buildRequest(); + } + callback(results) { + console.log('Request Results', results); + this.accept(results); + } } diff --git a/udsp/client/connect.js b/udsp/client/connect.js index 865f6f67..9a1d7888 100644 --- a/udsp/client/connect.js +++ b/udsp/client/connect.js @@ -5,6 +5,9 @@ export async function connect(payload = {}) { const thisClient = this; // opn stands for open meaning connect to a server payload.act = 'opn'; + payload.body = { + intro: 'Hello World!' + }; const result = await thisClient.request(payload); console.log(result); const { diff --git a/udsp/client/index.js b/udsp/client/index.js index e1c1a1f1..b405ecbe 100644 --- a/udsp/client/index.js +++ b/udsp/client/index.js @@ -1,8 +1,9 @@ /* * Client Module * UDSP - Universal Data Stream Protocol - * UWS Universal Web client - * Establishes a UDP based bi-directional real-time client between a client and end service. + * UW Universal Web + * UWP Universal Web Protocol + * Establishes a UDP based bi-directional real-time connection between client and server. */ // Default System imports import { diff --git a/udsp/client/processMessage.js b/udsp/client/processMessage.js index ada9fdb7..0f5e39eb 100644 --- a/udsp/client/processMessage.js +++ b/udsp/client/processMessage.js @@ -32,23 +32,16 @@ export async function processMessage(data) { info(`Stream ID: ${sid} ${stringify(message)}`); const askObject = requestQueue.get(sid); if (askObject) { - const messageBody = await askObject.callback(message, headers); - if (messageBody) { - thisContext.send(messageBody, { - sid - }); - } + const messageBody = await askObject.accept(packet); if (askObject.state === 2) { - requestQueue.delete(sid); + askObject.delete(); } } else { return failed(`Invalid Stream Id given. ${stringify(message)}`); } - } else if (message.watcher) { - console.log('WATCHER', message); } } else { - console.log('NO MESSAGE OBJECT', message); + console.log('NO MESSAGE OBJECT', packet); } } diff --git a/udsp/client/send.js b/udsp/client/send.js index 4542fff8..910bc2e8 100644 --- a/udsp/client/send.js +++ b/udsp/client/send.js @@ -4,34 +4,40 @@ import { import { promise } from 'Acid'; import { encodePacket } from '#udsp/encodePacket'; imported('Client Send'); -export async function send(message, priority) { +export async function send(message, headers, footer, options) { info(`Send to server`); + const client = this; const { - server, + destination, + ephemeralPublic, + id, ip, - port, + keypair, nonce, - state, - ephemeralPublic, + port, profile, - transmitKey, + server, serverId, - keypair, - id, - destination - } = this; + state, + transmitKey + } = client; const packet = await encodePacket({ - nonce, - transmitKey, + client, + destination, + ephemeralPublic, + footer, + headers, id, - state, + isClient: true, + keypair, message, - ephemeralPublic, + nonce, + options, profile, - keypair, - isClient: true, - destination + state, + transmitKey }); + msgSent(`Packet Size ${packet.length}`); return promise((accept, reject) => { server.send(packet, port, ip, (error) => { if (error) { diff --git a/udsp/processEvent.js b/udsp/processEvent.js new file mode 100644 index 00000000..487790fe --- /dev/null +++ b/udsp/processEvent.js @@ -0,0 +1,24 @@ +import { stringify, get } from 'Acid'; +import { failed, info } from '#logs'; +export async function processEvent(request, eventSource, source) { + const { + body, + sid, + evnt, + act + } = request; + const { + events, + actions + } = eventSource; + const eventName = act || evnt; + const method = (act) ? actions.get(act) : events.get(evnt); + info(`Request:${eventName} RequestID: ${sid}`); + if (method) { + console.log(request); + const hasResponse = await method(request, source); + return; + } else { + return failed(`Invalid method name given. ${stringify(request)}`); + } +} diff --git a/udsp/reply.js b/udsp/reply.js index 02b8967e..d761385a 100644 --- a/udsp/reply.js +++ b/udsp/reply.js @@ -5,8 +5,12 @@ import { decode, encode } from 'msgpackr'; import { success, failed, info, msgReceived, msgSent } from '#logs'; +import { processEvent } from '#udsp/processEvent'; const chunkSize = 700; const transferEncodingTypesChunked = /stream|file|image|string/; +/** + * @todo Add promise to send use the method that Ask uses assign the accept, return it, and when completed execute. +*/ export class Reply { constructor(request, client) { const thisReply = this; @@ -17,7 +21,7 @@ export class Reply { packetIdGenerator } = client; const timeStamp = Date.now(); - thisReply.created = timeStamp; + thisReply.t = timeStamp; thisReply.client = function() { return client; }; @@ -62,7 +66,7 @@ export class Reply { */ state = 0; // Flush Ask Body to Free Memory - flushIn() { + async flushIn() { this.incomingPayload = {}; this.incomingPackets = []; this.incomingChunks = []; @@ -70,7 +74,7 @@ export class Reply { this.totalIncomingPayloadSize = 0; } // Flush Reply Body to Free Memory - flushOut() { + async flushOut() { this.outgoingPayload = {}; this.outgoingPackets = []; this.outgoingChunks = []; @@ -78,17 +82,17 @@ export class Reply { this.totalOutgoingPayloadSize = 0; } // Flush all body - flush() { + async flush() { this.flushOut(); this.flushAsk(); } // Flush All body and remove this reply from the map - destroy() { + async destroy() { this.flush(); this.server().replyQueue.delete(this.sid); } // Raw Send Packet - sendPacket(request, server, client) { + async sendPacket(request, client) { const options = this.options; const headers = this.headers; if (this.options) { @@ -98,9 +102,9 @@ export class Reply { console.log(`Sending msg with headers var`, headers, `this.headers`, this.headers); } console.log('Handover to Server Reply Packet to Send', request, headers, options); - server.send(client, request, headers, options); + client.send(request, headers, options); } - chunk(body) { + async chunk(body) { const chunks = []; const packetLength = body.length; for (let index = 0; index < packetLength;index += chunkSize) { @@ -151,7 +155,7 @@ export class Reply { const server = this.server(); const client = this.client(); eachArray(packetIDs, (id) => { - thisReply.sendPacket(thisReply.outgoingPackets[id], server, client); + thisReply.sendPacket(thisReply.outgoingPackets[id]); }); } replyAll() { @@ -160,10 +164,10 @@ export class Reply { const client = this.client(); console.log('Reply.replyAll', thisReply.outgoingPackets); eachArray(thisReply.outgoingPackets, (packet) => { - thisReply.sendPacket(packet, server, client); + thisReply.sendPacket(packet); }); } - received(message) { + async received(message) { const thisReply = this; const { body, @@ -201,15 +205,15 @@ export class Reply { thisReply.state = 2; } if (thisReply.state === 2) { - thisReply.assemble(); + await thisReply.assemble(); } } - assemble() { + async assemble() { const thisReply = this; const { transferEncoding } = thisReply; if (thisReply.totalIncomingPackets === 1) { thisReply.request = thisReply.incomingPackets[0]; - return thisReply.process(); + return thisReply.processRequest(); } const packet = thisReply.incomingPackets[0]; eachArray(thisReply.incomingPackets, (item) => { @@ -223,32 +227,12 @@ export class Reply { thisReply.request.body = decode(thisReply.request.body); } } - thisReply.flushOut(); + await thisReply.flushOut(); } - async process() { + async processRequest() { const thisReply = this; - console.log(thisReply); const request = thisReply.request; - const { - body, - sid, - evnt, - act - } = request; - const { - events, - actions - } = thisReply.server(); - const eventName = act || evnt; - const method = (act) ? actions.get(act) : events.get(evnt); - info(`Request:${eventName} RequestID: ${sid}`); - if (method) { - console.log(request); - const hasResponse = await method(request, thisReply); - return; - } else { - return failed(`Invalid method name given. ${stringify(request)}`); - } + processEvent(request, thisReply); } } export function reply(packet, client) { diff --git a/udsp/request.js b/udsp/request.js index 7b37c90a..311e15f9 100644 --- a/udsp/request.js +++ b/udsp/request.js @@ -4,9 +4,10 @@ import { import { promise, construct } from 'Acid'; import { Ask } from './ask.js'; imported('Request'); -export async function request(message, sendAsIs) { - const thisClient = this; +export async function request(message, options = {}) { + const client = this; + options.client = client; info(`Requested Body`, message); - const ask = await (construct(Ask, [message, thisClient])); + const ask = await (construct(Ask, [message, options])); return ask; } diff --git a/udsp/server/actions/file.js b/udsp/server/actions/file.js index 45c50802..f485ff0b 100644 --- a/udsp/server/actions/file.js +++ b/udsp/server/actions/file.js @@ -4,6 +4,10 @@ import { info } from '#logs'; import { read } from '#utilities/file'; import path from 'path'; const dots = /\./g; +/** + * + * @todo Include file size, extension, & other metadata as mandatory single packet data. + */ export async function file(message, reply) { const { resourceDirectory, diff --git a/udsp/server/clients/index.js b/udsp/server/clients/index.js index e43d5558..2adba048 100644 --- a/udsp/server/clients/index.js +++ b/udsp/server/clients/index.js @@ -3,7 +3,6 @@ import { initialize } from './initialize.js'; import { created } from './created.js'; import { destroy } from './destroy.js'; import { reKey } from './reKey.js'; -import { send } from './send.js'; import { state } from './state.js'; // import { createResponse } from './message.js'; import { received } from './received.js'; @@ -47,11 +46,12 @@ export class Client { this.newKeys = newKeypair; info(`socket EVENT -> reKey - ID:${this.id}`); } - async send(message, frameHeaders) { + async send(message, headers, options) { const server = this.server(); - console.log(server); - await send(this, message, frameHeaders, server); - info(`socket EVENT -> send - ID:${this.id}`); + const client = this; + await server.send(message, headers, options, client); + await server.clientEvent('send', client); + msgSent(`socket Sent -> ID: ${client.id}`); } async received(message, frameHeaders) { const server = this.server(); diff --git a/udsp/server/clients/send.js b/udsp/server/clients/send.js deleted file mode 100644 index 61497026..00000000 --- a/udsp/server/clients/send.js +++ /dev/null @@ -1,8 +0,0 @@ -import { - success, failed, imported, msgSent, info, msgReceived -} from '#logs'; -export async function send(client, message, headers, options, server) { - await server.send(client, message, headers, options); - await server.clientEvent('send', client); - msgSent(`socket Sent -> ID: ${client.id}`); -} diff --git a/udsp/server/send.js b/udsp/server/send.js index 1489ee35..bff717e5 100644 --- a/udsp/server/send.js +++ b/udsp/server/send.js @@ -2,7 +2,7 @@ import { success, failed, imported, msgSent, info } from '#logs'; import { encodePacket } from '#udsp/encodePacket'; -export async function send(client, message, headers, options) { +export async function send(message, headers, options, client) { const { address, port,