Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
aduh95 committed Mar 11, 2024
1 parent 6c7c2b0 commit 42d6cfe
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 34 deletions.
43 changes: 20 additions & 23 deletions packages/@uppy/aws-s3-multipart/src/HTTPCommunicationQueue.ts
Original file line number Diff line number Diff line change
@@ -1,58 +1,53 @@
import type { Meta, UppyFile } from '@uppy/utils/lib/UppyFile'
import type {
RateLimitedQueue,
WrapPromiseFunctionType,
} from '@uppy/utils/lib/RateLimitedQueue'
import { pausingUploadReason, type Chunk } from './MultipartUploader.ts'
import type AwsS3Multipart from './index.ts'
import { throwIfAborted } from './utils.ts'
import type { Body, UploadPartBytesResult, UploadResult } from './utils.ts'
import type { AwsS3MultipartOptions, uploadPartBytes } from './index.ts'

type RateLimitedQueue = any // untyped

function removeMetadataFromURL(urlString: string) {
const urlObject = new URL(urlString)
urlObject.search = ''
urlObject.hash = ''
return urlObject.href
}

type AbortablePromise<T extends (...args: any) => Promise<any>> = (
...args: Parameters<T>
) => ReturnType<T> & {
abort: (reason?: any) => void
abortOn: (signal?: AbortSignal) => ReturnType<T>
}

export class HTTPCommunicationQueue<M extends Meta, B extends Body> {
#abortMultipartUpload: AbortablePromise<
#abortMultipartUpload: WrapPromiseFunctionType<
AwsS3Multipart<M, B>['abortMultipartUpload']
>

#cache = new WeakMap()

#createMultipartUpload: AbortablePromise<
#createMultipartUpload: WrapPromiseFunctionType<
AwsS3Multipart<M, B>['createMultipartUpload']
>

#fetchSignature: AbortablePromise<AwsS3Multipart<M, B>['signPart']>
#fetchSignature: WrapPromiseFunctionType<AwsS3Multipart<M, B>['signPart']>

#getUploadParameters: AbortablePromise<
#getUploadParameters: WrapPromiseFunctionType<
AwsS3Multipart<M, B>['getUploadParameters']
>

#listParts: AbortablePromise<AwsS3Multipart<M, B>['listParts']>
#listParts: WrapPromiseFunctionType<AwsS3Multipart<M, B>['listParts']>

#previousRetryDelay: number

#requests

#retryDelays: { values: () => Iterator<number> }

#sendCompletionRequest: AbortablePromise<
#sendCompletionRequest: WrapPromiseFunctionType<
AwsS3Multipart<M, B>['completeMultipartUpload']
>

#setS3MultipartState

#uploadPartBytes: AbortablePromise<uploadPartBytes>
#uploadPartBytes: WrapPromiseFunctionType<uploadPartBytes>

#getFile

Expand All @@ -73,25 +68,27 @@ export class HTTPCommunicationQueue<M extends Meta, B extends Body> {

if ('abortMultipartUpload' in options) {
this.#abortMultipartUpload = requests.wrapPromiseFunction(
options.abortMultipartUpload,
options.abortMultipartUpload as any,
{ priority: 1 },
)
}
if ('createMultipartUpload' in options) {
this.#createMultipartUpload = requests.wrapPromiseFunction(
options.createMultipartUpload,
options.createMultipartUpload as any,
{ priority: -1 },
)
}
if ('signPart' in options) {
this.#fetchSignature = requests.wrapPromiseFunction(options.signPart)
this.#fetchSignature = requests.wrapPromiseFunction(
options.signPart as any,
)
}
if ('listParts' in options) {
this.#listParts = requests.wrapPromiseFunction(options.listParts)
this.#listParts = requests.wrapPromiseFunction(options.listParts as any)
}
if ('completeMultipartUpload' in options) {
this.#sendCompletionRequest = requests.wrapPromiseFunction(
options.completeMultipartUpload,
options.completeMultipartUpload as any,
{ priority: 1 },
)
}
Expand All @@ -100,13 +97,13 @@ export class HTTPCommunicationQueue<M extends Meta, B extends Body> {
}
if ('uploadPartBytes' in options) {
this.#uploadPartBytes = requests.wrapPromiseFunction(
options.uploadPartBytes,
options.uploadPartBytes as any,
{ priority: Infinity },
)
}
if ('getUploadParameters' in options) {
this.#getUploadParameters = requests.wrapPromiseFunction(
options.getUploadParameters,
options.getUploadParameters as any,
)
}
}
Expand Down
10 changes: 4 additions & 6 deletions packages/@uppy/aws-s3-multipart/src/MultipartUploader.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import type { Uppy } from '@uppy/core'
import { AbortController } from '@uppy/utils/lib/AbortController'
import type { Body, Meta, UppyFile } from '@uppy/utils/lib/UppyFile'
import type { Meta, UppyFile } from '@uppy/utils/lib/UppyFile'
import type { HTTPCommunicationQueue } from './HTTPCommunicationQueue'
import type { Body } from './utils'

const MB = 1024 * 1024

interface MultipartUploaderOptions<
M extends Meta,
B extends Body & { location: string },
> {
interface MultipartUploaderOptions<M extends Meta, B extends Body> {
getChunkSize?: (file: { size: number }) => number
onProgress?: (bytesUploaded: number, bytesTotal: number) => void
onPartComplete?: (part: { PartNumber: number; ETag: string }) => void
Expand Down Expand Up @@ -63,7 +61,7 @@ export const pausingUploadReason = Symbol('pausing upload, not an actual error')
* (based on the user-provided `shouldUseMultipart` option value) and to manage
* the chunk splitting.
*/
class MultipartUploader<M extends Meta, B extends Body & { location: string }> {
class MultipartUploader<M extends Meta, B extends Body> {
options: MultipartUploaderOptions<M, B> &
Required<Pick<MultipartUploaderOptions<M, B>, keyof typeof defaultOptions>>

Expand Down
2 changes: 0 additions & 2 deletions packages/@uppy/aws-s3-multipart/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import type { RequestOptions } from '@uppy/utils/lib/CompanionClientProvider.ts'
import type { Body as _Body, Meta, UppyFile } from '@uppy/utils/lib/UppyFile'
import type { Uppy } from '@uppy/core'
import EventManager from '@uppy/core/lib/EventManager.js'
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore untyped
import { RateLimitedQueue } from '@uppy/utils/lib/RateLimitedQueue'
import {
filterNonFailedFiles,
Expand Down
10 changes: 7 additions & 3 deletions packages/@uppy/utils/src/RateLimitedQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ type QueueOptions = {
priority?: number
}

interface AbortablePromise<T> extends Promise<T> {
export interface AbortablePromise<T> extends Promise<T> {
abort(cause?: unknown): void
abortOn: typeof abortOn
abortOn: (...args: Parameters<typeof abortOn>) => AbortablePromise<T>
}

export type WrapPromiseFunctionType<T extends (...args: any[]) => any> = (
...args: Parameters<T>
) => AbortablePromise<Awaited<ReturnType<T>>>

export class RateLimitedQueue {
#activeRequests = 0

Expand Down Expand Up @@ -223,7 +227,7 @@ export class RateLimitedQueue {
outerPromise.abort = (cause) => {
queuedRequest.abort(cause)
}
outerPromise.abortOn = abortOn
outerPromise.abortOn = abortOn as any

return outerPromise
}
Expand Down

0 comments on commit 42d6cfe

Please sign in to comment.