Skip to content

Commit

Permalink
Merge branch 'move-redisclient' of https://github.com/biothings/bte_t…
Browse files Browse the repository at this point in the history
  • Loading branch information
tokebe committed Mar 22, 2024
2 parents 9bb68f1 + e62eece commit 40313ac
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 516 deletions.
19 changes: 0 additions & 19 deletions __test__/integration/BatchEdgeQueryHandler.test.ts

This file was deleted.

9 changes: 5 additions & 4 deletions __test__/integration/TRAPIQueryHandler.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import axios from 'axios';
import axios, { AxiosStatic } from 'axios';
jest.mock('axios');
const mockedAxios = axios as jest.Mocked<AxiosStatic>;

import TRAPIQueryHandler from '../../src/index';
import path from 'path';
Expand All @@ -26,7 +27,7 @@ describe('Testing TRAPIQueryHandler Module', () => {
};
describe('Testing query function', () => {
test('test with one query edge', async () => {
(axios.get as jest.Mock).mockResolvedValue({
(mockedAxios.get as jest.Mock).mockResolvedValue({
data: {
message: {
query_graph: {
Expand Down Expand Up @@ -101,7 +102,7 @@ describe('Testing TRAPIQueryHandler Module', () => {
workflow: [{ id: 'lookup' }],
},
});
(axios.post as jest.Mock).mockResolvedValue({
(mockedAxios.post as jest.Mock).mockResolvedValue({
data: {
'MONDO:0005737': {
id: { identifier: 'MONDO:0005737', label: 'Ebola hemorrhagic fever' },
Expand Down Expand Up @@ -139,6 +140,6 @@ describe('Testing TRAPIQueryHandler Module', () => {
queryHandler.setQueryGraph(OneHopQuery);
await queryHandler.query();
expect(queryHandler.knowledgeGraph.kg).toHaveProperty('nodes');
}, 15000);
}, 30000);
});
});
57 changes: 0 additions & 57 deletions __test__/unittest/redisClient.test.ts

This file was deleted.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@
"@biothings-explorer/node-expansion": "workspace:../node-expansion",
"@biothings-explorer/smartapi-kg": "workspace:../smartapi-kg",
"@biothings-explorer/utils": "workspace:../utils",
"@sentry/node": "^7.74.1",
"async": "^3.2.4",
"@biothings-explorer/types": "workspace:../types",
"biolink-model": "workspace:../biolink-model",
"biomedical_id_resolver": "workspace:../biomedical_id_resolver",
"@sentry/node": "^7.74.1",
"async": "^3.2.4",
"chi-square-p-value": "^1.0.5",
"debug": "^4.3.4",
"ioredis": "^5.3.2",
Expand Down
42 changes: 7 additions & 35 deletions src/batch_edge_query.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import call_api, { RedisClient } from '@biothings-explorer/call-apis';
import call_api from '@biothings-explorer/call-apis';
import { redisClient } from '@biothings-explorer/utils';
import QEdge2APIEdgeHandler, { APIEdge } from './qedge2apiedge';
import NodesUpdateHandler from './update_nodes';
import Debug from 'debug';
Expand All @@ -7,26 +8,25 @@ import CacheHandler from './cache_handler';
import { threadId } from 'worker_threads';
import MetaKG from '@biothings-explorer/smartapi-kg';
import { StampedLog } from '@biothings-explorer/utils';
import { QueryHandlerOptions, redisClient } from '.';
import { QueryHandlerOptions } from '@biothings-explorer/types';
import QEdge from './query_edge';
import { UnavailableAPITracker } from './types';
import { Record } from '@biothings-explorer/api-response-transform';

export interface BatchEdgeQueryOptions extends QueryHandlerOptions {
recordHashEdgeAttributes: string[];
caching: boolean;
}

export default class BatchEdgeQueryHandler {
metaKG: MetaKG;
subscribers: any[];
logs: StampedLog[];
caching: boolean;
options: QueryHandlerOptions;
resolveOutputIDs: boolean;
qEdges: QEdge | QEdge[];
constructor(metaKG: MetaKG, resolveOutputIDs = true, options?: BatchEdgeQueryOptions) {
this.metaKG = metaKG;
this.subscribers = [];
this.logs = [];
this.caching = options && options.caching;
this.options = options;
Expand Down Expand Up @@ -62,7 +62,7 @@ export default class BatchEdgeQueryHandler {
* @private
*/
async _queryAPIEdges(APIEdges: APIEdge[], unavailableAPIs: UnavailableAPITracker = {}): Promise<Record[]> {
const executor = new call_api(APIEdges, this.options, redisClient as RedisClient);
const executor = new call_api(APIEdges, this.options, redisClient);
const records: Record[] = await executor.query(this.resolveOutputIDs, unavailableAPIs);
this.logs = [...this.logs, ...executor.logs];
return records;
Expand Down Expand Up @@ -100,7 +100,7 @@ export default class BatchEdgeQueryHandler {
const equivalentAlreadyIncluded = qEdge
.getInputNode()
.getEquivalentIDs()
[curie].equivalentIDs.some((equivalentCurie) => reducedCuries.includes(equivalentCurie));
[curie].equivalentIDs.some((equivalentCurie) => reducedCuries.includes(equivalentCurie));
if (!equivalentAlreadyIncluded) {
reducedCuries.push(curie);
} else {
Expand All @@ -112,8 +112,7 @@ export default class BatchEdgeQueryHandler {
strippedCuries.push(...nodeStrippedCuries);
if (nodeStrippedCuries.length > 0) {
debug(
`stripped (${nodeStrippedCuries.length}) duplicate equivalent curies from ${
node.id
`stripped (${nodeStrippedCuries.length}) duplicate equivalent curies from ${node.id
}: ${nodeStrippedCuries.join(',')}`,
);
}
Expand Down Expand Up @@ -173,31 +172,4 @@ export default class BatchEdgeQueryHandler {
debug('Update nodes completed!');
return queryRecords;
}

/**
* Register subscribers
* @param {object} subscriber
*/
subscribe(subscriber): void {
this.subscribers.push(subscriber);
}

/**
* Unsubscribe a listener
* @param {object} subscriber
*/
unsubscribe(subscriber): void {
this.subscribers = this.subscribers.filter((fn) => {
if (fn != subscriber) return fn;
});
}

/**
* Nofity all listeners
*/
notify(res): void {
this.subscribers.map((subscriber) => {
subscriber.update(res);
});
}
}
8 changes: 4 additions & 4 deletions src/cache_handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { redisClient } from './redis-client';
import { redisClient } from '@biothings-explorer/utils';
import Debug from 'debug';
const debug = Debug('bte:biothings-explorer-trapi:cache_handler');
import { LogEntry, StampedLog } from '@biothings-explorer/utils';
Expand All @@ -10,8 +10,8 @@ import { Readable, Transform } from 'stream';
import { Record, RecordPackage } from '@biothings-explorer/api-response-transform';
import { threadId } from 'worker_threads';
import MetaKG from '../../smartapi-kg/built';
import { QueryHandlerOptions } from '.';
import QEdge from './query_edge';
import { QueryHandlerOptions } from '@biothings-explorer/types';

export interface RecordPacksByQedgeMetaKGHash {
[QEdgeHash: string]: RecordPackage;
Expand Down Expand Up @@ -113,7 +113,7 @@ export default class CacheHandler {
}

async categorizeEdges(qEdges: QEdge[]): Promise<{ cachedRecords: Record[]; nonCachedQEdges: QEdge[] }> {
if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS) {
if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === "true") {
return {
cachedRecords: [],
nonCachedQEdges: qEdges,
Expand Down Expand Up @@ -210,7 +210,7 @@ export default class CacheHandler {
}

async cacheEdges(queryRecords: Record[]): Promise<void> {
if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS) {
if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === "true") {
if (global.parentPort) {
global.parentPort.postMessage({ threadId, cacheDone: true });
}
Expand Down
20 changes: 9 additions & 11 deletions src/edge_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import BatchEdgeQueryHandler, { BatchEdgeQueryOptions } from './batch_edge_query
import { Telemetry } from '@biothings-explorer/utils';
import QEdge from './query_edge';
import MetaKG from '@biothings-explorer/smartapi-kg';
import { QueryHandlerOptions } from '.';
import { QueryHandlerOptions } from '@biothings-explorer/types';
import { Record } from '@biothings-explorer/api-response-transform';
import { UnavailableAPITracker } from './types';
import { RecordsByQEdgeID } from './results_assembly/query_results';
Expand Down Expand Up @@ -108,7 +108,7 @@ export default class QueryEdgeManager {
}
debug(
`(5) Sending next edge '${nextQEdge.getID()}' ` +
`WITH entity count...(${nextQEdge.subject.entity_count || nextQEdge.object.entity_count})`,
`WITH entity count...(${nextQEdge.subject.entity_count || nextQEdge.object.entity_count})`,
);
return this.preSendOffCheck(nextQEdge);
}
Expand All @@ -117,19 +117,18 @@ export default class QueryEdgeManager {
this._qEdges.forEach((qEdge) => {
debug(
`'${qEdge.getID()}'` +
` : (${qEdge.subject.entity_count || 0}) ` +
`${qEdge.reverse ? '<--' : '-->'}` +
` (${qEdge.object.entity_count || 0})`,
` : (${qEdge.subject.entity_count || 0}) ` +
`${qEdge.reverse ? '<--' : '-->'}` +
` (${qEdge.object.entity_count || 0})`,
);
});
}

_logSkippedQueries(unavailableAPIs: UnavailableAPITracker): void {
Object.entries(unavailableAPIs).forEach(([api, { skippedQueries }]) => {
if (skippedQueries > 0) {
const skipMessage = `${skippedQueries} additional quer${skippedQueries > 1 ? 'ies' : 'y'} to ${api} ${
skippedQueries > 1 ? 'were' : 'was'
} skipped as the API was unavailable.`;
const skipMessage = `${skippedQueries} additional quer${skippedQueries > 1 ? 'ies' : 'y'} to ${api} ${skippedQueries > 1 ? 'were' : 'was'
} skipped as the API was unavailable.`;
debug(skipMessage);
this.logs.push(new LogEntry('WARNING', null, skipMessage).getLog());
}
Expand Down Expand Up @@ -195,7 +194,7 @@ export default class QueryEdgeManager {
const objectCuries = qEdge.object.curie;
debug(
`'${qEdge.getID()}' Reversed[${qEdge.reverse}] (${JSON.stringify(subjectCuries.length || 0)})` +
`--(${JSON.stringify(objectCuries.length || 0)}) entities / (${records.length}) records.`,
`--(${JSON.stringify(objectCuries.length || 0)}) entities / (${records.length}) records.`,
);
// debug(`IDS SUB ${JSON.stringify(sub_count)}`)
// debug(`IDS OBJ ${JSON.stringify(obj_count)}`)
Expand Down Expand Up @@ -397,8 +396,7 @@ export default class QueryEdgeManager {
new LogEntry(
'INFO',
null,
`Executing ${currentQEdge.getID()}${currentQEdge.isReversed() ? ' (reversed)' : ''}: ${
currentQEdge.subject.id
`Executing ${currentQEdge.getID()}${currentQEdge.isReversed() ? ' (reversed)' : ''}: ${currentQEdge.subject.id
} ${currentQEdge.isReversed() ? '<--' : '-->'} ${currentQEdge.object.id}`,
).getLog(),
);
Expand Down
3 changes: 1 addition & 2 deletions src/graph/knowledge_graph.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { toArray } from '../utils';
import Debug from 'debug';
import {
APIList,
TrapiAttribute,
TrapiKnowledgeGraph,
TrapiKGEdge,
Expand All @@ -10,11 +9,11 @@ import {
TrapiKGNodes,
TrapiQualifier,
TrapiSource,
APIDefinition,
} from '../types';
import KGNode from './kg_node';
import KGEdge from './kg_edge';
import { BTEGraphUpdate } from './graph';
import { APIDefinition } from '@biothings-explorer/types';

const debug = Debug('bte:biothings-explorer-trapi:KnowledgeGraph');

Expand Down
Loading

0 comments on commit 40313ac

Please sign in to comment.