Skip to content

Commit

Permalink
[IMP] add recording feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ThanhDodeurOdoo committed Nov 19, 2024
1 parent 2b5a531 commit 6ad0f49
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/models/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { getAllowedCodecs, Logger } from "#src/utils/utils.js";
import { AuthenticationError, OvercrowdedError } from "#src/utils/errors.js";
import { Session, SESSION_CLOSE_CODE } from "#src/models/session.js";
import { getWorker } from "#src/services/rtc.js";
import { Recorder } from "#src/models/recorder.js";

const logger = new Logger("CHANNEL");

Expand Down Expand Up @@ -39,6 +40,7 @@ export class Channel extends EventEmitter {
name;
/** @type {WithImplicitCoercion<string>} base 64 buffer key */
key;
recorder;
/** @type {import("mediasoup").types.Router}*/
router;
/** @type {Map<number, Session>} */
Expand Down Expand Up @@ -130,6 +132,7 @@ export class Channel extends EventEmitter {
this.name = `${remoteAddress}*${this.uuid.slice(-5)}`;
this.router = router;
this._worker = worker;
this.recorder = new Recorder(this);
this._onSessionClose = this._onSessionClose.bind(this);
}

Expand Down Expand Up @@ -249,6 +252,7 @@ export class Channel extends EventEmitter {
}
clearTimeout(this._closeTimeout);
this.sessions.clear();
this.recorder.stop();
Channel.records.delete(this.uuid);
/**
* @event Channel#close
Expand Down
187 changes: 187 additions & 0 deletions src/models/recorder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import child_process from "node:child_process";
import os from "node:os";
import path from "node:path";
import fs from "node:fs";

import { EventEmitter } from "node:events"; // TODO remove if unnecessary
import { Logger } from "#src/utils/utils.js";
import { STREAM_TYPE } from "#src/shared/enums.js";

const logger = new Logger("RECORDER");
const temp = os.tmpdir();
const fileType = "mp4"; // TODO config
const VIDEO_LIMIT = 4; // TODO config (and other name?)

/**
* @typedef {Object} RTPTransports
* @property {Array<import("mediasoup").types.Transport>} audio
* @property {Array<import("mediasoup").types.Transport>} camera
* @property {Array<import("mediasoup").types.Transport>} screen
*/

/**
* Wraps the FFMPEG process
* TODO move in own file
*/
class FFMPEG extends EventEmitter {
/** @type {child_process.ChildProcess} */
_process;
/** @type {string} */
_filePath;

get _args() {
return [
"-loglevel",
"debug", // TODO warning in prod
"-protocol_whitelist",
"pipe,udp,rtp",
"-fflags",
"+genpts",
"-f",
"sdp",
"-i",
"pipe:0",
"-movflags",
"frag_keyframe+empty_moov+default_base_moof", // fragmented
"-c:v",
"libx264", // vid codec
"-c:a",
"aac", // audio codec
"-f",
fileType,
this._filePath,
];
}

/**
* @param {string} filePath
*/
constructor(filePath) {
super();
this._filePath = filePath;
}

/**
* @param {String[]} [sdp]
*/
async spawn(sdp) {
this._process = child_process.spawn("ffmpeg", this._args, {
stdio: ["pipe", "pipe", process.stderr],
});

if (!this._process.stdin.writable) {
throw new Error("FFMPEG stdin not writable.");
}
this._process.stdin.write(sdp);
this._process.stdin.end();

this._process.stdout.on("data", (chunk) => {
this.emit("data", chunk); // Emit data chunks as they become available
// may need to ues this to pipe to request if file stream does not work
});

this._process.on("close", (code) => {
if (code === 0) {
this.emit("success");
}
});

logger.debug(
`FFMPEG process (pid:${this._process.pid}) spawned, outputting to ${this._filePath}`
);
}

kill() {
this._process?.kill("SIGINT");
}
}

export class Recorder extends EventEmitter {
static records = new Map();

/** @type {string} */
uuid = crypto.randomUUID();
/** @type {import("#src/models/channel").Channel} */
channel;
/** @type {string} */
state;
ffmpeg;
/** @type {RTPTransports} */
_rtpTransports;
/** @type {string} */
filePath;
/**
* @param {import("#src/models/channel").Channel} channel
*/
constructor(channel) {
super();
this.channel = channel;
this.filePath = path.join(temp, `${this.uuid}.${fileType}`);
Recorder.records.set(this.uuid, this);
}

/** @returns {number} */
get videoCount() {
return this._rtpTransports.camera.length + this._rtpTransports.screen.length;
}

/**
* @param {Array} ids
* @returns {string} filePath
*/
start(ids) {
// maybe internal state and check if already recording (recording = has ffmpeg child process).
this.stop();
for (const id of ids) {
const session = this.channel.sessions.get(id);
const audioRtp = this._createRtp(
session.producers[STREAM_TYPE.AUDIO],
STREAM_TYPE.AUDIO
);
audioRtp && this._rtpTransports.audio.push(audioRtp);
for (const type in [STREAM_TYPE.CAMERA, STREAM_TYPE.SCREEN]) {
if (this.videoCount < VIDEO_LIMIT) {
const rtp = this._createRtp(session.producers[type], type);
rtp && this._rtpTransports[type].push(rtp);
}
}
}
this.ffmpeg = new FFMPEG(this.filePath);
this.ffmpeg.spawn(); // args should be base on the rtp transports
this.ffmpeg.once("success", () => {
this.emit("download-ready", this.filePath);
});
return this.filePath;
}
pause() {
// TODO maybe shouldn't be able to pause
}
stop() {
// TODO
// cleanup all rtp transports
// stop ffmpeg process
Recorder.records.delete(this.uuid);
}

/**
* @param {http.ServerResponse} res
*/
pipeToResponse(res) {
// TODO check if this can be executed, otherwise end request, or throw error
const fileStream = fs.createReadStream(this._filePath); // may need to be explicitly closed?
res.writeHead(200, {
"Content-Type": `video/${fileType}`,
"Content-Disposition": "inline",
});
fileStream.pipe(res); // Pipe the file stream to the response
}

/**
* @param {import("mediasoup").types.Producer} producer
* @param {STREAM_TYPE[keyof STREAM_TYPE]} type
* @return {Promise<void>} probably just create transport with right ports and return that,
*/
async _createRtp(producer, type) {
// TODO
}
}
47 changes: 46 additions & 1 deletion src/services/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import * as config from "#src/config.js";
import { Logger, parseBody, extractRequestInfo } from "#src/utils/utils.js";
import { SESSION_CLOSE_CODE } from "#src/models/session.js";
import { Channel } from "#src/models/channel.js";
import { Recorder } from "#src/models/recorder.js";

/**
* @typedef {function} routeCallback
Expand All @@ -15,6 +16,7 @@ import { Channel } from "#src/models/channel.js";
* @param {string} param2.remoteAddress
* @param {string} param2.protocol
* @param {string} param2.host
* @param {Object} param2.match name/value mapping of route variables
* @param {URLSearchParams} param2.searchParams
* @return {http.ServerResponse}
*/
Expand Down Expand Up @@ -77,6 +79,19 @@ export async function start({ httpInterface = config.HTTP_INTERFACE, port = conf
return res.end();
},
});
routeListener.get(`/v${API_VERSION}/recording/<token>`, {
callback: async (req, res, { remoteAddress, match }) => {
try {
const { token } = match;
logger.info(`[${remoteAddress}]: requested recording ${token}`);
Recorder.records.get(token)?.pipeToResponse(res);
// res not ended as we are streaming
} catch (error) {
logger.error(`[${remoteAddress}] failed to obtain recording: ${error.message}`);
return res.end();
}
},
});
routeListener.post(`/v${API_VERSION}/disconnect`, {
callback: async (req, res, { remoteAddress }) => {
try {
Expand Down Expand Up @@ -183,7 +198,8 @@ class RouteListener {
break;
}
for (const [pattern, options] of registeredRoutes) {
if (pathname === pattern) {
const match = this._extractPattern(pathname, pattern);
if (match) {
if (options?.cors) {
res.setHeader("Access-Control-Allow-Origin", options.cors);
res.setHeader("Access-Control-Allow-Methods", options.methods);
Expand All @@ -195,6 +211,7 @@ class RouteListener {
host,
protocol,
remoteAddress,
match,
searchParams,
});
} catch (error) {
Expand All @@ -212,4 +229,32 @@ class RouteListener {
}
return res.end();
}

/**
* Matches a pathname against a pattern with named parameters.
* @param {string} pathname - The URL path requested, e.g., "/channel/6/person/42/"
* @param {string} pattern - The pattern to match, e.g., "/channel/<channelId>/session/<sessionId>"
* @returns {object|undefined} - Returns undefined if no match. If matched, returns an object mapping keys to values,
* the object is empty if matching a pattern with no variables.
* eg: { channelId: "6", sessionId: "42" } | {} | undefined
*/
_extractPattern(pathname, pattern) {
pathname = pathname.replace(/\/+$/, "");
pattern = pattern.replace(/\/+$/, "");
const paramNames = [];
const regexPattern = pattern.replace(/<([^>]+)>/g, (_, paramName) => {
paramNames.push(paramName);
return "([^/]+)";
});
const regex = new RegExp(`^${regexPattern}$`);
const match = pathname.match(regex);
if (!match) {
return;
}
const params = {};
paramNames.forEach((name, index) => {
params[name] = match[index + 1];
});
return params;
}
}
6 changes: 6 additions & 0 deletions src/shared/enums.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ export const WS_CLOSE_CODE = {
CHANNEL_FULL: 4109,
};

export const STREAM_TYPE = {
AUDIO: "audio",
CAMERA: "camera",
SCREEN: "screen",
};

export const SERVER_REQUEST = {
/** Requests the creation of a consumer that is used to forward a track to the client */
INIT_CONSUMER: "INIT_CONSUMER",
Expand Down

0 comments on commit 6ad0f49

Please sign in to comment.