From e06d9430267bd43b1ad7f5dd7b8a2473e81edb04 Mon Sep 17 00:00:00 2001 From: TSO Date: Wed, 27 Nov 2024 08:33:24 +0100 Subject: [PATCH] [IMP] wip --- src/config.js | 29 +++-- src/models/channel.js | 14 +- src/models/recorder.js | 180 +++++++++++++++++--------- src/models/session.js | 77 +++++++++++ src/server.js | 14 +- src/services/http.js | 17 ++- src/services/{rtc.js => resources.js} | 41 +++++- src/utils/errors.js | 4 + src/utils/ffmpeg.js | 0 src/utils/utils.js | 43 +++++- tests/http.test.js | 2 +- tests/models.test.js | 6 +- tests/rtc.test.js | 8 +- tests/utils/network.js | 6 +- 14 files changed, 339 insertions(+), 102 deletions(-) rename src/services/{rtc.js => resources.js} (68%) create mode 100644 src/utils/ffmpeg.js diff --git a/src/config.js b/src/config.js index 24dcb86..d963eba 100644 --- a/src/config.js +++ b/src/config.js @@ -175,16 +175,7 @@ export const LOG_COLOR = process.env.LOG_COLOR * * @type {boolean} */ -export const RECORDING = Boolean(process.env.RECORDING ?? true); -/** - * The file type of the recording output, this must be a fragmentable type. - * If not set, defaults to mp4 - * - * e.g: RECORDING_FILE_TYPE=mp4 - * - * @type {string} - */ -export const RECORDING_FILE_TYPE = process.env.RECORDING_FILE_TYPE || "mp4"; +export const RECORDING = !FALSY_INPUT.has(process.env.RECORDING); // ------------------------------------------------------------ // -------------------- SETTINGS -------------------------- @@ -219,6 +210,19 @@ const baseProducerOptions = { zeroRtpOnPause: true, }; +export const recording = Object.freeze({ + directory: os.tmpdir() + "/recordings", + enabled: RECORDING, + fileTTL: 1000 * 60 * 60 * 24, // 24 hours + fileType: "mp4", + videoLimit: 4, // how many videos can be merged into one recording +}); + +export const dynamicPorts = Object.freeze({ + min: 50000, + max: 59999, +}); + export const rtc = Object.freeze({ // https://mediasoup.org/documentation/v3/mediasoup/api/#WorkerSettings workerSettings: { @@ -247,6 +251,11 @@ export const rtc = Object.freeze({ }, ], }, + plainTransportOptions: { + listenIp: { ip: "0.0.0.0", announcedIp: PUBLIC_IP }, + rtcpMux: true, + comedia: false, + }, // https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransportOptions rtcTransportOptions: { maxSctpMessageSize: MAX_BUF_IN, diff --git a/src/models/channel.js b/src/models/channel.js index 073587a..084f73f 100644 --- a/src/models/channel.js +++ b/src/models/channel.js @@ -4,7 +4,7 @@ import * as config from "#src/config.js"; import { getAllowedCodecs, Logger } from "#src/utils/utils.js"; import { AuthenticationError, OvercrowdedError } from "#src/utils/errors.js"; import { Session, SESSION_CLOSE_CODE } from "#src/models/session.js"; -import { getWorker } from "#src/services/rtc.js"; +import { getWorker } from "#src/services/resources.js"; import { Recorder } from "#src/models/recorder.js"; const logger = new Logger("CHANNEL"); @@ -65,15 +65,16 @@ export class Channel extends EventEmitter { * @param {boolean} [options.useWebRtc=true] whether to use WebRTC: * with webRTC: can stream audio/video * without webRTC: can only use websocket + * @param {string} [options.uploadRoute] the route to which the recording will be uploaded */ - static async create(remoteAddress, issuer, { key, useWebRtc = true } = {}) { + static async create(remoteAddress, issuer, { key, useWebRtc = true, uploadRoute } = {}) { const safeIssuer = `${remoteAddress}::${issuer}`; const oldChannel = Channel.recordsByIssuer.get(safeIssuer); if (oldChannel) { logger.verbose(`reusing channel ${oldChannel.uuid}`); return oldChannel; } - const options = { key }; + const options = { key, uploadRoute }; if (useWebRtc) { options.worker = await getWorker(); options.router = await options.worker.createRouter({ @@ -128,8 +129,9 @@ export class Channel extends EventEmitter { * @param {string} [options.key] * @param {import("mediasoup").types.Worker} [options.worker] * @param {import("mediasoup").types.Router} [options.router] + * @param {string} [options.uploadRoute] the route to which the recording will be uploaded */ - constructor(remoteAddress, { key, worker, router } = {}) { + constructor(remoteAddress, { key, worker, router, uploadRoute } = {}) { super(); const now = new Date(); this.createDate = now.toISOString(); @@ -139,8 +141,8 @@ export class Channel extends EventEmitter { this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; this.router = router; this._worker = worker; - if (config.RECORDING) { - this.recorder = new Recorder(this); + if (config.recording.enabled) { + this.recorder = new Recorder(this, uploadRoute); } this._onSessionClose = this._onSessionClose.bind(this); } diff --git a/src/models/recorder.js b/src/models/recorder.js index 2a404e3..d184514 100644 --- a/src/models/recorder.js +++ b/src/models/recorder.js @@ -1,38 +1,62 @@ import child_process from "node:child_process"; -import os from "node:os"; import path from "node:path"; import fs from "node:fs"; - import { EventEmitter } from "node:events"; // TODO remove if unnecessary -import { Logger } from "#src/utils/utils.js"; + +import { Logger, formatFfmpegSdp } from "#src/utils/utils.js"; import { STREAM_TYPE } from "#src/shared/enums.js"; -import { RECORDING_FILE_TYPE } from "#src/config.js"; +import { LOG_LEVEL, recording } from "#src/config.js"; +import * as config from "#src/config.js"; +import * as https from "node:https"; +import http from "node:http"; const logger = new Logger("RECORDER"); -const temp = os.tmpdir(); -const VIDEO_LIMIT = 4; - -/** - * @typedef {Object} RTPTransports - * @property {Array} audio - * @property {Array} camera - * @property {Array} screen - */ +fs.mkdir(recording.directory, { recursive: true }, (err) => { + if (err) { + logger.error(err); + } +}); +export function clearDirectory() { + const now = Date.now(); + fs.readdir(recording.directory, (err, files) => { + if (err) { + logger.error(err); + return; + } + for (const file of files) { + const stats = fs.statSync(path.join(recording.directory, file)); + if (stats.mtimeMs < now - config.recording.fileTTL) { + fs.unlink(path.join(recording.directory, file), (err) => { + if (err) { + logger.error(err); + } + logger.info(`Deleted recording ${file}`); + }); + } + fs.unlink(path.join(recording.directory, file), (err) => { + if (err) { + logger.error(err); + } + }); + } + }); +} /** * Wraps the FFMPEG process * TODO move in own file */ class FFMPEG extends EventEmitter { + /** @type {string} */ + filename; /** @type {child_process.ChildProcess} */ _process; /** @type {string} */ _filePath; get _args() { - return [ - "-loglevel", - "debug", // TODO warning in prod + const args = [ + // TODO "-protocol_whitelist", "pipe,udp,rtp", "-fflags", @@ -48,9 +72,13 @@ class FFMPEG extends EventEmitter { "-c:a", "aac", // audio codec "-f", - RECORDING_FILE_TYPE, + recording.fileType, this._filePath, ]; + if (LOG_LEVEL === "debug") { + args.unshift("-loglevel", "debug"); + } + return args; } /** @@ -59,6 +87,7 @@ class FFMPEG extends EventEmitter { constructor(filePath) { super(); this._filePath = filePath; + this.this.filename = `${Date.now()}.${recording.fileType}`; } /** @@ -72,7 +101,7 @@ class FFMPEG extends EventEmitter { if (!this._process.stdin.writable) { throw new Error("FFMPEG stdin not writable."); } - this._process.stdin.write(sdp); + this._process.stdin.write(sdp); // TODO (maybe pass args earlier) this._process.stdin.end(); this._process.stdout.on("data", (chunk) => { @@ -97,27 +126,39 @@ class FFMPEG extends EventEmitter { } export class Recorder extends EventEmitter { - static records = new Map(); - + /** @type {Map} */ + static generatedFiles = new Map(); /** @type {string} */ - uuid = crypto.randomUUID(); + uuid; /** @type {import("#src/models/channel").Channel} */ channel; /** @type {string} */ state; + /** @type {FFMPEG} */ ffmpeg; - /** @type {RTPTransports} */ - _rtpTransports; /** @type {string} */ filePath; + /** + * @param {string} uuid + * @param {http.ServerResponse} res + */ + static pipeToResponse(uuid, res) { + // TODO check if this can be executed, otherwise end request, or throw error (http service will throw anyways) + const fileStream = fs.createReadStream(Recorder.generatedFiles.get(uuid)); // may need to be explicitly closed? + res.writeHead(200, { + "Content-Type": `video/${recording.fileType}`, + "Content-Disposition": "inline", + }); + fileStream.pipe(res); // Pipe the file stream to the response + } /** * @param {import("#src/models/channel").Channel} channel + * @param {string} destination url to send the file to */ - constructor(channel) { + constructor(channel, destination) { super(); this.channel = channel; - this.filePath = path.join(temp, `${this.uuid}.${RECORDING_FILE_TYPE}`); - Recorder.records.set(this.uuid, this); + this._destination = destination; } /** @returns {number} */ @@ -129,60 +170,73 @@ export class Recorder extends EventEmitter { * @param {Array} ids * @returns {string} filePath */ - start(ids) { - // maybe internal state and check if already recording (recording = has ffmpeg child process). - this.stop(); + async start(ids) { + if (this.ffmpeg) { + return this.filePath; + } + this.uuid = crypto.randomUUID(); + const audioRtps = []; + const videoRtps = []; for (const id of ids) { const session = this.channel.sessions.get(id); - const audioRtp = this._createRtp( - session.producers[STREAM_TYPE.AUDIO], - STREAM_TYPE.AUDIO - ); - // TODO maybe some logic for priority on session id or stream type - audioRtp && this._rtpTransports.audio.push(audioRtp); + const audioRtpData = session.getRtp(STREAM_TYPE.AUDIO); + audioRtpData && audioRtps.push(audioRtpData); for (const type in [STREAM_TYPE.CAMERA, STREAM_TYPE.SCREEN]) { - if (this.videoCount < VIDEO_LIMIT) { - const rtp = this._createRtp(session.producers[type], type); - rtp && this._rtpTransports[type].push(rtp); + if (videoRtps.length < recording.videoLimit) { + const videoRtpData = session.getRtp(type); + videoRtpData && videoRtps.push(videoRtpData); } } } + this.filePath = path.join(recording.directory, `${this.uuid}.${recording.fileType}`); this.ffmpeg = new FFMPEG(this.filePath); - this.ffmpeg.spawn(); // args should be base on the rtp transports + await this.ffmpeg.spawn(formatFfmpegSdp(audioRtps, videoRtps)); // args should be base on the rtp transports + Recorder.generatedFiles.set(this.uuid, this.filePath); this.ffmpeg.once("success", () => { this.emit("download-ready", this.filePath); }); return this.filePath; } - pause() { - // TODO maybe shouldn't be able to pause + update(ids) { + // TODO see if ffmpeg input can be re-configured at runtime, otherwise no support or full restart + return this.filePath; } stop() { - // TODO - // cleanup all rtp transports - // stop ffmpeg process - Recorder.records.delete(this.uuid); + this.ffmpeg?.kill(); + this.uuid = undefined; + this.ffmpeg = undefined; } + upload() { + this.stop(); + if (!this._destination) { + logger.warn(`No upload destination set for ${this.uuid}`); + return; + } + const fileStream = fs.createReadStream(this.filePath); + const { hostname, pathname, protocol } = new URL(this._destination); + const options = { + hostname, + path: pathname, + method: "POST", + headers: { + "Content-Type": "application/octet-stream", + "Content-Length": fs.statSync(this.filePath).size, + }, + }; + + const request = (protocol === "https:" ? https : http).request(options, (res) => { + if (res.statusCode === 200) { + logger.info(`File uploaded to ${this._destination}`); + // TODO delete file + } else { + logger.error(`Failed to upload file: ${res.statusCode}`); + } + }); - /** - * @param {http.ServerResponse} res - */ - pipeToResponse(res) { - // TODO check if this can be executed, otherwise end request, or throw error - const fileStream = fs.createReadStream(this._filePath); // may need to be explicitly closed? - res.writeHead(200, { - "Content-Type": `video/${RECORDING_FILE_TYPE}`, - "Content-Disposition": "inline", + request.on("error", (error) => { + logger.error(`Failed to upload file: ${error.message}`); }); - fileStream.pipe(res); // Pipe the file stream to the response - } - /** - * @param {import("mediasoup").types.Producer} producer - * @param {STREAM_TYPE[keyof STREAM_TYPE]} type - * @return {Promise} probably just create transport with right ports and return that, - */ - async _createRtp(producer, type) { - // TODO + fileStream.pipe(request); } } diff --git a/src/models/session.js b/src/models/session.js index 4e5b75a..b09f853 100644 --- a/src/models/session.js +++ b/src/models/session.js @@ -8,6 +8,7 @@ import { SERVER_MESSAGE, SERVER_REQUEST, } from "#src/shared/enums.js"; +import { getPort, releasePort } from "#src/services/resources.js"; /** * @typedef {Object} SessionInfo @@ -42,6 +43,23 @@ import { * @property {import("mediasoup").types.Producer | null} screen */ +/** + * @typedef {Object} RtpData + * @property {import("mediasoup").types.PlainTransport} transport + * @property {string} payloadType + * @property {number} port + * @property {number} clockRate + * @property {string} codec + * @property {string} channels + */ + +/** + * @typedef {Object} RtpDataByType + * @property {RtpData | null} audio + * @property {RtpData | null} camera + * @property {RtpData | null} screen + */ + const logger = new Logger("SESSION"); export const SESSION_STATE = Object.freeze({ @@ -91,6 +109,15 @@ export class Session extends EventEmitter { camera: null, screen: null, }; + /** + * Transports and their information used to expose streams on a dynamic port. + * @type {Map} + */ + _rtp = { + audio: null, + camera: null, + screen: null, + }; /** @type {import("#src/models/channel").Channel} */ _channel; /** @type {Error[]} */ @@ -466,6 +493,56 @@ export class Session extends EventEmitter { } } + /** + * @param {STREAM_TYPE[keyof STREAM_TYPE]} type + * @return {Promise} + */ + async getRtp(type) { + if (this._rtp[type]) { + return this._rtp[type]; + } + const producer = this.producers[type]; + if (!producer) { + return; + } + const transport = await this._channel.router.createWebRtcTransport( + config.rtc.plainTransportOptions + ); + const port = getPort(); + await transport.connect({ + ip: "127.0.0.1", // just local, we only transport to a local child process + port, + }); + // TODO may want to use producer.getStats() to get the codec info + // for val of producer.getStats().values() { if val.type === "codec": val.minetype, val.clockRate,... } + //const codecData = this._channel.router.rtpCapabilities.codecs.find( + // (codec) => codec.kind === producer.kind + //); + const codecData = producer.rtpParameters.codecs[0]; + // TODO if we can hot swap consumers, the transport should be owned by the recorder + // TODO if not possible to swap here or even at ffmpeg level, worst case is to compose recording and merge it with ffmpeg again. + const consumer = await transport.consume({ + producerId: producer.id, + rtpCapabilities: producer.rtpParameters, + paused: true, + }); + this.once("close", () => { + consumer.close(); + transport.close(); + releasePort(port); + this._rtp[type] = null; + }); + this._rtp[type] = { + transport, + port, + payloadType: codecData.preferredPayloadType || codecData.payloadType, + clockRate: codecData.clockRate, + codec: codecData.mimeType.replace(`${producer.kind}`, ""), + channels: producer.kind === "audio" ? codecData.channels : undefined, + }; + return this._rtp[type]; + } + _broadcastInfo() { this._broadcast({ name: SERVER_MESSAGE.INFO_CHANGE, diff --git a/src/server.js b/src/server.js index 18ba10d..89ad1e3 100644 --- a/src/server.js +++ b/src/server.js @@ -1,22 +1,30 @@ -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import * as http from "#src/services/http.js"; import * as auth from "#src/services/auth.js"; import { Logger } from "#src/utils/utils.js"; import { Channel } from "#src/models/channel.js"; +import { clearDirectory } from "#src/models/recorder.js"; const logger = new Logger("SERVER", { logLevel: "all" }); +let fileCleanupInterval; async function run() { + clearDirectory(); + fileCleanupInterval = setInterval(() => { + clearDirectory(); + }, 1000 * 60 * 60 * 24); await auth.start(); - await rtc.start(); + await resources.start(); await http.start(); logger.info(`ready - PID: ${process.pid}`); } function cleanup() { + clearInterval(fileCleanupInterval); + clearDirectory(); Channel.closeAll(); http.close(); - rtc.close(); + resources.close(); logger.info("cleanup complete"); } diff --git a/src/services/http.js b/src/services/http.js index 8d27957..a3ae9a4 100644 --- a/src/services/http.js +++ b/src/services/http.js @@ -61,10 +61,15 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf res.statusCode = 403; // forbidden return res.end(); } - const channel = await Channel.create(remoteAddress, claims.iss, { + const options = { key: claims.key, useWebRtc: searchParams.get("webRTC") !== "false", - }); + }; + const uploadRoute = searchParams.get("uploadRoute"); + if (uploadRoute) { + options.uploadRoute = `${protocol}://${host}/${uploadRoute}`; + } + const channel = await Channel.create(remoteAddress, claims.iss, options); res.setHeader("Content-Type", "application/json"); res.statusCode = 200; return res.end( @@ -79,12 +84,12 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf return res.end(); }, }); - routeListener.get(`/v${API_VERSION}/recording/`, { + routeListener.get(`/v${API_VERSION}/recording/`, { callback: async (req, res, { remoteAddress, match }) => { try { - const { token } = match; - logger.info(`[${remoteAddress}]: requested recording ${token}`); - Recorder.records.get(token)?.pipeToResponse(res); + const { uuid } = match; + logger.info(`[${remoteAddress}]: requested recording ${uuid}`); + Recorder.pipeToResponse(uuid, res); // res not ended as we are streaming } catch (error) { logger.error(`[${remoteAddress}] failed to obtain recording: ${error.message}`); diff --git a/src/services/rtc.js b/src/services/resources.js similarity index 68% rename from src/services/rtc.js rename to src/services/resources.js index 9662995..3b16883 100644 --- a/src/services/rtc.js +++ b/src/services/resources.js @@ -2,8 +2,14 @@ import mediasoup from "mediasoup"; import * as config from "#src/config.js"; import { Logger } from "#src/utils/utils.js"; +import { PortLimitReachedError } from "#src/utils/errors.js"; -const logger = new Logger("RTC"); +const port_span = config.dynamicPorts.max - config.dynamicPorts.min; +let port_offset = 0; +/** @type {Set} */ +const usedPorts = new Set(); + +const logger = new Logger("RESOURCES"); /** @type {Set} */ const workers = new Set(); @@ -13,6 +19,7 @@ export async function start() { for (let i = 0; i < config.NUM_WORKERS; ++i) { await makeWorker(); } + logger.info(`${port_span} dynamic ports available`); logger.info(`initialized ${workers.size} mediasoup workers`); logger.info( `transport(RTC) layer at ${config.PUBLIC_IP}:${config.RTC_MIN_PORT}-${config.RTC_MAX_PORT}` @@ -24,6 +31,8 @@ export function close() { worker.appData.webRtcServer.close(); worker.close(); } + port_offset = 0; + usedPorts.clear(); workers.clear(); } @@ -64,3 +73,33 @@ export async function getWorker() { logger.debug(`worker ${leastUsedWorker.pid} with ${lowestUsage} ru_maxrss was selected`); return leastUsedWorker; } + +/** + * @returns {number} + */ +function _getPort() { + port_offset++; + return config.dynamicPorts.min + (port_offset % port_span); +} +/** + * Returns a dynamic port that is not in use. + * @returns {number} + */ +export function getPort() { + let port = _getPort(); + if (usedPorts.size === port_span) { + throw new PortLimitReachedError(); + } + while (usedPorts.has(port)) { + port = _getPort(); + } + usedPorts.add(port); + return port; +} + +/** + * @param {number} port + */ +export function releasePort(port) { + usedPorts.delete(port); +} diff --git a/src/utils/errors.js b/src/utils/errors.js index 5eb7855..62ee4f9 100644 --- a/src/utils/errors.js +++ b/src/utils/errors.js @@ -5,3 +5,7 @@ export class AuthenticationError extends Error { export class OvercrowdedError extends Error { name = "OvercrowdedError"; } + +export class PortLimitReachedError extends Error { + name = "PortLimitReachedError"; +} diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/utils.js b/src/utils/utils.js index d1c6b43..71c29d3 100644 --- a/src/utils/utils.js +++ b/src/utils/utils.js @@ -114,9 +114,9 @@ export function parseBody(req, { json } = {}) { * @returns {{ host: string, pathname: string, protocol: string, remoteAddress: string, searchParams: URLSearchParams }} */ export function extractRequestInfo(req) { - const url = new URL(req.url, "http://localhost/"); // the second argument is required to parse the url correctly, but not relevant. + const url = new URL(req.url, `http://${req.headers.host}/`); const info = { - host: `${config.PUBLIC_IP}:${config.PORT}`, + host: req.headers.host, pathname: url.pathname, protocol: "http", remoteAddress: req.socket.remoteAddress, @@ -165,3 +165,42 @@ export function getAllowedCodecs() { } return codecs; } + +/** + * hard-coded ffmpeg sdp fragments for layouts with 1...4 videos + * TODO make the right resizing and vstack/hstack params + */ +const LAYOUT = { + 1: "TODO layout for 1 video", + 2: "TODO layout for 2 videos", + 3: "TODO layout for 3 videos", + 4: "TODO layout for 4 videos", +}; + +/** + * TODO + * @param {RtpData[]} audioRtps + * @param {RtpData[]} videoRtps + * @return {string[]} + */ +export function formatFfmpegSdp(audioRtps, videoRtps) { + // array of strings containing the sdp for ffmpeg, related to the stacking of videos + const sdp = ["v=0", "o=- 0 0 IN IP4 127.0.0.1", "s=FFmpeg", "c=IN IP4 127.0.0.1", "t=0 0"]; + const layout = LAYOUT[videoRtps.length]; + if (!layout) { + throw new Error(`unsupported layout for ${videoRtps.length} videos`); + } + for (const audioRtp of audioRtps) { + sdp.push(`m=audio ${audioRtp.port} RTP/AVP ${audioRtp.payloadType}`); + sdp.push(`a=rtpmap:${audioRtp.payloadType} ${audioRtp.codec}/${audioRtp.clockRate}`); + sdp.push(`a=sendonly`); + } + for (const videoRtp of videoRtps) { + // TODO do something with layout. Layout may contain a format function that takes below values as params, or the whole videoRtps[]. + sdp.push(`m=video ${videoRtp.port} RTP/AVP ${videoRtp.payloadType}`); + sdp.push(`a=rtpmap:${videoRtp.payloadType} ${videoRtp.codec}/${videoRtp.clockRate}`); + sdp.push(`a=sendonly`); + } + // TODO, layout only a small part of the full SDP. + return sdp; +} diff --git a/tests/http.test.js b/tests/http.test.js index bb70864..8b5b77a 100644 --- a/tests/http.test.js +++ b/tests/http.test.js @@ -72,7 +72,7 @@ describe("HTTP", () => { expect(response.ok).toBe(true); const { uuid, url } = await response.json(); expect(Channel.records.get(uuid)).toBeDefined(); - expect(url).toBe(`http://${config.PUBLIC_ADDRESS}:${config.PORT}`); + expect(url).toBe(`http://${HTTP_INTERFACE}:${PORT}`); }); test("/noop", async () => { const response = await fetch(`http://${HTTP_INTERFACE}:${PORT}/v${API_VERSION}/noop`, { diff --git a/tests/models.test.js b/tests/models.test.js index ce8a20c..be41778 100644 --- a/tests/models.test.js +++ b/tests/models.test.js @@ -1,17 +1,17 @@ import { describe, beforeEach, afterEach, expect, jest } from "@jest/globals"; -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import { Channel } from "#src/models/channel.js"; import { timeouts, CHANNEL_SIZE } from "#src/config.js"; import { OvercrowdedError } from "#src/utils/errors.js"; describe("Models", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { Channel.closeAll(); - rtc.close(); + resources.close(); }); test("Create channel and session", async () => { const channel = await Channel.create("testRemote", "testIssuer"); diff --git a/tests/rtc.test.js b/tests/rtc.test.js index 5926c78..96ebcd5 100644 --- a/tests/rtc.test.js +++ b/tests/rtc.test.js @@ -1,19 +1,19 @@ import { afterEach, beforeEach, describe, expect } from "@jest/globals"; -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import * as config from "#src/config.js"; describe("rtc service", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { - rtc.close(); + resources.close(); }); test("worker load should be evenly distributed", async () => { const usedWorkers = new Set(); for (let i = 0; i < config.NUM_WORKERS; ++i) { - const worker = await rtc.getWorker(); + const worker = await resources.getWorker(); const router = await worker.createRouter({}); const webRtcServer = await worker.createWebRtcServer(config.rtc.rtcServerOptions); const promises = []; diff --git a/tests/utils/network.js b/tests/utils/network.js index d9ff48d..622a0d1 100644 --- a/tests/utils/network.js +++ b/tests/utils/network.js @@ -6,7 +6,7 @@ import * as fakeParameters from "mediasoup-client/lib/test/fakeParameters"; import * as auth from "#src/services/auth.js"; import * as http from "#src/services/http.js"; -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import { SfuClient, SFU_CLIENT_STATE } from "#src/client.js"; import { Channel } from "#src/models/channel.js"; @@ -39,7 +39,7 @@ export class LocalNetwork { async start(hostname, port) { this.hostname = hostname; this.port = port; - await rtc.start(); + await resources.start(); await http.start({ hostname, port }); await auth.start(HMAC_B64_KEY); } @@ -122,6 +122,6 @@ export class LocalNetwork { Channel.closeAll(); auth.close(); http.close(); - rtc.close(); + resources.close(); } }