Skip to content

Commit

Permalink
max records per edge/query
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Aug 2, 2024
1 parent 3ea03a7 commit 8920687
Showing 1 changed file with 43 additions and 43 deletions.
86 changes: 43 additions & 43 deletions src/edge_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import { RecordsByQEdgeID } from './results_assembly/query_results';
import path from 'path';
import { promises as fs } from 'fs';

const MAX_RECORDS_PER_EDGE = parseInt(process.env.MAX_RECORDS_PER_EDGE ?? '100000');
const MAX_RECORDS_PER_QUERY = parseInt(process.env.MAX_RECORDS_TOTAL ?? '200000');

export default class QueryEdgeManager {
private _qEdges: QEdge[];
private _metaKG: MetaKG;
Expand Down Expand Up @@ -299,45 +302,6 @@ export default class QueryEdgeManager {
currentQEdge.storeRecords(filteredRecords);
}

/**
* Unused
*/
// updateNeighborsEdgeRecords(currentQEdge) {
// //update and filter only immediate neighbors
// debug(`Updating neighbors...`);
// const currentQEdgeID = currentQEdge.getID();
// //get neighbors of this edges subject that are not this edge
// let left_connections = currentQEdge.subject.getConnections();
// left_connections = left_connections.filter((qEdgeID) => qEdgeID !== currentQEdgeID);
// //get neighbors of this edges object that are not this edge
// let right_connections = currentQEdge.object.getConnections();
// right_connections = right_connections.filter((qEdgeID) => qEdgeID !== currentQEdgeID);
// debug(`(${left_connections})<--edge neighbors-->(${right_connections})`);
// if (left_connections.length) {
// //find edge by id
// left_connections.forEach((qEdgeID) => {
// const edge = this._qEdges.find((edge) => edge.getID() == qEdgeID);
// if (edge && edge.records.length) {
// debug(`Updating "${edge.getID()}" neighbor edge of ${currentQEdgeID}`);
// debug(`Updating neighbor (X)<----()`);
// this.updateEdgeRecords(edge);
// }
// });
// }
//
// if (right_connections.length) {
// //find edge by id
// right_connections.forEach((neighbor_id) => {
// const edge = this._qEdges.find((edge) => edge.getID() == neighbor_id);
// if (edge && edge.records.length) {
// debug(`Updating "${edge.getID()}" neighbor edge of ${currentQEdgeID}`);
// debug(`Updating neighbor ()---->(X)`);
// this.updateEdgeRecords(edge);
// }
// });
// }
// }

updateAllOtherEdges(currentQEdge: QEdge): void {
//update and filter all other edges
debug(`Updating all other edges...`);
Expand All @@ -353,10 +317,8 @@ export default class QueryEdgeManager {

_createBatchQueryHandler(qEdge: QEdge, metaKG: MetaKG): BatchEdgeQueryHandler {
const handler = new BatchEdgeQueryHandler(metaKG, {
caching: this.options.caching,
submitter: this.options.submitter,
...this.options,
recordHashEdgeAttributes: config.EDGE_ATTRIBUTES_USED_IN_RECORD_HASH,
provenanceUsesServiceProvider: this.options.provenanceUsesServiceProvider,
} as BatchEdgeQueryOptions);
handler.setEdges(qEdge);
return handler;
Expand Down Expand Up @@ -400,7 +362,7 @@ export default class QueryEdgeManager {
);
debug(`(5) Executing current edge >> "${currentQEdge.getID()}"`);
//execute current edge query
const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs);
let queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs);
this.logs = [...this.logs, ...queryBatchHandler.logs];
if (queryRecords === undefined) return;
// create an edge execution summary
Expand All @@ -410,6 +372,7 @@ export default class QueryEdgeManager {
const cached = this.logs.filter(
({ data }) => data?.qEdgeID === currentQEdge.id && data?.type === 'cacheHit',
).length;
total += cached;
this.logs
.filter(({ data }) => data?.qEdgeID === currentQEdge.id && data?.type === 'query')
.forEach(({ data }) => {
Expand Down Expand Up @@ -437,6 +400,43 @@ export default class QueryEdgeManager {
span.finish();
return;
}

// Check if record count threatens stability
const totalRecords =
this._qEdges.reduce((total, qEdge) => {
return total + qEdge.records.length;
}, 0) + queryRecords.length;
let maxRecordsMessage = [
`Qedge ${currentQEdge.id}`,
`obtained ${queryRecords.length} records,`,
queryRecords.length === MAX_RECORDS_PER_EDGE ? 'meeting' : 'exceeding',
`maximum of ${MAX_RECORDS_PER_QUERY}`,
`Truncating records for this edge to ${MAX_RECORDS_PER_EDGE}.`,
`Your query may be too general?`,
];
if (totalRecords > MAX_RECORDS_PER_QUERY) {
maxRecordsMessage = maxRecordsMessage.slice(0, 2);
maxRecordsMessage.push(
...[
`totalling ${totalRecords} for this query.`,
`This exceeds the per-query maximum of ${MAX_RECORDS_PER_QUERY}.`,
`For stability purposes, this query is terminated.`,
`Please consider further refining your query.`,
],
);
debug(maxRecordsMessage.join(' '));
this.logs.push(new LogEntry('WARNING', null, maxRecordsMessage.join(' ')).getLog());
Telemetry.captureException(new Error(`Stopped on globalMaxRecords (exceeded ${MAX_RECORDS_PER_QUERY})`));
return;
}

if (queryRecords.length > MAX_RECORDS_PER_EDGE) {
debug(maxRecordsMessage.join(' '));
this.logs.push(new LogEntry('WARNING', null, maxRecordsMessage.join(' ')).getLog());

queryRecords = queryRecords.slice(0, MAX_RECORDS_PER_EDGE);
}

// storing records will trigger a node entity count update
currentQEdge.storeRecords(queryRecords);

Expand Down

0 comments on commit 8920687

Please sign in to comment.