diff --git a/packages/@uppy/tus/src/index.ts b/packages/@uppy/tus/src/index.ts index 4480dd5e48..97adb68164 100644 --- a/packages/@uppy/tus/src/index.ts +++ b/packages/@uppy/tus/src/index.ts @@ -220,8 +220,8 @@ export default class Tus extends BasePlugin< // Create a new tus upload return new Promise((resolve, reject) => { - let queuedRequest: RateLimitedQueue.QueueEntry - let qRequest: () => void + let queuedRequest: ReturnType + let qRequest: () => () => void let upload: tus.Upload const opts = { diff --git a/packages/@uppy/utils/src/RateLimitedQueue.test.js b/packages/@uppy/utils/src/RateLimitedQueue.test.ts similarity index 94% rename from packages/@uppy/utils/src/RateLimitedQueue.test.js rename to packages/@uppy/utils/src/RateLimitedQueue.test.ts index da2b000cc4..9acdb74f35 100644 --- a/packages/@uppy/utils/src/RateLimitedQueue.test.js +++ b/packages/@uppy/utils/src/RateLimitedQueue.test.ts @@ -1,10 +1,10 @@ import { describe, expect, it } from 'vitest' -import { RateLimitedQueue } from './RateLimitedQueue.js' +import { RateLimitedQueue } from './RateLimitedQueue.ts' import delay from './delay.ts' describe('RateLimitedQueue', () => { let pending = 0 - function fn() { + async function fn() { pending++ return delay(15).then(() => pending--) } diff --git a/packages/@uppy/utils/src/RateLimitedQueue.js b/packages/@uppy/utils/src/RateLimitedQueue.ts similarity index 66% rename from packages/@uppy/utils/src/RateLimitedQueue.js rename to packages/@uppy/utils/src/RateLimitedQueue.ts index 9daecf6b2b..1c769e9a19 100644 --- a/packages/@uppy/utils/src/RateLimitedQueue.js +++ b/packages/@uppy/utils/src/RateLimitedQueue.ts @@ -1,34 +1,58 @@ -function createCancelError (cause) { +function createCancelError(cause?: string) { return new Error('Cancelled', { cause }) } -function abortOn (signal) { +function abortOn( + this: { abort: (cause: string) => void; then?: Promise['then'] }, + signal?: AbortSignal, +) { if (signal != null) { const abortPromise = () => this.abort(signal.reason) signal.addEventListener('abort', abortPromise, { once: true }) - const removeAbortListener = () => { signal.removeEventListener('abort', abortPromise) } + const removeAbortListener = () => { + signal.removeEventListener('abort', abortPromise) + } this.then?.(removeAbortListener, removeAbortListener) } return this } +type Handler = { + shouldBeRequeued?: boolean + fn: () => (...args: any[]) => Promise | void + priority: number + abort: (cause?: unknown) => void + done: () => void +} + +type QueueOptions = { + priority?: number +} + +interface AbortablePromise extends Promise { + abort(cause?: unknown): void + abortOn: typeof abortOn +} + export class RateLimitedQueue { #activeRequests = 0 - #queuedHandlers = [] + #queuedHandlers: Handler[] = [] #paused = false - #pauseTimer + #pauseTimer: ReturnType #downLimit = 1 - #upperLimit + #upperLimit: number + + #rateLimitingTimer: ReturnType - #rateLimitingTimer + limit: number - constructor (limit) { + constructor(limit?: number) { if (typeof limit !== 'number' || limit === 0) { this.limit = Infinity } else { @@ -36,12 +60,12 @@ export class RateLimitedQueue { } } - #call (fn) { + #call(fn: Handler['fn']) { this.#activeRequests += 1 let done = false - let cancelActive + let cancelActive: (cause?: unknown) => void try { cancelActive = fn() } catch (err) { @@ -50,7 +74,7 @@ export class RateLimitedQueue { } return { - abort: (cause) => { + abort: (cause?: unknown) => { if (done) return done = true this.#activeRequests -= 1 @@ -67,14 +91,14 @@ export class RateLimitedQueue { } } - #queueNext () { + #queueNext() { // Do it soon but not immediately, this allows clearing out the entire queue synchronously // one by one without continuously _advancing_ it (and starting new tasks before immediately // aborting them) queueMicrotask(() => this.#next()) } - #next () { + #next() { if (this.#paused || this.#activeRequests >= this.limit) { return } @@ -86,20 +110,25 @@ export class RateLimitedQueue { // so that cancelling it does the Right Thing (and doesn't just try // to dequeue an already-running request). const next = this.#queuedHandlers.shift() + if (next == null) { + throw new Error('Invariant violation: next is null') + } const handler = this.#call(next.fn) next.abort = handler.abort next.done = handler.done } - #queue (fn, options = {}) { - const handler = { + #queue(fn: Handler['fn'], options?: QueueOptions) { + const handler: Handler = { fn, - priority: options.priority || 0, + priority: options?.priority || 0, abort: () => { this.#dequeue(handler) }, done: () => { - throw new Error('Cannot mark a queued request as done: this indicates a bug') + throw new Error( + 'Cannot mark a queued request as done: this indicates a bug', + ) }, } @@ -114,22 +143,27 @@ export class RateLimitedQueue { return handler } - #dequeue (handler) { + #dequeue(handler: Handler) { const index = this.#queuedHandlers.indexOf(handler) if (index !== -1) { this.#queuedHandlers.splice(index, 1) } } - run (fn, queueOptions) { + run( + fn: Handler['fn'], + queueOptions?: QueueOptions, + ): Handler | Omit { if (!this.#paused && this.#activeRequests < this.limit) { return this.#call(fn) } return this.#queue(fn, queueOptions) } - wrapSyncFunction (fn, queueOptions) { - return (...args) => { + wrapSyncFunction(fn: () => void, queueOptions: QueueOptions) { + return ( + ...args: Parameters + ): { abortOn: typeof abortOn; abort: Handler['abort'] } => { const queuedRequest = this.run(() => { fn(...args) queueMicrotask(() => queuedRequest.done()) @@ -138,19 +172,22 @@ export class RateLimitedQueue { return { abortOn, - abort () { + abort() { queuedRequest.abort() }, } } } - wrapPromiseFunction (fn, queueOptions) { - return (...args) => { - let queuedRequest + wrapPromiseFunction any>( + fn: T, + queueOptions?: QueueOptions, + ) { + return (...args: Parameters): AbortablePromise> => { + let queuedRequest: ReturnType const outerPromise = new Promise((resolve, reject) => { queuedRequest = this.run(() => { - let cancelError + let cancelError: ReturnType let innerPromise try { innerPromise = Promise.resolve(fn(...args)) @@ -158,27 +195,30 @@ export class RateLimitedQueue { innerPromise = Promise.reject(err) } - innerPromise.then((result) => { - if (cancelError) { - reject(cancelError) - } else { - queuedRequest.done() - resolve(result) - } - }, (err) => { - if (cancelError) { - reject(cancelError) - } else { - queuedRequest.done() - reject(err) - } - }) + innerPromise.then( + (result) => { + if (cancelError) { + reject(cancelError) + } else { + queuedRequest.done() + resolve(result) + } + }, + (err) => { + if (cancelError) { + reject(cancelError) + } else { + queuedRequest.done() + reject(err) + } + }, + ) return (cause) => { cancelError = createCancelError(cause) } }, queueOptions) - }) + }) as AbortablePromise> outerPromise.abort = (cause) => { queuedRequest.abort(cause) @@ -189,7 +229,7 @@ export class RateLimitedQueue { } } - resume () { + resume(): void { this.#paused = false clearTimeout(this.#pauseTimer) for (let i = 0; i < this.limit; i++) { @@ -205,7 +245,7 @@ export class RateLimitedQueue { * @param {number | null } [duration] Duration for the pause to happen, in milliseconds. * If omitted, the queue won't resume automatically. */ - pause (duration = null) { + pause(duration: number | null = null): void { this.#paused = true clearTimeout(this.#pauseTimer) if (duration != null) { @@ -223,7 +263,7 @@ export class RateLimitedQueue { * * @param {number} duration in milliseconds. */ - rateLimit (duration) { + rateLimit(duration: number): void { clearTimeout(this.#rateLimitingTimer) this.pause(duration) if (this.limit > 1 && Number.isFinite(this.limit)) { @@ -250,7 +290,9 @@ export class RateLimitedQueue { } } - get isPaused () { return this.#paused } + get isPaused(): boolean { + return this.#paused + } } export const internalRateLimitedQueue = Symbol('__queue')