Skip to content

Commit

Permalink
@uppy/transloadit: implement Server-sent event API (#4098)
Browse files Browse the repository at this point in the history
  • Loading branch information
aduh95 authored Jul 13, 2023
1 parent a83373b commit 9ac5842
Showing 1 changed file with 65 additions and 4 deletions.
69 changes: 65 additions & 4 deletions packages/@uppy/transloadit/src/Assembly.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class TransloaditAssembly extends Emitter {

#previousFetchStatusStillPending = false

#sse

constructor (assembly, rateLimitedQueue) {
super()

Expand All @@ -53,6 +55,7 @@ class TransloaditAssembly extends Emitter {
}

connect () {
this.#connectServerSentEvents()
this.#connectSocket()
this.#beginPolling()
}
Expand All @@ -62,6 +65,63 @@ class TransloaditAssembly extends Emitter {
this.close()
}

#connectServerSentEvents () {
this.#sse = new EventSource(`${this.status.websocket_url}?assembly=${this.status.assembly_id}`)

this.#sse.addEventListener('open', () => {
// if server side events works, we don't need websockets anymore (it's just a fallback)
if (this.socket) {
this.socket.disconnect()
this.socket = null
}
clearInterval(this.pollInterval)
this.pollInterval = null
})

/*
* The event "message" is a special case, as it
* will capture events without an event field
* as well as events that have the specific type
* other event type.
*/
this.#sse.addEventListener('message', (e) => {
if (e.data === 'assembly_finished') {
this.#onFinished()
}

if (e.data === 'assembly_uploading_finished') {
this.emit('executing')
}

if (e.data === 'assembly_upload_meta_data_extracted') {
this.emit('metadata')
this.#fetchStatus({ diff: false })
}
})

this.#sse.addEventListener('assembly_upload_finished', (e) => {
const file = JSON.parse(e.data)
this.emit('upload', file)
this.status.uploads.push(file)
})

this.#sse.addEventListener('assembly_result_finished', (e) => {
const [stepName, result] = JSON.parse(e.data)
this.emit('result', stepName, result)
;(this.status.results[stepName] ??= []).push(result)
})

this.#sse.addEventListener('assembly_error', (e) => {
try {
this.#onError(JSON.parse(e.data))
} catch {
this.#onError({ msg: e.data })
}
// Refetch for updated status code
this.#fetchStatus({ diff: false })
})
}

#connectSocket () {
const parsed = parseUrl(this.status.websocket_url)
const socket = io(parsed.origin, {
Expand Down Expand Up @@ -102,10 +162,7 @@ class TransloaditAssembly extends Emitter {

socket.on('assembly_result_finished', (stepName, result) => {
this.emit('result', stepName, result)
if (!this.status.results[stepName]) {
this.status.results[stepName] = []
}
this.status.results[stepName].push(result)
;(this.status.results[stepName] ??= []).push(result)
})

socket.on('assembly_error', (status) => {
Expand Down Expand Up @@ -262,6 +319,10 @@ class TransloaditAssembly extends Emitter {
*/
close () {
this.closed = true
if (this.#sse) {
this.#sse.close()
this.#sse = null
}
if (this.socket) {
this.socket.disconnect()
this.socket = null
Expand Down

0 comments on commit 9ac5842

Please sign in to comment.