From 4e94dd0c003845f52880ea41f1f423d1342f910d Mon Sep 17 00:00:00 2001 From: Mikael Finstad Date: Thu, 24 Mar 2022 20:14:10 +0800 Subject: [PATCH] Companion server upload events (#3544) * modernise code * pull out config related functions and middleware * make test more readable * Expose companion emitter to allow the consumer to subscribe to upload success/failure events fixes #3435 * disable client socket timeout for tests or jest will wait for them to time out also fix broken test 'uploader respects maxFileSize with unknown size' * document the event emitter usage --- src/companion.js | 7 ++-- src/config/companion.js | 1 + src/server/Uploader.js | 35 +++++++++++++++++-- src/server/helpers/upload.js | 4 ++- src/server/s3-client.js | 5 +-- src/standalone/helper.js | 2 ++ src/standalone/index.js | 3 +- test/__tests__/companion.js | 3 +- test/__tests__/providers.js | 3 +- test/__tests__/uploader.js | 67 +++++++++++++++++++++--------------- test/__tests__/url.js | 2 +- test/mockserver.js | 7 ++-- 12 files changed, 98 insertions(+), 41 deletions(-) diff --git a/src/companion.js b/src/companion.js index bdd2a52743..ab2d2501c0 100644 --- a/src/companion.js +++ b/src/companion.js @@ -11,7 +11,7 @@ const providerManager = require('./server/provider') const controllers = require('./server/controllers') const s3 = require('./server/controllers/s3') const url = require('./server/controllers/url') -const emitter = require('./server/emitter') +const createEmitter = require('./server/emitter') const redis = require('./server/redis') const { getURLBuilder } = require('./server/helpers/utils') const jobs = require('./server/jobs') @@ -79,7 +79,7 @@ module.exports.app = (optionsArg = {}) => { if (options.redisUrl) { redis.client(merge({ url: options.redisUrl }, options.redisOptions || {})) } - emitter(options.redisUrl, options.redisPubSubScope) + const emitter = createEmitter(options.redisUrl, options.redisPubSubScope) const app = express() @@ -152,5 +152,8 @@ module.exports.app = (optionsArg = {}) => { processId, }) + // todo split emitter from app in next major + // @ts-ignore + app.companionEmitter = emitter return app } diff --git a/src/config/companion.js b/src/config/companion.js index d24254d6a1..5afbdeb788 100644 --- a/src/config/companion.js +++ b/src/config/companion.js @@ -22,6 +22,7 @@ const defaultOptions = { logClientVersion: true, periodicPingUrls: [], streamingUpload: false, + clientSocketConnectTimeout: 60000, } /** diff --git a/src/server/Uploader.js b/src/server/Uploader.js index c9e60bdd6a..6ad8c7723d 100644 --- a/src/server/Uploader.js +++ b/src/server/Uploader.js @@ -290,6 +290,8 @@ class Uploader { */ async tryUploadStream (stream) { try { + emitter().emit('upload-start', { token: this.token }) + const ret = await this.uploadStream(stream) if (!ret) return const { url, extraData } = ret @@ -358,10 +360,37 @@ class Uploader { return Uploader.shortenToken(this.token) } - async awaitReady () { - // TODO timeout after a while? Else we could leak emitters + async awaitReady (timeout) { logger.debug('waiting for socket connection', 'uploader.socket.wait', this.shortToken) - await new Promise((resolve) => emitter().once(`connection:${this.token}`, resolve)) + + // TODO: replace the Promise constructor call when dropping support for Node.js <16 with + // await once(emitter, eventName, timeout && { signal: AbortSignal.timeout(timeout) }) + await new Promise((resolve, reject) => { + const eventName = `connection:${this.token}` + let timer + let onEvent + + function cleanup () { + emitter().removeListener(eventName, onEvent) + clearTimeout(timer) + } + + if (timeout) { + // Need to timeout after a while, or we could leak emitters + timer = setTimeout(() => { + cleanup() + reject(new Error('Timed out waiting for socket connection')) + }, timeout) + } + + onEvent = () => { + cleanup() + resolve() + } + + emitter().once(eventName, onEvent) + }) + logger.debug('socket connection received', 'uploader.socket.wait', this.shortToken) } diff --git a/src/server/helpers/upload.js b/src/server/helpers/upload.js index ee45c79f82..60d16355b1 100644 --- a/src/server/helpers/upload.js +++ b/src/server/helpers/upload.js @@ -7,10 +7,12 @@ const { ValidationError } = Uploader async function startDownUpload ({ req, res, getSize, download, onUnhandledError }) { try { const size = await getSize() + const { clientSocketConnectTimeout } = req.companion.options logger.debug('Instantiating uploader.', null, req.id) const uploader = new Uploader(Uploader.reqToOptions(req, size)) + logger.debug('Starting download stream.', null, req.id) const stream = await download() // "Forking" off the upload operation to background, so we can return the http request: @@ -18,7 +20,7 @@ async function startDownUpload ({ req, res, getSize, download, onUnhandledError // wait till the client has connected to the socket, before starting // the download, so that the client can receive all download/upload progress. logger.debug('Waiting for socket connection before beginning remote download/upload.', null, req.id) - await uploader.awaitReady() + await uploader.awaitReady(clientSocketConnectTimeout) logger.debug('Socket connection received. Starting remote download/upload.', null, req.id) await uploader.tryUploadStream(stream) diff --git a/src/server/s3-client.js b/src/server/s3-client.js index b7d09a79c2..cce3b41aac 100644 --- a/src/server/s3-client.js +++ b/src/server/s3-client.js @@ -1,3 +1,6 @@ +const S3 = require('aws-sdk/clients/s3') +const AWS = require('aws-sdk') + /** * instantiates the aws-sdk s3 client that will be used for s3 uploads. * @@ -6,8 +9,6 @@ module.exports = (companionOptions) => { let s3Client = null if (companionOptions.providerOptions.s3) { - const S3 = require('aws-sdk/clients/s3') - const AWS = require('aws-sdk') const s3ProviderOptions = companionOptions.providerOptions.s3 if (s3ProviderOptions.accessKeyId || s3ProviderOptions.secretAccessKey) { diff --git a/src/standalone/helper.js b/src/standalone/helper.js index 1cd26baa2d..e7b7fcd317 100644 --- a/src/standalone/helper.js +++ b/src/standalone/helper.js @@ -112,6 +112,8 @@ const getConfigFromEnv = () => { streamingUpload: process.env.COMPANION_STREAMING_UPLOAD === 'true', maxFileSize: process.env.COMPANION_MAX_FILE_SIZE ? parseInt(process.env.COMPANION_MAX_FILE_SIZE, 10) : undefined, chunkSize: process.env.COMPANION_CHUNK_SIZE ? parseInt(process.env.COMPANION_CHUNK_SIZE, 10) : undefined, + clientSocketConnectTimeout: process.env.COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT + ? parseInt(process.env.COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT, 10) : undefined, } } diff --git a/src/standalone/index.js b/src/standalone/index.js index bb9c493e4c..0cfec8431b 100644 --- a/src/standalone/index.js +++ b/src/standalone/index.js @@ -13,6 +13,7 @@ const companion = require('../companion') const helper = require('./helper') const middlewares = require('../server/middlewares') const { getURLBuilder } = require('../server/helpers/utils') +const connectRedis = require('connect-redis') /** * Configures an Express app for running Companion standalone @@ -139,7 +140,7 @@ module.exports = function server (inputCompanionOptions = {}) { } if (companionOptions.redisUrl) { - const RedisStore = require('connect-redis')(session) + const RedisStore = connectRedis(session) const redisClient = redis.client( merge({ url: companionOptions.redisUrl }, companionOptions.redisOptions), ) diff --git a/test/__tests__/companion.js b/test/__tests__/companion.js index 48b0115f7d..c71c624d37 100644 --- a/test/__tests__/companion.js +++ b/test/__tests__/companion.js @@ -15,7 +15,8 @@ const request = require('supertest') const tokenService = require('../../src/server/helpers/jwt') const { getServer } = require('../mockserver') -const authServer = getServer() +// todo don't share server between tests. rewrite to not use env variables +const authServer = getServer({ COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '0' }) const authData = { dropbox: 'token value', box: 'token value', diff --git a/test/__tests__/providers.js b/test/__tests__/providers.js index dcfc4f14dc..c48ad99a8b 100644 --- a/test/__tests__/providers.js +++ b/test/__tests__/providers.js @@ -17,7 +17,8 @@ const tokenService = require('../../src/server/helpers/jwt') const { getServer } = require('../mockserver') const defaults = require('../fixtures/constants') -const authServer = getServer() +// todo don't share server between tests. rewrite to not use env variables +const authServer = getServer({ COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '0' }) const OAUTH_STATE = 'some-cool-nice-encrytpion' const providers = require('../../src/server/provider').getDefaultProviders() diff --git a/test/__tests__/uploader.js b/test/__tests__/uploader.js index 2d1bd66fa8..fcb3640aff 100644 --- a/test/__tests__/uploader.js +++ b/test/__tests__/uploader.js @@ -9,6 +9,7 @@ const nock = require('nock') const Uploader = require('../../src/server/Uploader') const socketClient = require('../mocksocket') const standalone = require('../../src/standalone') +const Emitter = require('../../src/server/emitter') afterAll(() => { nock.cleanAll() @@ -64,33 +65,45 @@ describe('uploader with tus protocol', () => { const uploadToken = uploader.token expect(uploadToken).toBeTruthy() - return new Promise((resolve, reject) => { - // validate that the test is resolved on socket connection - uploader.awaitReady().then(() => { - uploader.tryUploadStream(stream).then(() => resolve()) - }) + let progressReceived = 0 - let progressReceived = 0 - // emulate socket connection - socketClient.connect(uploadToken) - socketClient.onProgress(uploadToken, (message) => { - progressReceived = message.payload.bytesUploaded - try { - expect(message.payload.bytesTotal).toBe(fileContent.length) - } catch (err) { - reject(err) - } - }) - socketClient.onUploadSuccess(uploadToken, (message) => { - try { - expect(progressReceived).toBe(fileContent.length) - // see __mocks__/tus-js-client.js - expect(message.payload.url).toBe('https://tus.endpoint/files/foo-bar') - } catch (err) { - reject(err) - } - }) + const onProgress = jest.fn() + const onUploadSuccess = jest.fn() + const onBeginUploadEvent = jest.fn() + const onUploadEvent = jest.fn() + + const emitter = Emitter() + emitter.on('upload-start', onBeginUploadEvent) + emitter.on(uploadToken, onUploadEvent) + + const promise = uploader.awaitReady(60000) + // emulate socket connection + socketClient.connect(uploadToken) + socketClient.onProgress(uploadToken, (message) => { + progressReceived = message.payload.bytesUploaded + onProgress(message) + }) + socketClient.onUploadSuccess(uploadToken, onUploadSuccess) + await promise + await uploader.tryUploadStream(stream) + + expect(progressReceived).toBe(fileContent.length) + + expect(onProgress).toHaveBeenLastCalledWith(expect.objectContaining({ + payload: expect.objectContaining({ + bytesTotal: fileContent.length, + }), + })) + const expectedPayload = expect.objectContaining({ + // see __mocks__/tus-js-client.js + url: 'https://tus.endpoint/files/foo-bar', }) + expect(onUploadSuccess).toHaveBeenCalledWith(expect.objectContaining({ + payload: expectedPayload, + })) + + expect(onBeginUploadEvent).toHaveBeenCalledWith({ token: uploadToken }) + expect(onUploadEvent).toHaveBeenLastCalledWith({ action: 'success', payload: expectedPayload }) }) test('upload functions with tus protocol without size', async () => { @@ -110,7 +123,7 @@ describe('uploader with tus protocol', () => { return new Promise((resolve, reject) => { // validate that the test is resolved on socket connection - uploader.awaitReady().then(() => { + uploader.awaitReady(60000).then(() => { uploader.tryUploadStream(stream).then(() => { try { expect(fs.existsSync(uploader.path)).toBe(false) @@ -257,7 +270,7 @@ describe('uploader with tus protocol', () => { const uploadToken = uploader.token // validate that the test is resolved on socket connection - uploader.awaitReady().then(uploader.tryUploadStream(stream)) + uploader.awaitReady(60000).then(() => uploader.tryUploadStream(stream)) socketClient.connect(uploadToken) return new Promise((resolve, reject) => { diff --git a/test/__tests__/url.js b/test/__tests__/url.js index 201518cdad..a220f6091d 100644 --- a/test/__tests__/url.js +++ b/test/__tests__/url.js @@ -14,7 +14,7 @@ jest.mock('../../src/server/helpers/request', () => { }) const { getServer } = require('../mockserver') -const mockServer = getServer() +const mockServer = getServer({ COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '0' }) beforeAll(() => { nock('http://url.myendpoint.com').get('/files').reply(200, () => '') diff --git a/test/mockserver.js b/test/mockserver.js index b9c2de3346..87758162b3 100644 --- a/test/mockserver.js +++ b/test/mockserver.js @@ -1,4 +1,3 @@ -/* global jest:false */ const express = require('express') const session = require('express-session') @@ -36,6 +35,8 @@ const defaultEnv = { COMPANION_PATH: '', COMPANION_PERIODIC_PING_URLS: '', + + COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '', } function updateEnv (env) { @@ -54,7 +55,9 @@ module.exports.getServer = (extraEnv) => { updateEnv(env) - // delete from cache to force the server to reload companionOptions from the new env vars + // companion stores certain global state like emitter, metrics, logger (frozen object), so we need to reset modules + // todo rewrite companion to not use global state + // https://github.com/transloadit/uppy/issues/3284 jest.resetModules() const standalone = require('../src/standalone') const authServer = express()