diff --git a/packages/@uppy/transloadit/src/Assembly.js b/packages/@uppy/transloadit/src/Assembly.js index f93acf03e4..1ab89ee134 100644 --- a/packages/@uppy/transloadit/src/Assembly.js +++ b/packages/@uppy/transloadit/src/Assembly.js @@ -36,6 +36,8 @@ class TransloaditAssembly extends Emitter { #previousFetchStatusStillPending = false + #sse + constructor (assembly, rateLimitedQueue) { super() @@ -53,6 +55,7 @@ class TransloaditAssembly extends Emitter { } connect () { + this.#connectServerSentEvents() this.#connectSocket() this.#beginPolling() } @@ -62,6 +65,63 @@ class TransloaditAssembly extends Emitter { this.close() } + #connectServerSentEvents () { + this.#sse = new EventSource(`${this.status.websocket_url}?assembly=${this.status.assembly_id}`) + + this.#sse.addEventListener('open', () => { + // if server side events works, we don't need websockets anymore (it's just a fallback) + if (this.socket) { + this.socket.disconnect() + this.socket = null + } + clearInterval(this.pollInterval) + this.pollInterval = null + }) + + /* + * The event "message" is a special case, as it + * will capture events without an event field + * as well as events that have the specific type + * other event type. + */ + this.#sse.addEventListener('message', (e) => { + if (e.data === 'assembly_finished') { + this.#onFinished() + } + + if (e.data === 'assembly_uploading_finished') { + this.emit('executing') + } + + if (e.data === 'assembly_upload_meta_data_extracted') { + this.emit('metadata') + this.#fetchStatus({ diff: false }) + } + }) + + this.#sse.addEventListener('assembly_upload_finished', (e) => { + const file = JSON.parse(e.data) + this.emit('upload', file) + this.status.uploads.push(file) + }) + + this.#sse.addEventListener('assembly_result_finished', (e) => { + const [stepName, result] = JSON.parse(e.data) + this.emit('result', stepName, result) + ;(this.status.results[stepName] ??= []).push(result) + }) + + this.#sse.addEventListener('assembly_error', (e) => { + try { + this.#onError(JSON.parse(e.data)) + } catch { + this.#onError({ msg: e.data }) + } + // Refetch for updated status code + this.#fetchStatus({ diff: false }) + }) + } + #connectSocket () { const parsed = parseUrl(this.status.websocket_url) const socket = io(parsed.origin, { @@ -102,10 +162,7 @@ class TransloaditAssembly extends Emitter { socket.on('assembly_result_finished', (stepName, result) => { this.emit('result', stepName, result) - if (!this.status.results[stepName]) { - this.status.results[stepName] = [] - } - this.status.results[stepName].push(result) + ;(this.status.results[stepName] ??= []).push(result) }) socket.on('assembly_error', (status) => { @@ -262,6 +319,10 @@ class TransloaditAssembly extends Emitter { */ close () { this.closed = true + if (this.#sse) { + this.#sse.close() + this.#sse = null + } if (this.socket) { this.socket.disconnect() this.socket = null