Skip to content

Commit

Permalink
[IMP] wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ThanhDodeurOdoo committed Nov 27, 2024
1 parent 70bccf8 commit e06d943
Show file tree
Hide file tree
Showing 14 changed files with 339 additions and 102 deletions.
29 changes: 19 additions & 10 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,7 @@ export const LOG_COLOR = process.env.LOG_COLOR
*
* @type {boolean}
*/
export const RECORDING = Boolean(process.env.RECORDING ?? true);
/**
* The file type of the recording output, this must be a fragmentable type.
* If not set, defaults to mp4
*
* e.g: RECORDING_FILE_TYPE=mp4
*
* @type {string}
*/
export const RECORDING_FILE_TYPE = process.env.RECORDING_FILE_TYPE || "mp4";
export const RECORDING = !FALSY_INPUT.has(process.env.RECORDING);

// ------------------------------------------------------------
// -------------------- SETTINGS --------------------------
Expand Down Expand Up @@ -219,6 +210,19 @@ const baseProducerOptions = {
zeroRtpOnPause: true,
};

export const recording = Object.freeze({
directory: os.tmpdir() + "/recordings",
enabled: RECORDING,
fileTTL: 1000 * 60 * 60 * 24, // 24 hours
fileType: "mp4",
videoLimit: 4, // how many videos can be merged into one recording
});

export const dynamicPorts = Object.freeze({
min: 50000,
max: 59999,
});

export const rtc = Object.freeze({
// https://mediasoup.org/documentation/v3/mediasoup/api/#WorkerSettings
workerSettings: {
Expand Down Expand Up @@ -247,6 +251,11 @@ export const rtc = Object.freeze({
},
],
},
plainTransportOptions: {
listenIp: { ip: "0.0.0.0", announcedIp: PUBLIC_IP },
rtcpMux: true,
comedia: false,
},
// https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransportOptions
rtcTransportOptions: {
maxSctpMessageSize: MAX_BUF_IN,
Expand Down
14 changes: 8 additions & 6 deletions src/models/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as config from "#src/config.js";
import { getAllowedCodecs, Logger } from "#src/utils/utils.js";
import { AuthenticationError, OvercrowdedError } from "#src/utils/errors.js";
import { Session, SESSION_CLOSE_CODE } from "#src/models/session.js";
import { getWorker } from "#src/services/rtc.js";
import { getWorker } from "#src/services/resources.js";
import { Recorder } from "#src/models/recorder.js";

const logger = new Logger("CHANNEL");
Expand Down Expand Up @@ -65,15 +65,16 @@ export class Channel extends EventEmitter {
* @param {boolean} [options.useWebRtc=true] whether to use WebRTC:
* with webRTC: can stream audio/video
* without webRTC: can only use websocket
* @param {string} [options.uploadRoute] the route to which the recording will be uploaded
*/
static async create(remoteAddress, issuer, { key, useWebRtc = true } = {}) {
static async create(remoteAddress, issuer, { key, useWebRtc = true, uploadRoute } = {}) {
const safeIssuer = `${remoteAddress}::${issuer}`;
const oldChannel = Channel.recordsByIssuer.get(safeIssuer);
if (oldChannel) {
logger.verbose(`reusing channel ${oldChannel.uuid}`);
return oldChannel;
}
const options = { key };
const options = { key, uploadRoute };
if (useWebRtc) {
options.worker = await getWorker();
options.router = await options.worker.createRouter({
Expand Down Expand Up @@ -128,8 +129,9 @@ export class Channel extends EventEmitter {
* @param {string} [options.key]
* @param {import("mediasoup").types.Worker} [options.worker]
* @param {import("mediasoup").types.Router} [options.router]
* @param {string} [options.uploadRoute] the route to which the recording will be uploaded
*/
constructor(remoteAddress, { key, worker, router } = {}) {
constructor(remoteAddress, { key, worker, router, uploadRoute } = {}) {
super();
const now = new Date();
this.createDate = now.toISOString();
Expand All @@ -139,8 +141,8 @@ export class Channel extends EventEmitter {
this.name = `${remoteAddress}*${this.uuid.slice(-5)}`;
this.router = router;
this._worker = worker;
if (config.RECORDING) {
this.recorder = new Recorder(this);
if (config.recording.enabled) {
this.recorder = new Recorder(this, uploadRoute);
}
this._onSessionClose = this._onSessionClose.bind(this);
}
Expand Down
180 changes: 117 additions & 63 deletions src/models/recorder.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,62 @@
import child_process from "node:child_process";
import os from "node:os";
import path from "node:path";
import fs from "node:fs";

import { EventEmitter } from "node:events"; // TODO remove if unnecessary
import { Logger } from "#src/utils/utils.js";

import { Logger, formatFfmpegSdp } from "#src/utils/utils.js";
import { STREAM_TYPE } from "#src/shared/enums.js";
import { RECORDING_FILE_TYPE } from "#src/config.js";
import { LOG_LEVEL, recording } from "#src/config.js";
import * as config from "#src/config.js";
import * as https from "node:https";
import http from "node:http";

const logger = new Logger("RECORDER");
const temp = os.tmpdir();
const VIDEO_LIMIT = 4;

/**
* @typedef {Object} RTPTransports
* @property {Array<import("mediasoup").types.Transport>} audio
* @property {Array<import("mediasoup").types.Transport>} camera
* @property {Array<import("mediasoup").types.Transport>} screen
*/

fs.mkdir(recording.directory, { recursive: true }, (err) => {
if (err) {
logger.error(err);
}
});
export function clearDirectory() {
const now = Date.now();
fs.readdir(recording.directory, (err, files) => {
if (err) {
logger.error(err);
return;
}
for (const file of files) {
const stats = fs.statSync(path.join(recording.directory, file));
if (stats.mtimeMs < now - config.recording.fileTTL) {
fs.unlink(path.join(recording.directory, file), (err) => {
if (err) {
logger.error(err);
}
logger.info(`Deleted recording ${file}`);
});
}
fs.unlink(path.join(recording.directory, file), (err) => {
if (err) {
logger.error(err);
}
});
}
});
}
/**
* Wraps the FFMPEG process
* TODO move in own file
*/
class FFMPEG extends EventEmitter {
/** @type {string} */
filename;
/** @type {child_process.ChildProcess} */
_process;
/** @type {string} */
_filePath;

get _args() {
return [
"-loglevel",
"debug", // TODO warning in prod
const args = [
// TODO
"-protocol_whitelist",
"pipe,udp,rtp",
"-fflags",
Expand All @@ -48,9 +72,13 @@ class FFMPEG extends EventEmitter {
"-c:a",
"aac", // audio codec
"-f",
RECORDING_FILE_TYPE,
recording.fileType,
this._filePath,
];
if (LOG_LEVEL === "debug") {
args.unshift("-loglevel", "debug");
}
return args;
}

/**
Expand All @@ -59,6 +87,7 @@ class FFMPEG extends EventEmitter {
constructor(filePath) {
super();
this._filePath = filePath;
this.this.filename = `${Date.now()}.${recording.fileType}`;
}

/**
Expand All @@ -72,7 +101,7 @@ class FFMPEG extends EventEmitter {
if (!this._process.stdin.writable) {
throw new Error("FFMPEG stdin not writable.");
}
this._process.stdin.write(sdp);
this._process.stdin.write(sdp); // TODO (maybe pass args earlier)
this._process.stdin.end();

this._process.stdout.on("data", (chunk) => {
Expand All @@ -97,27 +126,39 @@ class FFMPEG extends EventEmitter {
}

export class Recorder extends EventEmitter {
static records = new Map();

/** @type {Map<string, string>} */
static generatedFiles = new Map();
/** @type {string} */
uuid = crypto.randomUUID();
uuid;
/** @type {import("#src/models/channel").Channel} */
channel;
/** @type {string} */
state;
/** @type {FFMPEG} */
ffmpeg;
/** @type {RTPTransports} */
_rtpTransports;
/** @type {string} */
filePath;
/**
* @param {string} uuid
* @param {http.ServerResponse} res
*/
static pipeToResponse(uuid, res) {
// TODO check if this can be executed, otherwise end request, or throw error (http service will throw anyways)
const fileStream = fs.createReadStream(Recorder.generatedFiles.get(uuid)); // may need to be explicitly closed?
res.writeHead(200, {
"Content-Type": `video/${recording.fileType}`,
"Content-Disposition": "inline",
});
fileStream.pipe(res); // Pipe the file stream to the response
}
/**
* @param {import("#src/models/channel").Channel} channel
* @param {string} destination url to send the file to
*/
constructor(channel) {
constructor(channel, destination) {
super();
this.channel = channel;
this.filePath = path.join(temp, `${this.uuid}.${RECORDING_FILE_TYPE}`);
Recorder.records.set(this.uuid, this);
this._destination = destination;
}

/** @returns {number} */
Expand All @@ -129,60 +170,73 @@ export class Recorder extends EventEmitter {
* @param {Array} ids
* @returns {string} filePath
*/
start(ids) {
// maybe internal state and check if already recording (recording = has ffmpeg child process).
this.stop();
async start(ids) {
if (this.ffmpeg) {
return this.filePath;
}
this.uuid = crypto.randomUUID();
const audioRtps = [];
const videoRtps = [];
for (const id of ids) {
const session = this.channel.sessions.get(id);
const audioRtp = this._createRtp(
session.producers[STREAM_TYPE.AUDIO],
STREAM_TYPE.AUDIO
);
// TODO maybe some logic for priority on session id or stream type
audioRtp && this._rtpTransports.audio.push(audioRtp);
const audioRtpData = session.getRtp(STREAM_TYPE.AUDIO);
audioRtpData && audioRtps.push(audioRtpData);
for (const type in [STREAM_TYPE.CAMERA, STREAM_TYPE.SCREEN]) {
if (this.videoCount < VIDEO_LIMIT) {
const rtp = this._createRtp(session.producers[type], type);
rtp && this._rtpTransports[type].push(rtp);
if (videoRtps.length < recording.videoLimit) {
const videoRtpData = session.getRtp(type);
videoRtpData && videoRtps.push(videoRtpData);
}
}
}
this.filePath = path.join(recording.directory, `${this.uuid}.${recording.fileType}`);
this.ffmpeg = new FFMPEG(this.filePath);
this.ffmpeg.spawn(); // args should be base on the rtp transports
await this.ffmpeg.spawn(formatFfmpegSdp(audioRtps, videoRtps)); // args should be base on the rtp transports
Recorder.generatedFiles.set(this.uuid, this.filePath);
this.ffmpeg.once("success", () => {
this.emit("download-ready", this.filePath);
});
return this.filePath;
}
pause() {
// TODO maybe shouldn't be able to pause
update(ids) {
// TODO see if ffmpeg input can be re-configured at runtime, otherwise no support or full restart
return this.filePath;
}
stop() {
// TODO
// cleanup all rtp transports
// stop ffmpeg process
Recorder.records.delete(this.uuid);
this.ffmpeg?.kill();
this.uuid = undefined;
this.ffmpeg = undefined;
}
upload() {
this.stop();
if (!this._destination) {
logger.warn(`No upload destination set for ${this.uuid}`);
return;
}
const fileStream = fs.createReadStream(this.filePath);
const { hostname, pathname, protocol } = new URL(this._destination);
const options = {
hostname,
path: pathname,
method: "POST",
headers: {
"Content-Type": "application/octet-stream",
"Content-Length": fs.statSync(this.filePath).size,
},
};

const request = (protocol === "https:" ? https : http).request(options, (res) => {
if (res.statusCode === 200) {
logger.info(`File uploaded to ${this._destination}`);
// TODO delete file
} else {
logger.error(`Failed to upload file: ${res.statusCode}`);
}
});

/**
* @param {http.ServerResponse} res
*/
pipeToResponse(res) {
// TODO check if this can be executed, otherwise end request, or throw error
const fileStream = fs.createReadStream(this._filePath); // may need to be explicitly closed?
res.writeHead(200, {
"Content-Type": `video/${RECORDING_FILE_TYPE}`,
"Content-Disposition": "inline",
request.on("error", (error) => {
logger.error(`Failed to upload file: ${error.message}`);
});
fileStream.pipe(res); // Pipe the file stream to the response
}

/**
* @param {import("mediasoup").types.Producer} producer
* @param {STREAM_TYPE[keyof STREAM_TYPE]} type
* @return {Promise<void>} probably just create transport with right ports and return that,
*/
async _createRtp(producer, type) {
// TODO
fileStream.pipe(request);
}
}
Loading

0 comments on commit e06d943

Please sign in to comment.