Skip to content

Commit

Permalink
allow queries to be aborted
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Jun 24, 2024
1 parent da2271f commit b86e66a
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 16 deletions.
16 changes: 9 additions & 7 deletions src/batch_edge_query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ export default class BatchEdgeQueryHandler {
/**
* @private
*/
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
const executor = new call_api(APIEdges, this.options, redisClient);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs, abortSignal);
this.logs = [...this.logs, ...executor.logs];
return records;
}
Expand Down Expand Up @@ -123,18 +123,20 @@ export default class BatchEdgeQueryHandler {
});
}

async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
async query(qEdges: QEdge | QEdge[], unavailableAPIs: UnavailableAPITracker = {}, abortSignal?: AbortSignal): Promise<Record[]> {
debug('Node Update Start');
// it's now a single edge but convert to arr to simplify refactoring
qEdges = Array.isArray(qEdges) ? qEdges : [qEdges];
const nodeUpdate = new NodesUpdateHandler(qEdges);
// difference is there is no previous edge info anymore
await nodeUpdate.setEquivalentIDs(qEdges);
await nodeUpdate.setEquivalentIDs(qEdges, abortSignal);
await this._rmEquivalentDuplicates(qEdges);
debug('Node Update Success');

if (abortSignal?.aborted) return [];

const cacheHandler = new CacheHandler(this.caching, this.metaKG, this.options);
const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges);
const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges, abortSignal);
this.logs = [...this.logs, ...cacheHandler.logs];
let queryRecords: Record[];

Expand All @@ -154,8 +156,8 @@ export default class BatchEdgeQueryHandler {
}
const expanded_APIEdges = this._expandAPIEdges(APIEdges);
debug('Start to query APIEdges....');
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs);
if (queryRecords === undefined) return;
queryRecords = await this._queryAPIEdges(expanded_APIEdges, unavailableAPIs, abortSignal);
if (queryRecords === undefined || abortSignal?.aborted) return;
debug('APIEdges are successfully queried....');
queryRecords = await this._postQueryFilter(queryRecords);
debug(`Total number of records is (${queryRecords.length})`);
Expand Down
5 changes: 4 additions & 1 deletion src/cache_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export default class CacheHandler {
);
}

async categorizeEdges(qEdges: QEdge[]): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
async categorizeEdges(qEdges: QEdge[], abortSignal?: AbortSignal): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === 'true') {
return {
cachedRecords: [],
Expand All @@ -123,13 +123,16 @@ export default class CacheHandler {
let cachedRecords: Record[] = [];
debug('Begin edge cache lookup...');
await async.eachSeries(qEdges, async (qEdge) => {
if (abortSignal?.aborted) return;
const qEdgeMetaKGHash = this._hashEdgeByMetaKG(qEdge.getHashedEdgeRepresentation());
const unpackedRecords: Record[] = await new Promise((resolve) => {
const redisID = 'bte:edgeCache:' + qEdgeMetaKGHash;
redisClient.client.usingLock([`redisLock:${redisID}`], 600000, async () => {
try {
const compressedRecordPack = await redisClient.client.hgetallTimeout(redisID);

if (abortSignal?.aborted) resolve([]);

if (compressedRecordPack && Object.keys(compressedRecordPack).length) {
const recordPack = [];

Expand Down
6 changes: 4 additions & 2 deletions src/edge_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,11 @@ export default class QueryEdgeManager {
debug(logMessage);
}

async executeEdges(): Promise<boolean> {
async executeEdges(abortSignal?: AbortSignal): Promise<boolean> {
const unavailableAPIs: UnavailableAPITracker = {};
while (this.getEdgesNotExecuted()) {
if (abortSignal?.aborted) return false;

const span = Telemetry.startSpan({ description: 'edgeExecution' });
//next available/most efficient edge
const currentQEdge = this.getNext();
Expand All @@ -402,7 +404,7 @@ export default class QueryEdgeManager {
);
debug(`(5) Executing current edge >> "${currentQEdge.getID()}"`);
//execute current edge query
const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs);
const queryRecords = await queryBatchHandler.query(queryBatchHandler.qEdges, unavailableAPIs, abortSignal);
this.logs = [...this.logs, ...queryBatchHandler.logs];
if (queryRecords === undefined) return;
// create an edge execution summary
Expand Down
6 changes: 4 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ export default class TRAPIQueryHandler {
];
};

async query(): Promise<void> {
async query(abortSignal?: AbortSignal): Promise<void> {
this._initializeResponse();
await this.addQueryNodes();

Expand Down Expand Up @@ -681,12 +681,14 @@ export default class TRAPIQueryHandler {
}
const manager = new EdgeManager(queryEdges, metaKG, this.options);

const executionSuccess = await manager.executeEdges();
const executionSuccess = await manager.executeEdges(abortSignal);
this.logs = [...this.logs, ...manager.logs];
if (!executionSuccess) {
return;
}

if (abortSignal?.aborted) return;

const span3 = Telemetry.startSpan({ description: 'resultsAssembly' });

// update query graph
Expand Down
8 changes: 4 additions & 4 deletions src/update_nodes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ export default class NodesUpdateHandler {
* Resolve input ids
* @param {object} curies - each key represents the category, e.g. gene, value is an array of curies.
*/
async _getEquivalentIDs(curies: ResolverInput): Promise<SRIResolverOutput> {
async _getEquivalentIDs(curies: ResolverInput, abortSignal?: AbortSignal): Promise<SRIResolverOutput> {
// const resolver = new id_resolver.Resolver('biolink');
// const equivalentIDs = await resolver.resolve(curies);
return await resolveSRI(curies);
return await resolveSRI(curies, abortSignal);
}

async setEquivalentIDs(qEdges: QEdge[]): Promise<void> {
async setEquivalentIDs(qEdges: QEdge[], abortSignal?: AbortSignal): Promise<void> {
debug(`Getting equivalent IDs...`);
const curies = this._getCuries(this.qEdges);
debug(`curies: ${JSON.stringify(curies)}`);
const equivalentIDs = await this._getEquivalentIDs(curies);
const equivalentIDs = await this._getEquivalentIDs(curies, abortSignal);
qEdges.map((qEdge) => {
const edgeEquivalentIDs = Object.keys(equivalentIDs)
.filter((key) => qEdge.getInputCurie().includes(key))
Expand Down

0 comments on commit b86e66a

Please sign in to comment.