diff --git a/src/client.js b/src/client.js index e954d25..ada7532 100644 --- a/src/client.js +++ b/src/client.js @@ -85,6 +85,10 @@ const ACTIVE_STATES = new Set([ export class SfuClient extends EventTarget { /** @type {Error[]} */ errors = []; + /** + * @type {{ recording: boolean, webRtc: boolean }} + */ + features = {}; /** @type {SFU_CLIENT_STATE[keyof SFU_CLIENT_STATE]} */ _state = SFU_CLIENT_STATE.DISCONNECTED; /** @type {Bus | undefined} */ @@ -418,7 +422,9 @@ export class SfuClient extends EventTarget { */ webSocket.addEventListener( "message", - () => { + (initDataMessage) => { + const { features } = JSON.parse(initDataMessage.data || initDataMessage); + this.features = features; resolve(new Bus(webSocket)); }, { once: true } diff --git a/src/config.js b/src/config.js index 8f68ca6..1663636 100644 --- a/src/config.js +++ b/src/config.js @@ -1,6 +1,6 @@ import os from "node:os"; -const FALSY_INPUT = new Set(["disable", "false", "none", "no", "0"]); +const FALSY_INPUT = new Set(["disable", "false", "none", "no", "0", "off"]); // ------------------------------------------------------------ // ------------------ ENV VARIABLES ----------------------- @@ -167,6 +167,15 @@ export const LOG_TIMESTAMP = !FALSY_INPUT.has(process.env.LOG_TIMESTAMP); export const LOG_COLOR = process.env.LOG_COLOR ? Boolean(process.env.LOG_COLOR) : process.stdout.isTTY; +/** + * Whether recording is allowed + * If true, users can request their call to be recorded. + * + * e.g: RECORDING=1 or RECORDING=off + * + * @type {boolean} + */ +export const RECORDING = !FALSY_INPUT.has(process.env.RECORDING); // ------------------------------------------------------------ // -------------------- SETTINGS -------------------------- @@ -201,6 +210,24 @@ const baseProducerOptions = { zeroRtpOnPause: true, }; +export const recording = Object.freeze({ + directory: os.tmpdir() + "/recordings", + enabled: RECORDING, + maxDuration: 1000 * 60 * 60, // 1 hour + fileTTL: 1000 * 60 * 60 * 24, // 24 hours + fileType: "mp4", + videoCodec: "libx264", + audioCodec: "aac", + audioLimit: 20, + cameraLimit: 4, // how many camera can be merged into one recording + screenLimit: 1, +}); + +export const dynamicPorts = Object.freeze({ + min: 50000, + max: 59999, +}); + export const rtc = Object.freeze({ // https://mediasoup.org/documentation/v3/mediasoup/api/#WorkerSettings workerSettings: { @@ -229,6 +256,11 @@ export const rtc = Object.freeze({ }, ], }, + plainTransportOptions: { + listenIp: { ip: "0.0.0.0", announcedIp: PUBLIC_IP }, + rtcpMux: true, + comedia: false, + }, // https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransportOptions rtcTransportOptions: { maxSctpMessageSize: MAX_BUF_IN, diff --git a/src/models/channel.js b/src/models/channel.js index 2af4d70..b22064a 100644 --- a/src/models/channel.js +++ b/src/models/channel.js @@ -4,7 +4,8 @@ import * as config from "#src/config.js"; import { getAllowedCodecs, Logger } from "#src/utils/utils.js"; import { AuthenticationError, OvercrowdedError } from "#src/utils/errors.js"; import { Session, SESSION_CLOSE_CODE } from "#src/models/session.js"; -import { getWorker } from "#src/services/rtc.js"; +import { getWorker } from "#src/services/resources.js"; +import { Recorder } from "#src/models/recorder.js"; const logger = new Logger("CHANNEL"); @@ -18,11 +19,27 @@ const mediaCodecs = getAllowedCodecs(); * @property {number} screenCount */ +/** + * @typedef {Object} Features + * @property {boolean} recording + * @property {boolean} webRtc + */ + /** * @fires Channel#sessionJoin * @fires Channel#sessionLeave * @fires Channel#close */ + +/** + * @typedef {Object} ChannelStats + * @property {string} uuid + * @property {string} remoteAddress + * @property {SessionsStats} sessionsStats + * @property {string} createDate + * @property {boolean} webRtcEnabled + */ + export class Channel extends EventEmitter { /** @type {Map} */ static records = new Map(); @@ -39,6 +56,8 @@ export class Channel extends EventEmitter { name; /** @type {WithImplicitCoercion} base 64 buffer key */ key; + /** @type {Recorder | undefined} */ + recorder; /** @type {import("mediasoup").types.Router}*/ router; /** @type {Map} */ @@ -56,15 +75,16 @@ export class Channel extends EventEmitter { * @param {boolean} [options.useWebRtc=true] whether to use WebRTC: * with webRTC: can stream audio/video * without webRTC: can only use websocket + * @param {string} [options.uploadRoute] the route to which the recording will be uploaded */ - static async create(remoteAddress, issuer, { key, useWebRtc = true } = {}) { + static async create(remoteAddress, issuer, { key, useWebRtc = true, uploadRoute } = {}) { const safeIssuer = `${remoteAddress}::${issuer}`; const oldChannel = Channel.recordsByIssuer.get(safeIssuer); if (oldChannel) { logger.verbose(`reusing channel ${oldChannel.uuid}`); return oldChannel; } - const options = { key }; + const options = { key, uploadRoute }; if (useWebRtc) { options.worker = await getWorker(); options.router = await options.worker.createRouter({ @@ -119,8 +139,9 @@ export class Channel extends EventEmitter { * @param {string} [options.key] * @param {import("mediasoup").types.Worker} [options.worker] * @param {import("mediasoup").types.Router} [options.router] + * @param {string} [options.uploadRoute] the route to which the recording will be uploaded */ - constructor(remoteAddress, { key, worker, router } = {}) { + constructor(remoteAddress, { key, worker, router, uploadRoute } = {}) { super(); const now = new Date(); this.createDate = now.toISOString(); @@ -130,11 +151,24 @@ export class Channel extends EventEmitter { this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; this.router = router; this._worker = worker; + if (config.recording.enabled) { + this.recorder = new Recorder(this, uploadRoute); + } this._onSessionClose = this._onSessionClose.bind(this); } /** - * @returns {Promise<{ uuid: string, remoteAddress: string, sessionsStats: SessionsStats, createDate: string }>} + * @type {Features} + */ + get features() { + return { + recording: Boolean(this.recorder), + webRtc: Boolean(this.router), + }; + } + + /** + * @returns {Promise} */ async getStats() { return { @@ -249,6 +283,7 @@ export class Channel extends EventEmitter { } clearTimeout(this._closeTimeout); this.sessions.clear(); + this.recorder?.stop(); Channel.records.delete(this.uuid); /** * @event Channel#close diff --git a/src/models/recorder.js b/src/models/recorder.js new file mode 100644 index 0000000..5267788 --- /dev/null +++ b/src/models/recorder.js @@ -0,0 +1,222 @@ +import path from "node:path"; +import fs from "node:fs"; +import { EventEmitter } from "node:events"; // TODO remove if unnecessary + +import { Logger } from "#src/utils/utils.js"; +import { STREAM_TYPE } from "#src/shared/enums.js"; +import { FFMPEG } from "#src/utils/ffmpeg.js"; +import { recording } from "#src/config.js"; +import * as config from "#src/config.js"; +import { upload as httpUpload } from "#src/services/http.js"; + +const logger = new Logger("RECORDER"); + +export const RECORDER_STATE = { + IDLE: "IDLE", + RECORDING: "RECORDING", + UPLOADING: "UPLOADING", + UPDATING: "UPDATING", +}; + +fs.mkdir(recording.directory, { recursive: true }, (err) => { + if (err) { + logger.error(err); + } +}); +export function clearDirectory() { + const now = Date.now(); + fs.readdir(recording.directory, (err, files) => { + if (err) { + logger.error(err); + return; + } + for (const file of files) { + const stats = fs.statSync(path.join(recording.directory, file)); + if (stats.mtimeMs < now - config.recording.fileTTL) { + fs.unlink(path.join(recording.directory, file), (err) => { + if (err) { + logger.error(err); + } + logger.info(`Deleted recording ${file}`); + }); + } + fs.unlink(path.join(recording.directory, file), (err) => { + if (err) { + logger.error(err); + } + }); + } + }); +} + +/** + * @fires Recorder#stateChange + * @fires Recorder#ready + */ +export class Recorder extends EventEmitter { + /** @type {Map} */ + static generatedFiles = new Map(); + /** @type {string} */ + uuid; + /** @type {import("#src/models/channel").Channel} */ + channel; + /** @type {FFMPEG} */ + ffmpeg; + /** @type {string} */ + _destination; + /** @type {number} */ + _limitTimeout; + /** @type {string[]} */ + _tempFilePathAccumulator = []; + /** @type {RECORDER_STATE[keyof RECORDER_STATE]} */ + _state = RECORDER_STATE.IDLE; + + /** + * @param {import("#src/models/channel").Channel} channel + * @param {string} destination url to send the file to + */ + constructor(channel, destination) { + super(); + this.channel = channel; + this._destination = destination; + } + + /** + * @param {RECORDER_STATE[keyof RECORDER_STATE]} state + * @fires Recorder#stateChange + */ + set state(state) { + this._state = state; + /** + * stateChange event. + * @event Recorder#stateChange + * @type {string} `RECORDER_STATE` + */ + this.emit("stateChange", state); + } + + /** + * @param {Array} ids TODO may specify more than just ids, maybe we want specific streams. could be some tuple [id, mediaTypes] + */ + async start(ids) { + if (this.ffmpeg) { + logger.debug("Already recording"); + return; + } + this._limitTimeout = setTimeout(() => { + this.upload(); + }, recording.maxDuration); + this.uuid = crypto.randomUUID(); + return this._start_fragment(ids); + } + + async _start_fragment(ids) { + const oldProcess = this.ffmpeg; + this.state = RECORDER_STATE.UPDATING; + /** @type {RtpData[]} */ + const audioRtps = []; + /** @type {RtpData[]} */ + const cameraRtps = []; + /** @type {RtpData[]} */ + const screenRtps = []; + for (const id of ids) { + const session = this.channel.sessions.get(id); + if (!session) { + logger.warn(`Session ${id} not found`); + continue; + } + if (audioRtps.length < recording.audioLimit) { + const audioRtpData = await session.getRtp(STREAM_TYPE.AUDIO); + audioRtpData && audioRtps.push(audioRtpData); + } + if (cameraRtps.length < recording.cameraLimit) { + const cameraRtpData = await session.getRtp(STREAM_TYPE.CAMERA); + cameraRtpData && cameraRtps.push(cameraRtpData); + } + if (screenRtps.length < recording.screenLimit) { + const screenRtpData = await session.getRtp(STREAM_TYPE.SCREEN); + screenRtpData && screenRtps.push(screenRtpData); + } + } + const tempPath = path.join( + recording.directory, + `__FRAGMENT__-${this.uuid}-${Date.now()}.${recording.fileType}` + ); + this.ffmpeg = new FFMPEG(tempPath); + try { + await this.ffmpeg.merge({ audioRtps, screenRtps, cameraRtps }); // args should be base on the rtp transports + this.state = RECORDER_STATE.RECORDING; + } catch (error) { + logger.error(`Failed to start recording: ${error.message}`); + this.stop(); + } + oldProcess?.kill(); + this._tempFilePathAccumulator.push(tempPath); + } + + async update(ids) { + /** TODO see if ffmpeg input can be re-configured at runtime, otherwise full restart + * Possibilities for hot-swap: + * - ffmpeg stdin is writable, so it may be possible to write new sdp (with new inputs) to it + * - could see if the consumer of the RtpTransport can be swapped at runtime, in which case, RtpTransport should + * be owned by the Recorder (4 RtpTransport per recorder, and consume on demand). + * If hot-swap is not possible: + * Kill the ffmpeg process and register the path in a queue (array). + * Keep killing and starting processes as update is called, + * kill should happen as late as possible (when next process has started) to avoid losses. + * When "upload" is called, first use ffmpeg again to merge all the files in the queue. + * then upload that real file. (if queue.length === 1, just upload that file). + */ + await this._start_fragment(ids); + } + stop() { + this.ffmpeg?.kill(); + this.uuid = undefined; + this.ffmpeg = undefined; + this._tempFilePathAccumulator = []; // TODO probably also delete all files here + clearTimeout(this._limitTimeout); + this.state = RECORDER_STATE.IDLE; + } + + /** + * @fires Recorder#ready + */ + async upload() { + const filePaths = this._tempFilePathAccumulator; + this.stop(); + if (!this._destination) { + logger.warn(`No upload destination set for ${this.uuid}`); + return; + } + if (filePaths.length === 0) { + logger.warn(`No files to upload for ${this.uuid}`); + return; + } + this.state = RECORDER_STATE.UPLOADING; + const filePath = await this._mergeFiles(filePaths); + Recorder.generatedFiles.set(this.uuid, filePath); + /** + * @event Recorder#ready + * @type {string} `uuid` + */ + this.emit("ready", this.uuid); + httpUpload(filePath, this._destination); + this.state = RECORDER_STATE.IDLE; + } + + /** + * @param {string[]} filePaths + */ + async _mergeFiles(filePaths) { + // TODO + if (filePaths.length === 1) { + return filePaths[0]; + } + const ffmpeg = new FFMPEG( + path.join(recording.directory, `__MERGED__-${this.uuid}.${recording.fileType}`) + ); + // should await for ffmpeg complete event. + await ffmpeg.concat(filePaths); + return ""; // TODO merge logic with FFMPEG + } +} diff --git a/src/models/session.js b/src/models/session.js index 4e5b75a..0017723 100644 --- a/src/models/session.js +++ b/src/models/session.js @@ -8,6 +8,7 @@ import { SERVER_MESSAGE, SERVER_REQUEST, } from "#src/shared/enums.js"; +import { getPort, releasePort } from "#src/services/resources.js"; /** * @typedef {Object} SessionInfo @@ -42,6 +43,24 @@ import { * @property {import("mediasoup").types.Producer | null} screen */ +/** + * @typedef {Object} RtpData + * @property {import("mediasoup").types.PlainTransport} transport + * @property {string} payloadType + * @property {number} port + * @property {number} clockRate + * @property {string} codec + * @property {string} channels + * @property {string} label + */ + +/** + * @typedef {Object} RtpDataByType + * @property {RtpData | null} audio + * @property {RtpData | null} camera + * @property {RtpData | null} screen + */ + const logger = new Logger("SESSION"); export const SESSION_STATE = Object.freeze({ @@ -83,6 +102,8 @@ export class Session extends EventEmitter { }); /** @type {string} */ remote; + /** @type {string} */ + userName; /** @type {Map} */ _consumers = new Map(); /** @type {Producers} */ @@ -91,6 +112,15 @@ export class Session extends EventEmitter { camera: null, screen: null, }; + /** + * Transports and their information used to expose streams on a dynamic port. + * @type {Map} + */ + _rtp = { + audio: null, + camera: null, + screen: null, + }; /** @type {import("#src/models/channel").Channel} */ _channel; /** @type {Error[]} */ @@ -129,7 +159,7 @@ export class Session extends EventEmitter { * @returns {string} */ get name() { - return `${this._channel.name}:${this.id}@${this.remote}`; + return `${this._channel.name}:${this.id}@${this.remote} (${this.userName})`; } /** @@ -466,6 +496,57 @@ export class Session extends EventEmitter { } } + /** + * @param {STREAM_TYPE[keyof STREAM_TYPE]} type + * @return {RtpData} + */ + async getRtp(type) { + if (this._rtp[type]) { + return this._rtp[type]; + } + const producer = this.producers[type]; + if (!producer) { + return; + } + const transport = await this._channel.router.createWebRtcTransport( + config.rtc.plainTransportOptions + ); + const port = getPort(); + await transport.connect({ + ip: "127.0.0.1", // just local, we only transport to a local child process + port, + }); + // TODO may want to use producer.getStats() to get the codec info + // for val of producer.getStats().values() { if val.type === "codec": val.minetype, val.clockRate,... } + //const codecData = this._channel.router.rtpCapabilities.codecs.find( + // (codec) => codec.kind === producer.kind + //); + const codecData = producer.rtpParameters.codecs[0]; + // TODO if we can hot swap consumers, the transport should be owned by the recorder + // TODO if not possible to swap here or even at ffmpeg level, worst case is to compose recording and merge it with ffmpeg again. + const consumer = await transport.consume({ + producerId: producer.id, + rtpCapabilities: producer.rtpParameters, + paused: true, + }); + this.once("close", () => { + consumer.close(); + transport.close(); + releasePort(port); + this._rtp[type] = null; + }); + this._rtp[type] = { + transport, + port, + payloadType: codecData.preferredPayloadType || codecData.payloadType, + clockRate: codecData.clockRate, + codec: codecData.mimeType.replace(`${producer.kind}`, ""), + channels: producer.kind === "audio" ? codecData.channels : undefined, + label: this.userName, + }; + return this._rtp[type]; + } + _broadcastInfo() { this._broadcast({ name: SERVER_MESSAGE.INFO_CHANGE, diff --git a/src/server.js b/src/server.js index 18ba10d..89ad1e3 100644 --- a/src/server.js +++ b/src/server.js @@ -1,22 +1,30 @@ -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import * as http from "#src/services/http.js"; import * as auth from "#src/services/auth.js"; import { Logger } from "#src/utils/utils.js"; import { Channel } from "#src/models/channel.js"; +import { clearDirectory } from "#src/models/recorder.js"; const logger = new Logger("SERVER", { logLevel: "all" }); +let fileCleanupInterval; async function run() { + clearDirectory(); + fileCleanupInterval = setInterval(() => { + clearDirectory(); + }, 1000 * 60 * 60 * 24); await auth.start(); - await rtc.start(); + await resources.start(); await http.start(); logger.info(`ready - PID: ${process.pid}`); } function cleanup() { + clearInterval(fileCleanupInterval); + clearDirectory(); Channel.closeAll(); http.close(); - rtc.close(); + resources.close(); logger.info("cleanup complete"); } diff --git a/src/services/http.js b/src/services/http.js index 26da53e..f910a32 100644 --- a/src/services/http.js +++ b/src/services/http.js @@ -6,6 +6,10 @@ import * as config from "#src/config.js"; import { Logger, parseBody, extractRequestInfo } from "#src/utils/utils.js"; import { SESSION_CLOSE_CODE } from "#src/models/session.js"; import { Channel } from "#src/models/channel.js"; +import { Recorder } from "#src/models/recorder.js"; +import fs from "node:fs"; +import path from "node:path"; +import * as https from "node:https"; /** * @typedef {function} routeCallback @@ -15,6 +19,7 @@ import { Channel } from "#src/models/channel.js"; * @param {string} param2.remoteAddress * @param {string} param2.protocol * @param {string} param2.host + * @param {Object} param2.match name/value mapping of route variables * @param {URLSearchParams} param2.searchParams * @return {http.ServerResponse} */ @@ -31,12 +36,22 @@ let httpServer; export async function start({ httpInterface = config.HTTP_INTERFACE, port = config.PORT } = {}) { logger.info("starting..."); const routeListener = new RouteListener(); + /** + * A no-operation endpoint that returns a simple "ok" response. + * + * @returns {http.ServerResponse>}. + */ routeListener.get(`/v${API_VERSION}/noop`, { callback: (req, res) => { res.statusCode = 200; return res.end(JSON.stringify({ result: "ok" })); }, }); + /** + * Retrieves statistics for all channels. + * + * @returns {http.ServerResponse>} + */ routeListener.get(`/v${API_VERSION}/stats`, { callback: async (req, res) => { const proms = []; @@ -48,6 +63,12 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf return res.end(JSON.stringify(channelStats)); }, }); + /** + * @param {URLSearchParams} searchParams (query parameters) + * @param {undefined | "false"} searchParams.webRTC whether to use WebRTC or not + * @param {string} searchParams.uploadRoute the route to which recordings will be uploaded + * @returns {http.ServerResponse>} + */ routeListener.get(`/v${API_VERSION}/channel`, { callback: async (req, res, { host, protocol, remoteAddress, searchParams }) => { try { @@ -59,10 +80,12 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf res.statusCode = 403; // forbidden return res.end(); } - const channel = await Channel.create(remoteAddress, claims.iss, { + const options = { key: claims.key, useWebRtc: searchParams.get("webRTC") !== "false", - }); + uploadRoute: searchParams.get("uploadRoute"), // TODO this route should be constrained to avoid being use for DDoS (eg for a malicious Odoo.sh customer) + }; + const channel = await Channel.create(remoteAddress, claims.iss, options); res.setHeader("Content-Type", "application/json"); res.statusCode = 200; return res.end( @@ -77,6 +100,34 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf return res.end(); }, }); + /** + * Streams a recording with the specified UUID. + */ + routeListener.get(`/v${API_VERSION}/recording/`, { + callback: async (req, res, { remoteAddress, match }) => { + try { + const { uuid } = match; + logger.info(`[${remoteAddress}]: requested recording ${uuid}`); + const filePath = Recorder.generatedFiles.get(uuid); + if (!filePath) { + res.statusCode = 404; + return res.end(); + } + res.setHeader("Content-Type", "application/octet-stream"); + res.setHeader( + "Content-Disposition", + `attachment; filename="${path.basename(filePath)}"` + ); + return fs.createReadStream(filePath).pipe(res); + } catch (error) { + logger.error(`[${remoteAddress}] failed to obtain recording: ${error.message}`); + return res.end(); + } + }, + }); + /** + * Disconnects sessions from channels based on the provided JWT. + */ routeListener.post(`/v${API_VERSION}/disconnect`, { callback: async (req, res, { remoteAddress }) => { try { @@ -124,6 +175,33 @@ export function close() { httpServer?.close(); } +export function upload(filePath, destination) { + const fileStream = fs.createReadStream(filePath); + const { hostname, pathname, protocol } = new URL(destination); + const options = { + hostname, + path: pathname, + method: "POST", + headers: { + "Content-Type": "application/octet-stream", + "Content-Length": fs.statSync(filePath).size, + }, + }; + // TODO implement the route and route-passing in odoo/discuss + const request = (protocol === "https:" ? https : http).request(options, (res) => { + if (res.statusCode === 200) { + logger.info(`File uploaded to ${destination}`); + // TODO delete file + } else { + logger.error(`Failed to upload file: ${res.statusCode}`); + } + }); + request.on("error", (error) => { + logger.error(`Failed to upload file: ${error.message}`); + }); + fileStream.pipe(request); +} + class RouteListener { /** @type {Map} */ GETs = new Map(); @@ -183,7 +261,8 @@ class RouteListener { break; } for (const [pattern, options] of registeredRoutes) { - if (pathname === pattern) { + const match = this._extractPattern(pathname, pattern); + if (match) { if (options?.cors) { res.setHeader("Access-Control-Allow-Origin", options.cors); res.setHeader("Access-Control-Allow-Methods", options.methods); @@ -195,6 +274,7 @@ class RouteListener { host, protocol, remoteAddress, + match, searchParams, }); } catch (error) { @@ -212,4 +292,32 @@ class RouteListener { } return res.end(); } + + /** + * Matches a pathname against a pattern with named parameters. + * @param {string} pathname - The URL path requested, e.g., "/channel/6/person/42/" + * @param {string} pattern - The pattern to match, e.g., "/channel//session/" + * @returns {object|undefined} - Returns undefined if no match. If matched, returns an object mapping keys to values, + * the object is empty if matching a pattern with no variables. + * eg: { channelId: "6", sessionId: "42" } | {} | undefined + */ + _extractPattern(pathname, pattern) { + pathname = pathname.replace(/\/+$/, ""); + pattern = pattern.replace(/\/+$/, ""); + const paramNames = []; + const regexPattern = pattern.replace(/<([^>]+)>/g, (_, paramName) => { + paramNames.push(paramName); + return "([^/]+)"; + }); + const regex = new RegExp(`^${regexPattern}$`); + const match = pathname.match(regex); + if (!match) { + return; + } + const params = {}; + paramNames.forEach((name, index) => { + params[name] = match[index + 1]; + }); + return params; + } } diff --git a/src/services/rtc.js b/src/services/resources.js similarity index 68% rename from src/services/rtc.js rename to src/services/resources.js index 9662995..3b16883 100644 --- a/src/services/rtc.js +++ b/src/services/resources.js @@ -2,8 +2,14 @@ import mediasoup from "mediasoup"; import * as config from "#src/config.js"; import { Logger } from "#src/utils/utils.js"; +import { PortLimitReachedError } from "#src/utils/errors.js"; -const logger = new Logger("RTC"); +const port_span = config.dynamicPorts.max - config.dynamicPorts.min; +let port_offset = 0; +/** @type {Set} */ +const usedPorts = new Set(); + +const logger = new Logger("RESOURCES"); /** @type {Set} */ const workers = new Set(); @@ -13,6 +19,7 @@ export async function start() { for (let i = 0; i < config.NUM_WORKERS; ++i) { await makeWorker(); } + logger.info(`${port_span} dynamic ports available`); logger.info(`initialized ${workers.size} mediasoup workers`); logger.info( `transport(RTC) layer at ${config.PUBLIC_IP}:${config.RTC_MIN_PORT}-${config.RTC_MAX_PORT}` @@ -24,6 +31,8 @@ export function close() { worker.appData.webRtcServer.close(); worker.close(); } + port_offset = 0; + usedPorts.clear(); workers.clear(); } @@ -64,3 +73,33 @@ export async function getWorker() { logger.debug(`worker ${leastUsedWorker.pid} with ${lowestUsage} ru_maxrss was selected`); return leastUsedWorker; } + +/** + * @returns {number} + */ +function _getPort() { + port_offset++; + return config.dynamicPorts.min + (port_offset % port_span); +} +/** + * Returns a dynamic port that is not in use. + * @returns {number} + */ +export function getPort() { + let port = _getPort(); + if (usedPorts.size === port_span) { + throw new PortLimitReachedError(); + } + while (usedPorts.has(port)) { + port = _getPort(); + } + usedPorts.add(port); + return port; +} + +/** + * @param {number} port + */ +export function releasePort(port) { + usedPorts.delete(port); +} diff --git a/src/services/ws.js b/src/services/ws.js index eef232d..42652dd 100644 --- a/src/services/ws.js +++ b/src/services/ws.js @@ -106,7 +106,7 @@ async function connect(webSocket, { channelUUID, jwt }) { let channel = Channel.records.get(channelUUID); /** @type {{sfu_channel_uuid: string, session_id: number, ice_servers: Object[] }} */ const authResult = await verify(jwt, channel?.key); - const { sfu_channel_uuid, session_id, ice_servers } = authResult; + const { sfu_channel_uuid, session_id, ice_servers, user_name } = authResult; if (!channelUUID && sfu_channel_uuid) { // Cases where the channelUUID is not provided in the credentials for backwards compatibility with version 1.1 and earlier. channel = Channel.records.get(sfu_channel_uuid); @@ -122,9 +122,10 @@ async function connect(webSocket, { channelUUID, jwt }) { if (!session_id) { throw new AuthenticationError("Malformed JWT payload"); } - webSocket.send(); // client can start using ws after this message. + webSocket.send(JSON.stringify({ features: channel.features })); // client can start using ws after this message. const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch }); const { session } = Channel.join(channel.uuid, session_id); + session.userName = user_name; session.once("close", ({ code }) => { let wsCloseCode = WS_CLOSE_CODE.CLEAN; switch (code) { diff --git a/src/shared/enums.js b/src/shared/enums.js index 031f4db..f108e0c 100644 --- a/src/shared/enums.js +++ b/src/shared/enums.js @@ -12,6 +12,12 @@ export const WS_CLOSE_CODE = { CHANNEL_FULL: 4109, }; +export const STREAM_TYPE = { + AUDIO: "audio", + CAMERA: "camera", + SCREEN: "screen", +}; + export const SERVER_REQUEST = { /** Requests the creation of a consumer that is used to forward a track to the client */ INIT_CONSUMER: "INIT_CONSUMER", diff --git a/src/utils/errors.js b/src/utils/errors.js index 5eb7855..62ee4f9 100644 --- a/src/utils/errors.js +++ b/src/utils/errors.js @@ -5,3 +5,7 @@ export class AuthenticationError extends Error { export class OvercrowdedError extends Error { name = "OvercrowdedError"; } + +export class PortLimitReachedError extends Error { + name = "PortLimitReachedError"; +} diff --git a/src/utils/ffmpeg.js b/src/utils/ffmpeg.js new file mode 100644 index 0000000..f7b42f6 --- /dev/null +++ b/src/utils/ffmpeg.js @@ -0,0 +1,168 @@ +import child_process from "node:child_process"; +import { EventEmitter } from "node:events"; + +import { Logger } from "#src/utils/utils.js"; +import { recording, LOG_LEVEL } from "#src/config.js"; + +const logger = new Logger("FFMPEG"); + +/** + * hard-coded ffmpeg sdp fragments for layouts with 1...4 videos + * TODO make the right resizing and vstack/hstack params + */ + +const drawText = (label, index) => `[${index}:v]drawtext=text='${label}':x=10:y=h-30[v${index}]`; + +const SCREEN_LAYOUT = { + 1: (labels) => `a=filter:complex ${drawText(labels[0], 0)},scale=1280:720[v0]; -map [v0]`, + 2: (labels) => + `a=filter:complex ${drawText(labels[0], 0)},scale=640:720[v0];${drawText( + labels[1], + 1 + )},scale=640:720[v1];[v0][v1]hstack=inputs=2[v]; -map [v]`, + 3: (labels) => + `a=filter:complex ${drawText(labels[0], 0)},scale=640:360[v0];${drawText( + labels[1], + 1 + )},scale=640:360[v1];[v0][v1]hstack=inputs=2[top];${drawText( + labels[2], + 2 + )},scale=1280:360[v2];[top][v2]vstack=inputs=2[v]; -map [v]`, + 4: (labels) => + `a=filter:complex ${drawText(labels[0], 0)},scale=640:360[v0];${drawText( + labels[1], + 1 + )},scale=640:360[v1];[v0][v1]hstack=inputs=2[top];${drawText( + labels[2], + 2 + )},scale=640:360[v2];${drawText( + labels[3], + 3 + )},scale=640:360[v3];[v2][v3]hstack=inputs=2[bottom];[top][bottom]vstack=inputs=2[v]; -map [v]`, +}; + +/** + * TODO + * @param {RtpData[]} audioRtps + * @param {RtpData[]} cameraRtps + * @param {RtpData[]} screenRtps + * @return {string} + */ +function formatSdp({ audioRtps, screenRtps, cameraRtps }) { + logger.info(`TODO: ${screenRtps}`); + const sdp = ["v=0", "o=- 0 0 IN IP4 127.0.0.1", "s=FFmpeg", "c=IN IP4 127.0.0.1", "t=0 0"]; + for (const audioRtp of audioRtps) { + sdp.push(`m=audio ${audioRtp.port} RTP/AVP ${audioRtp.payloadType}`); + sdp.push(`a=rtpmap:${audioRtp.payloadType} ${audioRtp.codec}/${audioRtp.clockRate}`); + sdp.push(`a=sendonly`); + } + sdp.push( + `-c:a ${recording.audioCodec} -b:a 128k -ac 2 -filter_complex amerge=inputs=${audioRtps.length}` + ); + if (cameraRtps.length > 0) { + const layout = SCREEN_LAYOUT[cameraRtps.length]; + if (!layout) { + throw new Error(`unsupported layout for ${cameraRtps.length} videos`); + } + for (const videoRtp of cameraRtps) { + sdp.push(`m=video ${videoRtp.port} RTP/AVP ${videoRtp.payloadType}`); + sdp.push(`a=rtpmap:${videoRtp.payloadType} ${videoRtp.codec}/${videoRtp.clockRate}`); + sdp.push(`a=sendonly`); + } + sdp.push(`-filter_complex`, layout(cameraRtps.map((rtp) => rtp.label))); + sdp.push(`-c:v ${recording.videoCodec}`); // TODO move outside of the condition, should also account for screenRtps + } + return sdp.join("\n"); +} + +/** + * Wraps the FFMPEG process + * TODO move in own file + */ +export class FFMPEG extends EventEmitter { + /** @type {child_process.ChildProcess} */ + _process; + /** @type {string} */ + _filePath; + + get _processArgs() { + const args = [ + // TODO + "-protocol_whitelist", + "pipe,udp,rtp", + "-fflags", + "+genpts", + "-f", + "sdp", + "-i", + "pipe:0", + "-vf", + "scale=1280:720", // 720p + "-r", + "30", // 30fps + "-f", + recording.fileType, + this._filePath, + ]; + if (LOG_LEVEL === "debug") { + args.unshift("-loglevel", "debug"); + } + return args; + } + + /** + * @param {string} filePath + */ + constructor(filePath) { + super(); + this._filePath = filePath; + } + + /** + * @param {Object} rtpInputs + * @param {RtpData[]} rtpInputs.audioRtps + * @param {RtpData[]} rtpInputs.screenRtps + * @param {RtpData[]} rtpInputs.cameraRtps + */ + async merge(rtpInputs) { + this._startProcess(this._processArgs, formatSdp(rtpInputs)); + } + + _startProcess(args, sdp) { + this._process = child_process.spawn("ffmpeg", args, { + stdio: ["pipe", "pipe", process.stderr], + }); + if (!this._process.stdin.writable) { + throw new Error("FFMPEG stdin not writable."); + } + this._process.stdin.write(sdp); + this._process.stdin.end(); + this._process.on("close", (code) => { + this.emit("complete", code); + }); + + logger.debug( + `FFMPEG process (pid:${this._process.pid}) spawned, outputting to ${this._filePath}` + ); + } + + concat(filePaths) { + this._startProcess( + this._processArgs, // TODO check if all args make sense or if need custom + `-f concat -safe 0 -i ${filePaths.join("\n")} -c copy` // SDP + ); + return new Promise((resolve, reject) => { + this.once("complete", (code) => { + if (code === 0) { + resolve(this._filePath); + } else { + reject(new Error(`FFMPEG exited with code ${code}`)); + } + }); + }); + } + + kill() { + this._process?.kill("SIGINT"); + } +} diff --git a/tests/http.test.js b/tests/http.test.js index bb70864..f4a58de 100644 --- a/tests/http.test.js +++ b/tests/http.test.js @@ -72,7 +72,7 @@ describe("HTTP", () => { expect(response.ok).toBe(true); const { uuid, url } = await response.json(); expect(Channel.records.get(uuid)).toBeDefined(); - expect(url).toBe(`http://${config.PUBLIC_ADDRESS}:${config.PORT}`); + expect(url).toBe(`http://${config.PUBLIC_IP}:${config.PORT}`); }); test("/noop", async () => { const response = await fetch(`http://${HTTP_INTERFACE}:${PORT}/v${API_VERSION}/noop`, { diff --git a/tests/models.test.js b/tests/models.test.js index ce8a20c..be41778 100644 --- a/tests/models.test.js +++ b/tests/models.test.js @@ -1,17 +1,17 @@ import { describe, beforeEach, afterEach, expect, jest } from "@jest/globals"; -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import { Channel } from "#src/models/channel.js"; import { timeouts, CHANNEL_SIZE } from "#src/config.js"; import { OvercrowdedError } from "#src/utils/errors.js"; describe("Models", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { Channel.closeAll(); - rtc.close(); + resources.close(); }); test("Create channel and session", async () => { const channel = await Channel.create("testRemote", "testIssuer"); diff --git a/tests/network.test.js b/tests/network.test.js index 1cc385b..6cdb8e2 100644 --- a/tests/network.test.js +++ b/tests/network.test.js @@ -36,6 +36,14 @@ describe("Full network", () => { const [thirdStateChange] = await once(user3.session, "stateChange"); expect(thirdStateChange).toBe(SESSION_STATE.CONNECTED); }); + test("Get features information after connecting", async () => { + const channelUUID = await network.getChannelUUID(); + const user1 = await network.connect(channelUUID, 1); + const [firstStateChange] = await once(user1.session, "stateChange"); + expect(firstStateChange).toBe(SESSION_STATE.CONNECTED); + expect(user1.sfuClient.features.recording).toBe(true); + expect(user1.sfuClient.features.webRtc).toBe(true); + }); test("The session of the server closes when the client is disconnected", async () => { const channelUUID = await network.getChannelUUID(); const user1 = await network.connect(channelUUID, 1); diff --git a/tests/rtc.test.js b/tests/rtc.test.js index 5926c78..96ebcd5 100644 --- a/tests/rtc.test.js +++ b/tests/rtc.test.js @@ -1,19 +1,19 @@ import { afterEach, beforeEach, describe, expect } from "@jest/globals"; -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import * as config from "#src/config.js"; describe("rtc service", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { - rtc.close(); + resources.close(); }); test("worker load should be evenly distributed", async () => { const usedWorkers = new Set(); for (let i = 0; i < config.NUM_WORKERS; ++i) { - const worker = await rtc.getWorker(); + const worker = await resources.getWorker(); const router = await worker.createRouter({}); const webRtcServer = await worker.createWebRtcServer(config.rtc.rtcServerOptions); const promises = []; diff --git a/tests/utils/network.js b/tests/utils/network.js index d9ff48d..622a0d1 100644 --- a/tests/utils/network.js +++ b/tests/utils/network.js @@ -6,7 +6,7 @@ import * as fakeParameters from "mediasoup-client/lib/test/fakeParameters"; import * as auth from "#src/services/auth.js"; import * as http from "#src/services/http.js"; -import * as rtc from "#src/services/rtc.js"; +import * as resources from "#src/services/resources.js"; import { SfuClient, SFU_CLIENT_STATE } from "#src/client.js"; import { Channel } from "#src/models/channel.js"; @@ -39,7 +39,7 @@ export class LocalNetwork { async start(hostname, port) { this.hostname = hostname; this.port = port; - await rtc.start(); + await resources.start(); await http.start({ hostname, port }); await auth.start(HMAC_B64_KEY); } @@ -122,6 +122,6 @@ export class LocalNetwork { Channel.closeAll(); auth.close(); http.close(); - rtc.close(); + resources.close(); } }