Skip to content

Commit

Permalink
rebroadcast: refactor codec and resolution detection
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Jun 24, 2024
1 parent 5dfa088 commit a081e6e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 147 deletions.
45 changes: 6 additions & 39 deletions plugins/prebuffer-mixin/src/ffmpeg-rebroadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,9 @@ const { mediaManager } = sdk;

export interface ParserSession<T extends string> {
parserSpecific?: any;
sdp: Promise<Buffer[]>;
sdp: Promise<string>;
resetActivityTimer?: () => void,
negotiateMediaStream(requestMediaStream: RequestMediaStreamOptions): ResponseMediaStreamOptions;
inputAudioCodec?: string;
inputVideoCodec?: string;
inputVideoResolution?: {
width: number,
height: number,
},
negotiateMediaStream(requestMediaStream: RequestMediaStreamOptions, inputVideoCodec: string, inputAudioCodec: string): ResponseMediaStreamOptions;
start(): void;
kill(error?: Error): void;
killed: Promise<void>;
Expand Down Expand Up @@ -118,10 +112,6 @@ export async function startParserSession<T extends string>(ffmpegInput: FFmpegIn
// need this to prevent kill from throwing due to uncaught Error during cleanup
events.on('error', e => console.error('rebroadcast error', e));

let inputAudioCodec: string;
let inputVideoCodec: string;
let inputVideoResolution: string[];

let sessionKilled: any;
const killed = new Promise<void>(resolve => {
sessionKilled = resolve;
Expand Down Expand Up @@ -171,7 +161,7 @@ export async function startParserSession<T extends string>(ffmpegInput: FFmpegIn
try {
ensureActive(() => socket.destroy());

for await (const chunk of parser.parse(socket, parseInt(inputVideoResolution?.[2]), parseInt(inputVideoResolution?.[3]))) {
for await (const chunk of parser.parse(socket, undefined, undefined)) {
events.emit(container, chunk);
resetActivityTimer();
}
Expand Down Expand Up @@ -218,7 +208,7 @@ export async function startParserSession<T extends string>(ffmpegInput: FFmpegIn
try {
const { resetActivityTimer } = setupActivityTimer(container, kill, events, options?.timeout);

for await (const chunk of parser.parse(pipe as any, parseInt(inputVideoResolution?.[2]), parseInt(inputVideoResolution?.[3]))) {
for await (const chunk of parser.parse(pipe as any, undefined, undefined)) {
await deferredStart.promise;
events.emit(container, chunk);
resetActivityTimer();
Expand All @@ -232,43 +222,20 @@ export async function startParserSession<T extends string>(ffmpegInput: FFmpegIn
};

const rtsp = (options.parsers as any).rtsp as ReturnType<typeof createRtspParser>;
rtsp.sdp.then(sdp => {
const parsed = parseSdp(sdp);
const audio = parsed.msections.find(msection => msection.type === 'audio');
const video = parsed.msections.find(msection => msection.type === 'video');
inputVideoCodec = video?.codec;
inputAudioCodec = audio?.codec;
});

const sdp = new Deferred<Buffer[]>();
rtsp.sdp.then(r => sdp.resolve([Buffer.from(r)]));
killed.then(() => sdp.reject(new Error("ffmpeg killed before sdp could be parsed")));

start();

return {
start() {
deferredStart.resolve();
},
sdp: sdp.promise,
get inputAudioCodec() {
return inputAudioCodec;
},
get inputVideoCodec() {
return inputVideoCodec;
},
get inputVideoResolution() {
return {
width: parseInt(inputVideoResolution?.[2]),
height: parseInt(inputVideoResolution?.[3]),
}
},
sdp: rtsp.sdp,
get isActive() { return isActive },
kill(error?: Error) {
kill(error);
},
killed,
negotiateMediaStream: () => {
negotiateMediaStream: (requestMediaStream: RequestMediaStreamOptions, inputVideoCodec, inputAudioCodec) => {
const ret: ResponseMediaStreamOptions = cloneDeep(ffmpegInput.mediaStreamOptions) || {
id: undefined,
name: undefined,
Expand Down
133 changes: 92 additions & 41 deletions plugins/prebuffer-mixin/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { AutoenableMixinProvider } from '@scrypted/common/src/autoenable-mixin-provider';
import { getDebugModeH264EncoderArgs, getH264EncoderArgs } from '@scrypted/common/src/ffmpeg-hardware-acceleration';
import { parse as spsParse } from "h264-sps-parser";

import { addVideoFilterArguments } from '@scrypted/common/src/ffmpeg-helpers';
import { ListenZeroSingleClientTimeoutError, closeQuiet, listenZeroSingleClient } from '@scrypted/common/src/listen-cluster';
import { readLength } from '@scrypted/common/src/read-stream';
import { H264_NAL_TYPE_FU_B, H264_NAL_TYPE_IDR, H264_NAL_TYPE_MTAP16, H264_NAL_TYPE_MTAP32, H264_NAL_TYPE_RESERVED0, H264_NAL_TYPE_RESERVED30, H264_NAL_TYPE_RESERVED31, H264_NAL_TYPE_SEI, H264_NAL_TYPE_STAP_B, RtspServer, RtspTrack, createRtspParser, findH264NaluType, getNaluTypes, listenSingleRtspClient } from '@scrypted/common/src/rtsp-server';
import { addTrackControls, parseSdp } from '@scrypted/common/src/sdp-utils';
import { H264_NAL_TYPE_FU_B, H264_NAL_TYPE_IDR, H264_NAL_TYPE_MTAP16, H264_NAL_TYPE_MTAP32, H264_NAL_TYPE_RESERVED0, H264_NAL_TYPE_RESERVED30, H264_NAL_TYPE_RESERVED31, H264_NAL_TYPE_SEI, H264_NAL_TYPE_SPS, H264_NAL_TYPE_STAP_B, RtspServer, RtspTrack, createRtspParser, findH264NaluType, getNaluTypes, listenSingleRtspClient } from '@scrypted/common/src/rtsp-server';
import { addTrackControls, getSpsPps, parseSdp } from '@scrypted/common/src/sdp-utils';
import { SettingsMixinDeviceBase, SettingsMixinDeviceOptions } from "@scrypted/common/src/settings-mixin";
import { sleep } from '@scrypted/common/src/sleep';
import { StreamChunk, StreamParser } from '@scrypted/common/src/stream-parser';
Expand All @@ -25,6 +27,7 @@ import { connectRFC4571Parser, startRFC4571Parser } from './rfc4571';
import { RtspSessionParserSpecific, startRtspSession } from './rtsp-session';
import { createStreamSettings } from './stream-settings';
import { TRANSCODE_MIXIN_PROVIDER_NATIVE_ID, TranscodeMixinProvider, getTranscodeMixinProviderId } from './transcode-settings';
import { getSpsResolution } from './sps-resolution';

const { mediaManager, log, systemManager, deviceManager } = sdk;

Expand Down Expand Up @@ -235,6 +238,51 @@ class PrebufferSession {
}
}

async parseCodecs(skipResolution?: boolean) {
const sdp = await this.parserSession.sdp;
const parsedSdp = parseSdp(sdp);
const videoSection = parsedSdp.msections.find(msection => msection.type === 'video');
const audioSection = parsedSdp.msections.find(msection => msection.type === 'audio');

const inputAudioCodec = audioSection?.codec;
const inputVideoCodec = videoSection.codec;
let inputVideoResolution: ReturnType<typeof getSpsResolution>;

if (!skipResolution) {
// scan the prebuffer for sps
for (const chunk of this.rtspPrebuffer) {
try {
const sps = findH264NaluType(chunk, H264_NAL_TYPE_SPS);
if (sps) {
const parsedSps = spsParse(sps);
inputVideoResolution = getSpsResolution(parsedSps);
}
}
catch (e) {
}
}

if (!inputVideoResolution) {
const spspps = getSpsPps(videoSection);
const { sps } = spspps;
if (sps) {
try {
const parsedSps = spsParse(sps);
inputVideoResolution = getSpsResolution(parsedSps);
}
catch (e) {
}
}
}
}

return {
inputVideoCodec,
inputAudioCodec,
inputVideoResolution,
}
}

async getMixinSettings(): Promise<Setting[]> {
const settings: Setting[] = [];

Expand Down Expand Up @@ -338,8 +386,9 @@ class PrebufferSession {
};

if (session) {
const resolution = session.inputVideoResolution?.width && session.inputVideoResolution?.height
? `${session.inputVideoResolution?.width}x${session.inputVideoResolution?.height}`
const codecInfo = await this.parseCodecs();
const resolution = codecInfo.inputVideoResolution?.width && codecInfo.inputVideoResolution?.height
? `${codecInfo.inputVideoResolution?.width}x${codecInfo.inputVideoResolution?.height}`
: 'unknown';

const idrInterval = this.getDetectedIdrInterval();
Expand All @@ -359,7 +408,7 @@ class PrebufferSession {
subgroup,
title: 'Detected Video/Audio Codecs',
readonly: true,
value: (session?.inputVideoCodec?.toString() || 'unknown') + '/' + (session?.inputAudioCodec?.toString() || 'unknown'),
value: (codecInfo?.inputVideoCodec?.toString() || 'unknown') + '/' + (codecInfo?.inputAudioCodec?.toString() || 'unknown'),
description: 'Configuring your camera to H264 video, and audio to Opus or PCM-mulaw (G.711ulaw) is recommended.'
},
{
Expand Down Expand Up @@ -470,7 +519,7 @@ class PrebufferSession {
session = startRFC4571Parser(this.console, connectRFC4571Parser(url), sdp, mediaStreamOptions, {
timeout: 10000,
});
this.sdp = session.sdp.then(buffers => Buffer.concat(buffers).toString());
this.sdp = session.sdp;
}
else {
const moBuffer = await mediaManager.convertMediaObjectToBuffer(mo, ScryptedMimeTypes.FFmpegInput);
Expand Down Expand Up @@ -501,7 +550,7 @@ class PrebufferSession {
audioSoftMuted,
rtspRequestTimeout: 10000,
});
this.sdp = session.sdp.then(buffers => Buffer.concat(buffers).toString());
this.sdp = session.sdp;
}
else {
let acodec: string[];
Expand Down Expand Up @@ -615,38 +664,39 @@ class PrebufferSession {
}

await session.sdp;
this.parserSession = session;
session.killed.finally(() => {
if (this.parserSession === session)
this.parserSession = undefined;
});
session.killed.finally(() => clearTimeout(this.inactivityTimeout));

const codecInfo = await this.parseCodecs();

// complain to the user about the codec if necessary. upstream may send a audio
// stream but report none exists (to request muting).
if (!audioSoftMuted && advertisedAudioCodec && session.inputAudioCodec !== undefined
&& session.inputAudioCodec !== advertisedAudioCodec) {
if (!audioSoftMuted && advertisedAudioCodec && codecInfo.inputAudioCodec !== undefined
&& codecInfo.inputAudioCodec !== advertisedAudioCodec) {
this.console.warn('Audio codec plugin reported vs detected mismatch', advertisedAudioCodec, detectedAudioCodec);
}

const advertisedVideoCodec = mso?.video?.codec;
if (advertisedVideoCodec && session.inputVideoCodec !== undefined
&& session.inputVideoCodec !== advertisedVideoCodec) {
this.console.warn('Video codec plugin reported vs detected mismatch', advertisedVideoCodec, session.inputVideoCodec);
if (advertisedVideoCodec && codecInfo.inputVideoCodec !== undefined
&& codecInfo.inputVideoCodec !== advertisedVideoCodec) {
this.console.warn('Video codec plugin reported vs detected mismatch', advertisedVideoCodec, codecInfo.inputVideoCodec);
}

if (!session.inputAudioCodec) {
if (!codecInfo.inputAudioCodec) {
this.console.log('No audio stream detected.');
}

// set/update the detected codec, set it to null if no audio was found.
this.storage.setItem(this.lastDetectedAudioCodecKey, session.inputAudioCodec || 'null');
this.storage.setItem(this.lastDetectedAudioCodecKey, codecInfo.inputAudioCodec || 'null');

if (session.inputVideoCodec !== 'h264') {
if (codecInfo.inputVideoCodec !== 'h264') {
this.console.error(`Video codec is not h264. If there are errors, try changing your camera's encoder output.`);
}

this.parserSession = session;
session.killed.finally(() => {
if (this.parserSession === session)
this.parserSession = undefined;
});
session.killed.finally(() => clearTimeout(this.inactivityTimeout));

// settings ui refresh
deviceManager.onMixinEvent(this.mixin.id, this.mixin, ScryptedInterface.Settings, undefined);

Expand Down Expand Up @@ -676,26 +726,26 @@ class PrebufferSession {
session.killed.finally(() => clearTimeout(refreshTimeout));
}

let shifts = 0;
let prebufferContainer: PrebufferStreamChunk[] = this.rtspPrebuffer;
let shifts = 0;
let prebufferContainer: PrebufferStreamChunk[] = this.rtspPrebuffer;

session.on('rtsp', (chunk: PrebufferStreamChunk) => {
const now = Date.now();
session.on('rtsp', (chunk: PrebufferStreamChunk) => {
const now = Date.now();

chunk.time = now;
prebufferContainer.push(chunk);
chunk.time = now;
prebufferContainer.push(chunk);

while (prebufferContainer.length && prebufferContainer[0].time < now - prebufferDurationMs) {
prebufferContainer.shift();
shifts++;
}
while (prebufferContainer.length && prebufferContainer[0].time < now - prebufferDurationMs) {
prebufferContainer.shift();
shifts++;
}

if (shifts > 100000) {
prebufferContainer = prebufferContainer.slice();
this.rtspPrebuffer = prebufferContainer;
shifts = 0;
}
});
if (shifts > 100000) {
prebufferContainer = prebufferContainer.slice();
this.rtspPrebuffer = prebufferContainer;
shifts = 0;
}
});

session.start();
return session;
Expand Down Expand Up @@ -913,7 +963,8 @@ class PrebufferSession {
requestedPrebuffer = Math.min(defaultPrebuffer, this.getDetectedIdrInterval() || defaultPrebuffer);;
}

const mediaStreamOptions: ResponseMediaStreamOptions = session.negotiateMediaStream(options);
const codecInfo = await this.parseCodecs(true);
const mediaStreamOptions: ResponseMediaStreamOptions = session.negotiateMediaStream(options, codecInfo.inputVideoCodec, codecInfo.inputAudioCodec);
let sdp = await this.sdp;
if (!mediaStreamOptions.video?.h264Info && this.usingScryptedParser) {
mediaStreamOptions.video ||= {};
Expand Down Expand Up @@ -1039,10 +1090,10 @@ class PrebufferSession {
mediaStreamOptions.audio.sampleRate ||= audioSection.rtpmap.clock;
}

if (session.inputVideoResolution?.width && session.inputVideoResolution?.height) {
if (codecInfo.inputVideoResolution?.width && codecInfo.inputVideoResolution?.height) {
// this may be an audio only request.
if (mediaStreamOptions.video)
Object.assign(mediaStreamOptions.video, session.inputVideoResolution);
Object.assign(mediaStreamOptions.video, codecInfo.inputVideoResolution);
}

const now = Date.now();
Expand Down
9 changes: 2 additions & 7 deletions plugins/prebuffer-mixin/src/rfc4571.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,14 @@ export function startRFC4571Parser(console: Console, socket: Readable, sdp: stri

return {
start,
sdp: Promise.resolve([Buffer.from(sdp)]),
inputAudioCodec,
inputVideoCodec,
get inputVideoResolution() {
return inputVideoResolution;
},
sdp: Promise.resolve(sdp),
get isActive() { return isActive },
kill(error?: Error) {
kill(error);
},
killed,
resetActivityTimer,
negotiateMediaStream: (requestMediaStream) => {
negotiateMediaStream: (requestMediaStream,inputVideoCodec, inputAudioCodec) => {
return negotiateMediaStream(sdp, mediaStreamOptions, inputVideoCodec, inputAudioCodec, requestMediaStream);
},
emit(container: 'rtsp', chunk: StreamChunk) {
Expand Down
Loading

0 comments on commit a081e6e

Please sign in to comment.