Skip to content

Commit

Permalink
UDSP Protocol packet resturcture
Browse files Browse the repository at this point in the history
standardize method, data, and headers (metadata)
Bring API closer to HTTP
  • Loading branch information
Universal Web committed Jun 28, 2023
1 parent 8e273ad commit e0a43a4
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 104 deletions.
59 changes: 14 additions & 45 deletions udsp/ask.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,66 +11,35 @@ import { decode, encode } from 'msgpackr';
import {
failed, info, msgReceived, msgSent
} from '#logs';
import { assembleData } from './request/assembleData.js';
import { Base } from './request/base.js';
import { bufferPacketization } from './request/bufferPacketization.js';
import { request } from '#udsp/request';
export class Ask extends Base {
constructor(config, source) {
super(config, source);
const {
message,
options,
} = config;
constructor(requestObject, options = {}, source) {
super(options, source);
const {
queue,
packetIdGenerator,
maxPacketSize
maxPacketSize,
} = source;
assign(this.request, message);
// sid is a Stream ID
const {
data,
head,
method
} = requestObject;
console.log('Ask', requestObject);
const streamId = packetIdGenerator.get();
this.request.sid = streamId;
this.packetTemplate.sid = streamId;
this.id = streamId;
queue.set(streamId, this);
}
async assemble() {
const { contentType } = this;
if (this.data) {
this.data = await assembleData(this.data, this.response, contentType);
console.log('Assembled', this.data);
}
this.destroy();
await this.accept(this);
}
async send(data) {
const thisAsk = this;
const {
packetTemplate,
contentType,
maxPacketSize,
sid
} = this;
if (data) {
this.request.data = data;
}
console.log('Reply.send', this.response);
if (this.request.data) {
if (!isBuffer(this.request.data)) {
this.request.data = this.dataToBuffer(this.request.data);
}
this.totalReplyDataSize = request.data?.length;
if (head) {
this.request.head = head;
}
if (this.contentType) {
this.request.head.contentType = this.contentType;
if (method) {
this.request.method = method;
}
await bufferPacketization(this.request.data, sid, this.outgoingPackets, maxPacketSize, contentType);
const awaitingResult = promise((accept) => {
thisAsk.accept = accept;
});
thisAsk.sendAll();
return awaitingResult;
queue.set(streamId, this);
}
isAsk = true;
type = 'ask';
Expand Down
27 changes: 27 additions & 0 deletions udsp/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { construct, UniqID } from '@universalweb/acid';
import { actions } from './server/actions/index.js';
class UDSP {
constructor() {
return this.initialize();
}
async initialize() {
}
gracePeriod = 30000;
maxPacketSize = 1328;
connectionIdSize = 8;
actions = construct(Map);
stateCodeDescriptions = ['initializing', 'initialized', 'failed to initialize'];
state = 0;
/*
* A puzzle used to challenge clients to ensure authenticity, connection liveliness, and congestion control.
* Slow down account creation.
* Generate crypto currency or compute work.
*/
puzzleFlag = false;
/*
* IPv6 preferred.
*/
ipVersion = 'udp6';
events = construct(Map);
streamIdGenerator = construct(UniqID);
}
11 changes: 7 additions & 4 deletions udsp/client/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ export async function connect(message = {}) {
const thisClient = this;
// opn stands for open meaning connect to a server
message.intro = 'Hello Server!';
const result = await thisClient.fetch('opn', message);
console.log('Connect response', result.response.data);
const connectRequest = thisClient.request({
message
}, 'connect');
console.log('Connect request', connectRequest);
const connectResponse = await connectRequest.send();
const {
data,
state,
time,
// server connection ID
sid
} = result;
} = connectResponse;
if (state === 1 && sid) {
connected(data);
thisClient.state = 1;
Expand All @@ -27,5 +30,5 @@ export async function connect(message = {}) {
}
}
console.log('-------CLIENT CONNECTED-------\n');
return result;
return connectResponse;
}
4 changes: 2 additions & 2 deletions udsp/client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ export class Client {
this.socket.close();
Client.connections.delete(this.id);
}
ask(message) {
const ask = construct(Ask, [message, this]);
ask(message, options) {
const ask = construct(Ask, [message, options, this]);
return ask;
}
connect = clientConnect;
Expand Down
19 changes: 10 additions & 9 deletions udsp/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ import {
} from '#logs';
import { promise, construct, isString } from '@universalweb/acid';
imported('Request');
export async function request(source, options) {
if (options) {
if (isString(options)) {
source.method = options;
}
source.method = options.method;
export function request(data, options) {
let target = data;
if (isString(options)) {
target = {
method: options,
data
};
}
info(`Request Function: ${source.method}`);
const ask = this.ask(source, options);
console.log(ask);
info(`Request Function: ${target.method}`);
const ask = this.ask(target, options);
console.log(target, ask);
return ask;
}
77 changes: 55 additions & 22 deletions udsp/request/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,21 @@ import { flushOutgoing, flushIncoming, flush } from './flush.js';
import { sendPacketsById } from './sendPacketsById.js';
import { sendAll } from './sendAll.js';
import { onPacket } from './onPacket.js';
import { isBuffer, isPlainObject, isString } from '@universalweb/acid';
import {
isBuffer, isPlainObject, isString, promise, assign
} from '@universalweb/acid';
import { encode } from 'msgpackr';
import { request } from '#udsp/request';
import { assembleData } from './assembleData.js';
export class Base {
constructor(config, source) {
const { events } = config;
constructor(options = {}, source) {
const { events, } = options;
const timeStamp = Date.now();
this.created = timeStamp;
this.source = function() {
return source;
};
const {
queue,
packetIdGenerator,
maxPacketSize
} = source;
const { maxPacketSize } = source;
if (events) {
this.on(events);
}
Expand All @@ -31,27 +30,23 @@ export class Base {
}
}
code(codeNumber) {
const source = (this.isAsk) ? this.request : this.response;
if (this.isAsk) {
this.request.head.code = codeNumber;
source.head.code = codeNumber;
} else {
this.response.head.code = codeNumber;
source.head.code = codeNumber;
}
}
setHeader(headerName, headerValue) {
if (this.isAsk) {
if (!this.request.head) {
this.request.head = {};
}
this.request.head[headerName] = headerValue;
}
setHeader(target) {
const source = (this.isAsk) ? this.request : this.response;
assign(source.head, target);
}
writeHeader(headerName, headerValue) {
if (this.isReply) {
if (!this.response.head) {
this.response.head = {};
}
this.response.head[headerName] = headerValue;
const source = (this.isAsk) ? this.request : this.response;
if (!source.head) {
source.head = {};
}
source.head[headerName] = headerValue;
}
dataToBuffer(data) {
if (isBuffer(data)) {
Expand All @@ -63,6 +58,44 @@ export class Base {
}
return Buffer.from(data);
}
async assemble() {
const { contentType } = this;
if (this.data) {
this.data = await assembleData(this.data, this.response, contentType);
console.log('Assembled', this.data);
}
this.destroy();
await this.accept(this);
}
async send() {
const thisSource = this;
const {
packetTemplate,
contentType,
maxPacketSize,
sid,
isAsk,
isReply
} = this;
const message = (isAsk) ? this.request : this.response;
console.log(`${this.type}.send`, message);
if (message.data) {
if (!isBuffer(message.data)) {
message.data = this.dataToBuffer(message.data);
}
this.totalReplyDataSize = request.data?.length;
}
if (this.contentType) {
message.head.contentType = this.contentType;
}
// await bufferPacketization(this);
const awaitingResult = promise((accept) => {
thisSource.accept = accept;
});
console.log(`BASE ${this.type}`, this);
this.sendAll();
return awaitingResult;
}
destroy = destroy;
sendEnd = sendEnd;
sendPacketsById = sendPacketsById;
Expand Down
62 changes: 41 additions & 21 deletions udsp/request/bufferPacketization.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,44 @@
import { assign } from '@universalweb/acid';
export async function bufferPacketization(data, sid, packets = [], maxPacketSize, contentType) {
const totalPayloadSize = data?.length;
import { buildMessage } from './buildMessage.js';
import { request } from '#udsp/request';
export async function bufferPacketization(source) {
const {
maxPacketSize,
contentType,
method,
id: sid,
isAsk,
outgoingPackets
} = source;
const message = (isAsk) ? this.request : this.response;
const data = message.data;
const dataSize = data?.length;
let currentBytePosition = 0;
let packetId = 0;
if (totalPayloadSize > maxPacketSize) {
if (dataSize > maxPacketSize) {
console.log('Body size', data.length);
while (currentBytePosition < totalPayloadSize) {
while (currentBytePosition < dataSize) {
const endIndex = currentBytePosition + maxPacketSize;
const safeEndIndex = endIndex > totalPayloadSize ? totalPayloadSize : endIndex;
const safeEndIndex = endIndex > dataSize ? dataSize : endIndex;
const chunk = data.subarray(currentBytePosition, safeEndIndex);
console.log('chunksize', chunk.length, currentBytePosition, endIndex);
const packet = assign({
const packet = {
pid: packetId,
sid
});
endIndex: safeEndIndex,
sid,
head: {}
};
if (packetId === 0) {
if (contentType) {
packet.de = contentType;
}
packet.tps = totalPayloadSize;
buildMessage({
method,
contentType,
dataSize,
packet
});
}
packet.data = chunk;
packets[packetId] = packets;
if (endIndex >= totalPayloadSize) {
outgoingPackets[packetId] = outgoingPackets;
if (endIndex >= dataSize) {
packet.end = true;
break;
}
Expand All @@ -32,13 +48,17 @@ export async function bufferPacketization(data, sid, packets = [], maxPacketSize
} else {
const packet = {
pid: 0,
end: true
end: true,
data
};
if (contentType) {
packet.de = contentType;
}
packets[0] = packet;
buildMessage({
method,
contentType,
dataSize,
packet
});
console.log(source);
outgoingPackets[0] = packet;
}
console.log('bufferToPackets', packets);
return packets;
console.log('bufferToPackets', outgoingPackets);
}
16 changes: 16 additions & 0 deletions udsp/request/buildMessage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export function buildMessage({
method,
contentType,
dataSize,
packet
}) {
if (contentType) {
packet.head.contentType = contentType;
}
if (dataSize) {
packet.head.dataSize = dataSize;
}
if (method) {
packet.method = method;
}
}
2 changes: 1 addition & 1 deletion udsp/request/sendAll.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { eachArray } from '@universalweb/acid';
export async function sendAll() {
const thisReply = this;
console.log('outgoingPackets', thisReply.outgoingPackets.length);
console.log('outgoingPackets', thisReply.outgoingPackets);
eachArray(thisReply.outgoingPackets, (packet) => {
thisReply.sendPacket({
message: packet
Expand Down

0 comments on commit e0a43a4

Please sign in to comment.