From 9e417b3a02be860f3439ce62843e71095e2669c7 Mon Sep 17 00:00:00 2001 From: Colin Date: Mon, 18 Nov 2024 01:52:37 -0500 Subject: [PATCH] Simplify --- packages/core/rpc/BaseRpcDriver.ts | 6 +- packages/core/util/index.ts | 3 + packages/core/util/stopToken.ts | 4 + .../alignments/src/BamAdapter/BamAdapter.ts | 145 +++++++---------- .../alignments/src/CramAdapter/CramAdapter.ts | 153 ++++++++---------- website/yarn.lock | 14 +- 6 files changed, 149 insertions(+), 176 deletions(-) diff --git a/packages/core/rpc/BaseRpcDriver.ts b/packages/core/rpc/BaseRpcDriver.ts index c72ab87693..2f246ca5a1 100644 --- a/packages/core/rpc/BaseRpcDriver.ts +++ b/packages/core/rpc/BaseRpcDriver.ts @@ -133,7 +133,11 @@ export default abstract class BaseRpcDriver { } } - async remoteAbort(sessionId: string, functionName: string, stopTokenId: number) { + async remoteAbort( + sessionId: string, + functionName: string, + stopTokenId: number, + ) { const worker = await this.getWorker(sessionId) await worker.call( functionName, diff --git a/packages/core/util/index.ts b/packages/core/util/index.ts index b9c9384203..7cd7aa75f9 100644 --- a/packages/core/util/index.ts +++ b/packages/core/util/index.ts @@ -1009,6 +1009,9 @@ export async function updateStatus( cb('') return res } + +// call statusCallback with current status and clear when finished, and check +// stopToken afterwards export async function updateStatus2( msg: string, cb: (arg: string) => void, diff --git a/packages/core/util/stopToken.ts b/packages/core/util/stopToken.ts index 1900add92f..92c6cbb951 100644 --- a/packages/core/util/stopToken.ts +++ b/packages/core/util/stopToken.ts @@ -29,10 +29,14 @@ */ export function createStopToken() { + // URL not available in jest and can't properly mock it + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition return URL.createObjectURL?.(new Blob()) || `${Math.random()}` } export function stopStopToken(stopToken: string) { + // URL not available in jest and can't properly mock it + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition URL.revokeObjectURL?.(stopToken) } diff --git a/plugins/alignments/src/BamAdapter/BamAdapter.ts b/plugins/alignments/src/BamAdapter/BamAdapter.ts index 854c668c3e..83fa8ade57 100644 --- a/plugins/alignments/src/BamAdapter/BamAdapter.ts +++ b/plugins/alignments/src/BamAdapter/BamAdapter.ts @@ -7,15 +7,15 @@ import { BaseOptions, } from '@jbrowse/core/data_adapters/BaseAdapter' import { Region } from '@jbrowse/core/util/types' -import { bytesForRegions, updateStatus2, Feature } from '@jbrowse/core/util' +import { bytesForRegions, updateStatus, Feature } from '@jbrowse/core/util' import { openLocation } from '@jbrowse/core/util/io' import { ObservableCreate } from '@jbrowse/core/util/rxjs' -import { checkStopToken } from '@jbrowse/core/util/stopToken' import QuickLRU from '@jbrowse/core/util/QuickLRU' // locals import BamSlightlyLazyFeature from './BamSlightlyLazyFeature' import { FilterBy } from '../shared/types' +import { checkStopToken } from '@jbrowse/core/util/stopToken' interface Header { idToName: string[] @@ -78,23 +78,20 @@ export default class BamAdapter extends BaseFeatureDataAdapter { async getHeader(_opts?: BaseOptions) { const { bam } = await this.configure() - // TODO:ABORT return bam.getHeaderText() } private async setupPre(opts?: BaseOptions) { - const { stopToken, statusCallback = () => {} } = opts || {} + const { statusCallback = () => {} } = opts || {} const { bam } = await this.configure() - this.samHeader = await updateStatus2( + this.samHeader = await updateStatus( 'Downloading index', statusCallback, - stopToken, async () => { - // TODO:ABORT const samHeader = await bam.getHeader() - // use the @SQ lines in the header to figure out the mapping between - // ref ref ID numbers and names + // use the @SQ lines in the header to figure out the + // mapping between ref ref ID numbers and names const idToName: string[] = [] const nameToId: Record = {} samHeader @@ -130,12 +127,7 @@ export default class BamAdapter extends BaseFeatureDataAdapter { return idToName } - private async seqFetch( - refName: string, - start: number, - end: number, - opts?: { stopToken?: string }, - ) { + private async seqFetch(refName: string, start: number, end: number) { const { sequenceAdapter } = await this.configure() const refSeqStore = sequenceAdapter if (!refSeqStore) { @@ -145,15 +137,12 @@ export default class BamAdapter extends BaseFeatureDataAdapter { return undefined } - const features = refSeqStore.getFeatures( - { - refName, - start, - end, - assemblyName: '', - }, - opts, - ) + const features = refSeqStore.getFeatures({ + refName, + start, + end, + assemblyName: '', + }) const seqChunks = await firstValueFrom(features.pipe(toArray())) @@ -191,80 +180,70 @@ export default class BamAdapter extends BaseFeatureDataAdapter { const { refName, start, end, originalRefName } = region const { stopToken, filterBy, statusCallback = () => {} } = opts || {} return ObservableCreate(async observer => { - checkStopToken(stopToken) const { bam } = await this.configure() + await this.setup(opts) checkStopToken(stopToken) - - // TODO:ABORT - const records = await updateStatus2( + const records = await updateStatus( 'Downloading alignments', statusCallback, - stopToken, () => bam.getRecordsForRange(refName, start, end), ) - await updateStatus2( - 'Processing alignments', - statusCallback, - stopToken, - async () => { - const { - flagInclude = 0, - flagExclude = 0, - tagFilter, - readName, - } = filterBy || {} + checkStopToken(stopToken) - for (const record of records) { - let ref: string | undefined - if (!record.tags.MD) { - ref = await this.seqFetch( - originalRefName || refName, - record.start, - record.end, - opts, - ) - } + await updateStatus('Processing alignments', statusCallback, async () => { + const { + flagInclude = 0, + flagExclude = 0, + tagFilter, + readName, + } = filterBy || {} + + for (const record of records) { + let ref: string | undefined + if (!record.tags.MD) { + ref = await this.seqFetch( + originalRefName || refName, + record.start, + record.end, + ) + } - const flags = record.flags + const flags = record.flags + if ((flags & flagInclude) !== flagInclude && !(flags & flagExclude)) { + continue + } + + if (tagFilter) { + const readVal = record.tags[tagFilter.tag] + const filterVal = tagFilter.value if ( - (flags & flagInclude) !== flagInclude && - !(flags & flagExclude) + filterVal === '*' + ? readVal === undefined + : `${readVal}` !== `${filterVal}` ) { continue } + } - if (tagFilter) { - const readVal = record.tags[tagFilter.tag] - const filterVal = tagFilter.value - if ( - filterVal === '*' - ? readVal === undefined - : `${readVal}` !== `${filterVal}` - ) { - continue - } - } - - if (readName && record.name !== readName) { - continue - } + if (readName && record.name !== readName) { + continue + } - // retrieve a feature from our feature cache if it is available, the - // features in the cache have pre-computed mismatches objects that - // can be re-used across blocks - const ret = this.ultraLongFeatureCache.get(`${record.id}`) - if (!ret) { - const elt = new BamSlightlyLazyFeature(record, this, ref) - this.ultraLongFeatureCache.set(`${record.id}`, elt) - observer.next(elt) - } else { - observer.next(ret) - } + // retrieve a feature from our feature cache if it is available, the + // features in the cache have pre-computed mismatches objects that + // can be re-used across blocks + const ret = this.ultraLongFeatureCache.get(`${record.id}`) + if (!ret) { + const elt = new BamSlightlyLazyFeature(record, this, ref) + this.ultraLongFeatureCache.set(`${record.id}`, elt) + observer.next(elt) + } else { + observer.next(ret) } - }, - ) - observer.complete() - }, stopToken) + } + observer.complete() + }) + }) } async getMultiRegionFeatureDensityStats( diff --git a/plugins/alignments/src/CramAdapter/CramAdapter.ts b/plugins/alignments/src/CramAdapter/CramAdapter.ts index 834799775c..704bf4e22c 100644 --- a/plugins/alignments/src/CramAdapter/CramAdapter.ts +++ b/plugins/alignments/src/CramAdapter/CramAdapter.ts @@ -8,7 +8,7 @@ import { BaseSequenceAdapter, } from '@jbrowse/core/data_adapters/BaseAdapter' import type { Region, Feature } from '@jbrowse/core/util' -import { updateStatus2, toLocale } from '@jbrowse/core/util' +import { updateStatus, toLocale } from '@jbrowse/core/util' import { openLocation } from '@jbrowse/core/util/io' import { ObservableCreate } from '@jbrowse/core/util/rxjs' import QuickLRU from '@jbrowse/core/util/QuickLRU' @@ -59,9 +59,7 @@ export default class CramAdapter extends BaseFeatureDataAdapter { const cram = new IndexedCramFile({ cramFilehandle: openLocation(cramLocation, pm), - index: new CraiIndex({ - filehandle: openLocation(craiLocation, pm), - }), + index: new CraiIndex({ filehandle: openLocation(craiLocation, pm) }), seqFetch: (...args) => this.seqFetch(...args), checkSequenceMD5: false, }) @@ -148,40 +146,35 @@ export default class CramAdapter extends BaseFeatureDataAdapter { } private async setupPre(opts?: BaseOptions) { - const { stopToken, statusCallback = () => {} } = opts || {} - return updateStatus2( - 'Downloading index', - statusCallback, - stopToken, - async () => { - const conf = await this.configure() - const { cram } = conf - const samHeader = await cram.cram.getSamHeader() - - // use the @SQ lines in the header to figure out the - // mapping between ref ID numbers and names - const idToName: string[] = [] - const nameToId: Record = {} - samHeader - .filter(l => l.tag === 'SQ') - .forEach((sqLine, refId) => { - const SN = sqLine.data.find(item => item.tag === 'SN') - if (SN) { - const refName = SN.value - nameToId[refName] = refId - idToName[refId] = refName - } - }) + const { statusCallback = () => {} } = opts || {} + return updateStatus('Downloading index', statusCallback, async () => { + const conf = await this.configure() + const { cram } = conf + const samHeader = await cram.cram.getSamHeader() + + // use the @SQ lines in the header to figure out the + // mapping between ref ID numbers and names + const idToName: string[] = [] + const nameToId: Record = {} + samHeader + .filter(l => l.tag === 'SQ') + .forEach((sqLine, refId) => { + const SN = sqLine.data.find(item => item.tag === 'SN') + if (SN) { + const refName = SN.value + nameToId[refName] = refId + idToName[refId] = refName + } + }) - const readGroups = samHeader - .filter(l => l.tag === 'RG') - .map(rgLine => rgLine.data.find(item => item.tag === 'ID')?.value) + const readGroups = samHeader + .filter(l => l.tag === 'RG') + .map(rgLine => rgLine.data.find(item => item.tag === 'ID')?.value) - const data = { idToName, nameToId, readGroups } - this.samHeader = data - return { samHeader: data, ...conf } - }, - ) + const data = { idToName, nameToId, readGroups } + this.samHeader = data + return { samHeader: data, ...conf } + }) } private async setup(opts?: BaseOptions) { @@ -246,67 +239,57 @@ export default class CramAdapter extends BaseFeatureDataAdapter { if (originalRefName) { this.seqIdToOriginalRefName[refId] = originalRefName } - checkStopToken(stopToken) - const records = await updateStatus2( + const records = await updateStatus( 'Downloading alignments', statusCallback, - stopToken, () => cram.getRecordsForRange(refId, start, end), ) checkStopToken(stopToken) - await updateStatus2( - 'Processing alignments', - statusCallback, - stopToken, - () => { - const { - flagInclude = 0, - flagExclude = 0, - tagFilter, - readName, - } = filterBy || {} - - for (const record of records) { - const flags = record.flags + await updateStatus('Processing alignments', statusCallback, () => { + const { + flagInclude = 0, + flagExclude = 0, + tagFilter, + readName, + } = filterBy || {} + + for (const record of records) { + const flags = record.flags + if ((flags & flagInclude) !== flagInclude && !(flags & flagExclude)) { + continue + } + + if (tagFilter) { + const readVal = + tagFilter.tag === 'RG' + ? samHeader.readGroups?.[record.readGroupId] + : record.tags[tagFilter.tag] + const filterVal = tagFilter.value if ( - (flags & flagInclude) !== flagInclude && - !(flags & flagExclude) + filterVal === '*' + ? readVal === undefined + : `${readVal}` !== `${filterVal}` ) { continue } + } - if (tagFilter) { - const readVal = - tagFilter.tag === 'RG' - ? samHeader.readGroups?.[record.readGroupId] - : record.tags[tagFilter.tag] - const filterVal = tagFilter.value - if ( - filterVal === '*' - ? readVal === undefined - : `${readVal}` !== `${filterVal}` - ) { - continue - } - } - - if (readName && record.readName !== readName) { - continue - } + if (readName && record.readName !== readName) { + continue + } - const ret = this.ultraLongFeatureCache.get(`${record.uniqueId}`) - if (!ret) { - const elt = this.cramRecordToFeature(record) - this.ultraLongFeatureCache.set(`${record.uniqueId}`, elt) - observer.next(elt) - } else { - observer.next(ret) - } + const ret = this.ultraLongFeatureCache.get(`${record.uniqueId}`) + if (!ret) { + const elt = this.cramRecordToFeature(record) + this.ultraLongFeatureCache.set(`${record.uniqueId}`, elt) + observer.next(elt) + } else { + observer.next(ret) } - checkStopToken(stopToken) - observer.complete() - }, - ) + } + + observer.complete() + }) }, stopToken) } diff --git a/website/yarn.lock b/website/yarn.lock index 5c0aba7658..5ad564284e 100644 --- a/website/yarn.lock +++ b/website/yarn.lock @@ -4213,12 +4213,12 @@ execa@^5.0.0: dependencies: cross-spawn "^7.0.3" get-stream "^6.0.0" - human-stopTokens "^2.1.0" + human-signals "^2.1.0" is-stream "^2.0.0" merge-stream "^2.0.0" npm-run-path "^4.0.1" onetime "^5.1.2" - stopToken-exit "^3.0.3" + signal-exit "^3.0.3" strip-final-newline "^2.0.0" express@^4.17.3: @@ -4980,9 +4980,9 @@ http2-wrapper@^2.1.10: quick-lru "^5.1.1" resolve-alpn "^1.2.0" -human-stopTokens@^2.1.0: +human-signals@^2.1.0: version "2.1.0" - resolved "https://registry.yarnpkg.com/human-stopTokens/-/human-stopTokens-2.1.0.tgz#dc91fcba42e4d06e4abaed33b3e7a3c02f514ea0" + resolved "https://registry.yarnpkg.com/human-signals/-/human-signals-2.1.0.tgz#dc91fcba42e4d06e4abaed33b3e7a3c02f514ea0" integrity sha512-B4FFZ6q/T2jhhksgkbEW3HBvWIfDW85snkQgawt07S7J5QXTk6BkNV+0yAeZrM5QpMAdYlocGoljn0sJ/WQkFw== iconv-lite@0.4.24: @@ -7924,9 +7924,9 @@ side-channel@^1.0.6: get-intrinsic "^1.2.4" object-inspect "^1.13.1" -stopToken-exit@^3.0.2, stopToken-exit@^3.0.3: +signal-exit@^3.0.2, signal-exit@^3.0.3: version "3.0.7" - resolved "https://registry.yarnpkg.com/stopToken-exit/-/stopToken-exit-3.0.7.tgz#a9a1767f8af84155114eaabd73f99273c8f59ad9" + resolved "https://registry.yarnpkg.com/signal-exit/-/signal-exit-3.0.7.tgz#a9a1767f8af84155114eaabd73f99273c8f59ad9" integrity sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ== sirv@^2.0.3: @@ -8775,7 +8775,7 @@ write-file-atomic@^3.0.3: dependencies: imurmurhash "^0.1.4" is-typedarray "^1.0.0" - stopToken-exit "^3.0.2" + signal-exit "^3.0.2" typedarray-to-buffer "^3.1.5" ws@^7.3.1: