From 8920687a478c5dc6833771a661c5bc8516db3079 Mon Sep 17 00:00:00 2001 From: rjawesome Date: Fri, 2 Aug 2024 10:58:56 -0700 Subject: [PATCH] max records per edge/query --- src/edge_manager.ts | 86 ++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/src/edge_manager.ts b/src/edge_manager.ts index b0a46d4..1d27e17 100644 --- a/src/edge_manager.ts +++ b/src/edge_manager.ts @@ -13,6 +13,9 @@ import { RecordsByQEdgeID } from './results_assembly/query_results'; import path from 'path'; import { promises as fs } from 'fs'; +const MAX_RECORDS_PER_EDGE = parseInt(process.env.MAX_RECORDS_PER_EDGE ?? '100000'); +const MAX_RECORDS_PER_QUERY = parseInt(process.env.MAX_RECORDS_TOTAL ?? '200000'); + export default class QueryEdgeManager { private _qEdges: QEdge[]; private _metaKG: MetaKG; @@ -299,45 +302,6 @@ export default class QueryEdgeManager { currentQEdge.storeRecords(filteredRecords); } - /** - * Unused - */ - // updateNeighborsEdgeRecords(currentQEdge) { - // //update and filter only immediate neighbors - // debug(`Updating neighbors...`); - // const currentQEdgeID = currentQEdge.getID(); - // //get neighbors of this edges subject that are not this edge - // let left_connections = currentQEdge.subject.getConnections(); - // left_connections = left_connections.filter((qEdgeID) => qEdgeID !== currentQEdgeID); - // //get neighbors of this edges object that are not this edge - // let right_connections = currentQEdge.object.getConnections(); - // right_connections = right_connections.filter((qEdgeID) => qEdgeID !== currentQEdgeID); - // debug(`(${left_connections})<--edge neighbors-->(${right_connections})`); - // if (left_connections.length) { - // //find edge by id - // left_connections.forEach((qEdgeID) => { - // const edge = this._qEdges.find((edge) => edge.getID() == qEdgeID); - // if (edge && edge.records.length) { - // debug(`Updating "${edge.getID()}" neighbor edge of ${currentQEdgeID}`); - // debug(`Updating neighbor (X)<----()`); - // this.updateEdgeRecords(edge); - // } - // }); - // } - // - // if (right_connections.length) { - // //find edge by id - // right_connections.forEach((neighbor_id) => { - // const edge = this._qEdges.find((edge) => edge.getID() == neighbor_id); - // if (edge && edge.records.length) { - // debug(`Updating "${edge.getID()}" neighbor edge of ${currentQEdgeID}`); - // debug(`Updating neighbor ()---->(X)`); - // this.updateEdgeRecords(edge); - // } - // }); - // } - // } - updateAllOtherEdges(currentQEdge: QEdge): void { //update and filter all other edges debug(`Updating all other edges...`); @@ -353,10 +317,8 @@ export default class QueryEdgeManager { _createBatchQueryHandler(qEdge: QEdge, metaKG: MetaKG): BatchEdgeQueryHandler { const handler = new BatchEdgeQueryHandler(metaKG, { - caching: this.options.caching, - submitter: this.options.submitter, + ...this.options, recordHashEdgeAttributes: config.EDGE_ATTRIBUTES_USED_IN_RECORD_HASH, - provenanceUsesServiceProvider: this.options.provenanceUsesServiceProvider, } as BatchEdgeQueryOptions); handler.setEdges(qEdge); return handler; @@ -400,7 +362,7 @@ export default class QueryEdgeManager { ); debug(`(5) Executing current edge >> "${currentQEdge.getID()}"`); //execute current edge query - const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs); + let queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs); this.logs = [...this.logs, ...queryBatchHandler.logs]; if (queryRecords === undefined) return; // create an edge execution summary @@ -410,6 +372,7 @@ export default class QueryEdgeManager { const cached = this.logs.filter( ({ data }) => data?.qEdgeID === currentQEdge.id && data?.type === 'cacheHit', ).length; + total += cached; this.logs .filter(({ data }) => data?.qEdgeID === currentQEdge.id && data?.type === 'query') .forEach(({ data }) => { @@ -437,6 +400,43 @@ export default class QueryEdgeManager { span.finish(); return; } + + // Check if record count threatens stability + const totalRecords = + this._qEdges.reduce((total, qEdge) => { + return total + qEdge.records.length; + }, 0) + queryRecords.length; + let maxRecordsMessage = [ + `Qedge ${currentQEdge.id}`, + `obtained ${queryRecords.length} records,`, + queryRecords.length === MAX_RECORDS_PER_EDGE ? 'meeting' : 'exceeding', + `maximum of ${MAX_RECORDS_PER_QUERY}`, + `Truncating records for this edge to ${MAX_RECORDS_PER_EDGE}.`, + `Your query may be too general?`, + ]; + if (totalRecords > MAX_RECORDS_PER_QUERY) { + maxRecordsMessage = maxRecordsMessage.slice(0, 2); + maxRecordsMessage.push( + ...[ + `totalling ${totalRecords} for this query.`, + `This exceeds the per-query maximum of ${MAX_RECORDS_PER_QUERY}.`, + `For stability purposes, this query is terminated.`, + `Please consider further refining your query.`, + ], + ); + debug(maxRecordsMessage.join(' ')); + this.logs.push(new LogEntry('WARNING', null, maxRecordsMessage.join(' ')).getLog()); + Telemetry.captureException(new Error(`Stopped on globalMaxRecords (exceeded ${MAX_RECORDS_PER_QUERY})`)); + return; + } + + if (queryRecords.length > MAX_RECORDS_PER_EDGE) { + debug(maxRecordsMessage.join(' ')); + this.logs.push(new LogEntry('WARNING', null, maxRecordsMessage.join(' ')).getLog()); + + queryRecords = queryRecords.slice(0, MAX_RECORDS_PER_EDGE); + } + // storing records will trigger a node entity count update currentQEdge.storeRecords(queryRecords);