Skip to content

Commit

Permalink
use uWebsockets.js as Socket.IO engine
Browse files Browse the repository at this point in the history
  • Loading branch information
igorls committed Oct 17, 2022
1 parent f7a18b5 commit 24ad40c
Show file tree
Hide file tree
Showing 5 changed files with 403 additions and 365 deletions.
208 changes: 208 additions & 0 deletions api/helpers/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,214 @@ import {createHash} from "crypto";
import * as _ from "lodash";
import {FastifyInstance, FastifyReply, FastifyRequest, FastifySchema, HTTPMethods} from "fastify";
import got from "got";
import {checkFilter, hLog} from "../../helpers/common_functions";

const deltaQueryFields = ['code', 'table', 'scope', 'payer'];

export async function streamPastDeltas(fastify: FastifyInstance, socket, data) {
const search_body = {
query: {bool: {must: []}},
sort: {block_num: 'asc'},
};
await addBlockRangeOpts(data, search_body, fastify);
deltaQueryFields.forEach(f => {
addTermMatch(data, search_body, f);
});
const responseQueue = [];
let counter = 0;
const init_response = await fastify.elastic.search({
index: fastify.manager.chain + '-delta-*',
scroll: '30s',
size: 20,
body: search_body,
});
responseQueue.push(init_response);
while (responseQueue.length) {
const {body} = responseQueue.shift();
counter += body['hits']['hits'].length;
if (socket.connected) {
socket.emit('message', {
type: 'delta_trace',
mode: 'history',
messages: body['hits']['hits'].map(doc => doc._source),
});
} else {
hLog('LOST CLIENT');
break;
}
if (body['hits'].total.value === counter) {
hLog(`${counter} past deltas streamed to ${socket.id}`);
break;
}

const next_response = await fastify.elastic.scroll({
body: {
scroll_id: body['_scroll_id'],
scroll: '30s'
}
});
responseQueue.push(next_response);
}
}

export async function streamPastActions(fastify: FastifyInstance, socket, data) {
const search_body = {
query: {bool: {must: []}},
sort: {global_sequence: 'asc'},
};

await addBlockRangeOpts(data, search_body, fastify);

if (data.account !== '') {
search_body.query.bool.must.push({
bool: {
should: [
{term: {'notified': data.account}},
{term: {'act.authorization.actor': data.account}},
],
},
});
}

if (data.contract !== '*' && data.contract !== '') {
search_body.query.bool.must.push({'term': {'act.account': data.contract}});
}

if (data.action !== '*' && data.action !== '') {
search_body.query.bool.must.push({'term': {'act.name': data.action}});
}

const onDemandFilters = [];
if (data.filters.length > 0) {
data.filters.forEach(f => {
if (f.field && f.value) {
if (f.field.startsWith('@') && !f.field.startsWith('act.data')) {
const _q = {};
_q[f.field] = f.value;
search_body.query.bool.must.push({'term': _q});
} else {
onDemandFilters.push(f);
}
}
});
}

const responseQueue = [];
let counter = 0;

const init_response = await fastify.elastic.search({
index: fastify.manager.chain + '-action-*',
scroll: '30s',
size: 20,
body: search_body,
});
responseQueue.push(init_response);
while (responseQueue.length) {
const {body} = responseQueue.shift();
const enqueuedMessages = [];
counter += body['hits']['hits'].length;
for (const doc of body['hits']['hits']) {
let allow = false;
if (onDemandFilters.length > 0) {
allow = onDemandFilters.every(filter => {
return checkFilter(filter, doc._source);
});
} else {
allow = true;
}
if (allow) {
enqueuedMessages.push(doc._source);
}
}
if (socket.connected) {
socket.emit('message', {type: 'action_trace', mode: 'history', messages: enqueuedMessages});
} else {
hLog('LOST CLIENT');
break;
}
if (body['hits'].total.value === counter) {
hLog(`${counter} past actions streamed to ${socket.id}`);
break;
}
if (init_response.body.hits.total.value < 1000) {
const next_response = await fastify.elastic.scroll({
body: {scroll_id: body['_scroll_id'], scroll: '30s'}
});
responseQueue.push(next_response);
} else {
hLog('Request too large!');
socket.emit('message', {type: 'action_trace', mode: 'history', messages: []});
}
}
}

export function addTermMatch(data, search_body, field) {
if (data[field] !== '*' && data[field] !== '') {
const termQuery = {};
termQuery[field] = data[field];
search_body.query.bool.must.push({'term': termQuery});
}
}

export async function addBlockRangeOpts(data, search_body, fastify: FastifyInstance) {

let timeRange;
let blockRange;
let head;

if (typeof data['start_from'] === 'string' && data['start_from'] !== '') {
if (!timeRange) {
timeRange = {"@timestamp": {}};
}
timeRange["@timestamp"]['gte'] = data['start_from'];
}

if (typeof data['read_until'] === 'string' && data['read_until'] !== '') {
if (!timeRange) {
timeRange = {"@timestamp": {}};
}
timeRange["@timestamp"]['lte'] = data['read_until'];
}

if (typeof data['start_from'] === 'number' && data['start_from'] !== 0) {
if (!blockRange) {
blockRange = {"block_num": {}};
}
if (data['start_from'] < 0) {
if (!head) {
head = (await fastify.eosjs.rpc.get_info()).head_block_num;
}
blockRange["block_num"]['gte'] = head + data['start_from'];
} else {
blockRange["block_num"]['gte'] = data['start_from'];
}
}

if (typeof data['read_until'] === 'number' && data['read_until'] !== 0) {
if (!blockRange) {
blockRange = {"block_num": {}};
}
if (data['read_until'] < 0) {
if (!head) {
head = (await fastify.eosjs.rpc.get_info()).head_block_num;
}
blockRange["block_num"]['lte'] = head + data['read_until'];
} else {
blockRange["block_num"]['lte'] = data['read_until'];
}
}

if (timeRange) {
search_body.query.bool.must.push({
range: timeRange,
});
}
if (blockRange) {
search_body.query.bool.must.push({
range: blockRange,
});
}
}

export function extendResponseSchema(responseProps: any) {
const props = {
Expand Down
Loading

0 comments on commit 24ad40c

Please sign in to comment.