Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@uppy/transloadit: implement Server-sent event API #4098

Merged
merged 11 commits into from
Jul 13, 2023
70 changes: 66 additions & 4 deletions packages/@uppy/transloadit/src/Assembly.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class TransloaditAssembly extends Emitter {

#previousFetchStatusStillPending = false

#sse

constructor (assembly, rateLimitedQueue) {
super()

Expand All @@ -53,6 +55,7 @@ class TransloaditAssembly extends Emitter {
}

connect () {
this.#connectServerSentEvents()
this.#connectSocket()
this.#beginPolling()
}
Expand All @@ -62,6 +65,64 @@ class TransloaditAssembly extends Emitter {
this.close()
}

#connectServerSentEvents () {
this.#sse = new EventSource(`${this.status.websocket_url}?assembly=${this.status.assembly_id}`)

this.#sse.addEventListener('open', () => {
// if server side events works, we don't need websockets anymore (it's just a fallback)
if (this.socket) {
aduh95 marked this conversation as resolved.
Show resolved Hide resolved
aduh95 marked this conversation as resolved.
Show resolved Hide resolved
this.socket.disconnect()
this.socket = null
}
clearInterval(this.pollInterval)
this.pollInterval = null
})

/*
* The event "message" is a special case, as it
* will capture events without an event field
* as well as events that have the specific type
* other event type.
*/
this.#sse.addEventListener('message', (e) => {
if (e.data === 'assembly_finished') {
this.#onFinished()
}

if (e.data === 'assembly_uploading_finished') {
this.emit('executing')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.emit('executing')
this.emit('transloadit:assembly-executing')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from

socket.on('assembly_uploading_finished', () => {
this.emit('executing')
})

Changing the event would be a breaking change, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotta be careful here indeed, as some of the events are Assembly internal events, not Uppy Core event emitter, it's confusing.

}

if (e.data === 'assembly_upload_meta_data_extracted') {
this.emit('metadata')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

internal only?

Suggested change
this.emit('metadata')
// used only internally to do X
this.emit('metadata')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea, I've copied it from

socket.on('assembly_upload_meta_data_extracted', () => {
this.emit('metadata')
this.#fetchStatus({ diff: false })
})

this.#fetchStatus({ diff: false })
mifi marked this conversation as resolved.
Show resolved Hide resolved
mifi marked this conversation as resolved.
Show resolved Hide resolved
}
})

this.#sse.addEventListener('assembly_upload_finished', (e) => {
const file = e.data
this.emit('upload', file)
this.status.uploads.push(file)
})

this.#sse.addEventListener('assembly_result_finished', (e) => {
const [stepName, result] = JSON.parse(e.data)
this.emit('result', stepName, result)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be this?

Suggested change
this.emit('result', stepName, result)
this.emit('transloadit:result', stepName, result)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

socket.on('assembly_result_finished', (stepName, result) => {
this.emit('result', stepName, result)
if (!this.status.results[stepName]) {
this.status.results[stepName] = []
}
this.status.results[stepName].push(result)
})

;(this.status.results[stepName] ??= []).push(result)
})

this.#sse.addEventListener('assembly_error', (e) => {
try {
const error = JSON.parse(e.data)
this.#onError(new Error(error.message, { error }))
} catch {
this.#onError(new Error(e.data))
}
// Refetch for updated status code
this.#fetchStatus({ diff: false })
})
}

#connectSocket () {
const parsed = parseUrl(this.status.websocket_url)
const socket = io(parsed.origin, {
Expand Down Expand Up @@ -102,10 +163,7 @@ class TransloaditAssembly extends Emitter {

socket.on('assembly_result_finished', (stepName, result) => {
this.emit('result', stepName, result)
if (!this.status.results[stepName]) {
this.status.results[stepName] = []
}
this.status.results[stepName].push(result)
;(this.status.results[stepName] ??= []).push(result)
})

socket.on('assembly_error', (status) => {
Expand Down Expand Up @@ -262,6 +320,10 @@ class TransloaditAssembly extends Emitter {
*/
close () {
this.closed = true
if (this.#sse) {
this.#sse.close()
this.#sse = null
}
if (this.socket) {
this.socket.disconnect()
this.socket = null
Expand Down