Skip to content

Commit

Permalink
rebroadcast: parser perf refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed May 15, 2024
1 parent bdf9278 commit f677cf7
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 72 deletions.
76 changes: 47 additions & 29 deletions common/src/rtsp-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ export const H264_NAL_TYPE_MTAP32 = 27;
export function findH264NaluType(streamChunk: StreamChunk, naluType: number) {
if (streamChunk.type !== 'h264')
return;
return findH264NaluTypeInNalu(streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12), naluType);
const { chunks } = streamChunk;
for (let i = 1; i < chunks.length; i += 2) {
const chunk = chunks[i];
const r = findH264NaluTypeInNalu(chunk.subarray(12), naluType);
if (r)
return r;
}
}

export function findH264NaluTypeInNalu(nalu: Buffer, naluType: number) {
Expand Down Expand Up @@ -124,7 +130,15 @@ export function findH264NaluTypeInNalu(nalu: Buffer, naluType: number) {
export function getNaluTypes(streamChunk: StreamChunk) {
if (streamChunk.type !== 'h264')
return new Set<number>();
return getNaluTypesInNalu(streamChunk.chunks[streamChunk.chunks.length - 1].subarray(12))
const sets: Set<number>[] = [];
const { chunks } = streamChunk;
for (let i = 1; i < chunks.length; i += 2) {
const chunk = chunks[i];
const r = getNaluTypesInNalu(chunk.subarray(12));
sets.push(r);
}

return new Set(sets.map(s => [...s]).flat());
}

export function getNaluFragmentInformation(nalu: Buffer) {
Expand Down Expand Up @@ -301,7 +315,7 @@ const quote = (str: string): string => `"${str.replace(/"/g, '\\"')}"`;
export interface RtspClientSetupOptions {
type: 'tcp' | 'udp';
path?: string;
onRtp: (rtspHeader: Buffer, rtp: Buffer) => void;
onRtp: (...headerBuffers: [Buffer, Buffer][]) => void;
}

export interface RtspClientTcpSetupOptions extends RtspClientSetupOptions {
Expand Down Expand Up @@ -412,7 +426,7 @@ export class RtspClient extends RtspBase {
const data = await readLength(this.client, length);

const options = this.setupOptions.get(channel);
options?.onRtp?.(header, data);
options?.onRtp?.([header, data]);
}

async readDataPayload() {
Expand All @@ -424,32 +438,26 @@ export class RtspClient extends RtspBase {
return new Error('RTSP Client received invalid frame magic. This may be a bug in your camera firmware. If this error persists, switch your RTSP Parser to FFmpeg or Scrypted (UDP): ' + header.toString());
}

async readLoopLegacy() {
try {
while (true) {
if (this.needKeepAlive) {
this.needKeepAlive = false;
if (this.hasGetParameter)
await this.getParameter();
else
await this.options();
}
await this.readDataPayload();
}
}
catch (e) {
this.client.destroy(e);
throw e;
}
}

async readLoop() {
const deferred = new Deferred<void>();

let headerBuffers: [Buffer, Buffer][] = [];
let header: Buffer;
let channel: number;
let length: number;

const flush = (newChannel?: number) => {
const c = channel;
channel = newChannel;
const channelChange = newChannel !== c;
if (!channelChange || !headerBuffers.length)
return;
const hb = headerBuffers;
headerBuffers = [];
const options = this.setupOptions.get(c);
options?.onRtp?.(...hb);
}

const read = async () => {
if (this.needKeepAlive) {
this.needKeepAlive = false;
Expand All @@ -465,14 +473,19 @@ export class RtspClient extends RtspBase {
if (!header) {
header = this.client.read(4);

if (!header)
if (!header) {
// flush if waiting for a header.
flush();
return;
}

// validate header once.
if (header[0] !== RTSP_FRAME_MAGIC) {
if (header.toString() !== 'RTSP')
throw this.createBadHeader(header);

flush();

this.client.unshift(header);
header = undefined;

Expand All @@ -489,18 +502,23 @@ export class RtspClient extends RtspBase {
continue;
}

channel = header.readUInt8(1);
const newChannel = header.readUInt8(1);
flush(newChannel);
length = header.readUInt16BE(2);
}

const currentChannel = channel;
const data = this.client.read(length);
if (!data)
if (!data) {
// flush if waiting for data, but restore the channel.
flush();
channel = currentChannel;
return;
}

const h = header;
header = undefined;
const options = this.setupOptions.get(channel);
options?.onRtp?.(h, data);
headerBuffers.push([h, data]);
}
}
catch (e) {
Expand Down Expand Up @@ -674,7 +692,7 @@ export class RtspClient extends RtspBase {
this.client.on('close', () => closeQuiet(udp.server));
}
port = options.dgram.address().port;
options.dgram.on('message', data => options.onRtp(undefined, data));
options.dgram.on('message', data => options.onRtp([undefined, data]));
}
headers = Object.assign({
Transport: `RTP/AVP${protocol};unicast;${client}=${port}-${port + 1}`,
Expand Down
5 changes: 3 additions & 2 deletions common/src/stream-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ export function createMpegTsParser(options?: StreamParserOptions): StreamParser
for (let prebufferIndex = 0; prebufferIndex < streamChunks.length; prebufferIndex++) {
const streamChunk = streamChunks[prebufferIndex];

for (let chunkIndex = 0; chunkIndex < streamChunk.chunks.length; chunkIndex++) {
const chunk = streamChunk.chunks[chunkIndex];
const { chunks } = streamChunk;
for (let chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) {
const chunk = chunks[chunkIndex];

let offset = 0;
while (offset + 188 < chunk.length) {
Expand Down
4 changes: 2 additions & 2 deletions plugins/prebuffer-mixin/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugins/prebuffer-mixin/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/prebuffer-mixin",
"version": "0.10.18",
"version": "0.10.19",
"description": "Video Stream Rebroadcast, Prebuffer, and Management Plugin for Scrypted.",
"author": "Scrypted",
"license": "Apache-2.0",
Expand Down
65 changes: 41 additions & 24 deletions plugins/prebuffer-mixin/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -676,26 +676,26 @@ class PrebufferSession {
session.killed.finally(() => clearTimeout(refreshTimeout));
}

let shifts = 0;
let prebufferContainer: PrebufferStreamChunk[] = this.rtspPrebuffer;
let shifts = 0;
let prebufferContainer: PrebufferStreamChunk[] = this.rtspPrebuffer;

session.on('rtsp', (chunk: PrebufferStreamChunk) => {
const now = Date.now();
session.on('rtsp', (chunk: PrebufferStreamChunk) => {
const now = Date.now();

chunk.time = now;
prebufferContainer.push(chunk);
chunk.time = now;
prebufferContainer.push(chunk);

while (prebufferContainer.length && prebufferContainer[0].time < now - prebufferDurationMs) {
prebufferContainer.shift();
shifts++;
}
while (prebufferContainer.length && prebufferContainer[0].time < now - prebufferDurationMs) {
prebufferContainer.shift();
shifts++;
}

if (shifts > 100000) {
prebufferContainer = prebufferContainer.slice();
this.rtspPrebuffer = prebufferContainer;
shifts = 0;
}
});
if (shifts > 100000) {
prebufferContainer = prebufferContainer.slice();
this.rtspPrebuffer = prebufferContainer;
shifts = 0;
}
});

session.start();
return session;
Expand Down Expand Up @@ -929,15 +929,22 @@ class PrebufferSession {
if (!interleavePassthrough) {
if (channel == undefined) {
const udp = serverPortMap.get(chunk.type);
if (udp)
server.sendTrack(udp.control, chunk.chunks[1], chunk.type.startsWith('rtcp-'));
if (udp) {
const { chunks } = chunk;
for (let i = 1; i < chunks.length; i += 2) {
const c = chunks[i];
server.sendTrack(udp.control, c, chunk.type.startsWith('rtcp-'));
}
}
return;
}

const chunks = chunk.chunks.slice();
const header = Buffer.from(chunks[0]);
header.writeUInt8(channel, 1);
chunks[0] = header;
for (let i = 0; i < chunks.length; i += 2) {
const header = Buffer.from(chunks[0]);
header.writeUInt8(channel, 1);
chunks[i] = header;
}
chunk = {
startStream: chunk.startStream,
chunks,
Expand All @@ -948,7 +955,12 @@ class PrebufferSession {
}

if (server.writeStream) {
server.writeRtpPayload(chunk.chunks[0], chunk.chunks[1]);
const { chunks } = chunk;
for (let i = 0; i < chunks.length; i += 2) {
const header = chunks[i];
const rtp = chunks[i + 1];
server.writeRtpPayload(header, rtp);
}
return;
}

Expand Down Expand Up @@ -1155,8 +1167,13 @@ class PrebufferMixin extends SettingsMixinDeviceBase<VideoCamera> implements Vid
requestedPrebuffer,
filter: (chunk, prebuffer) => {
const track = map.get(chunk.type);
if (track)
server.sendTrack(track, chunk.chunks[1], false);
if (track) {
const { chunks } = chunk;
for (let i = 1; i < chunks.length; i += 2) {
const c = chunks[i];
server.sendTrack(track, c, false);
}
}
return undefined;
}
});
Expand Down
5 changes: 2 additions & 3 deletions plugins/prebuffer-mixin/src/rfc4571.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { cloneDeep } from "@scrypted/common/src/clone-deep";
import { ParserOptions, ParserSession, setupActivityTimer } from "@scrypted/common/src/ffmpeg-rebroadcast";
import { read16BELengthLoop } from "@scrypted/common/src/read-stream";
import { findH264NaluType, H264_NAL_TYPE_SPS, RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server";
import { H264_NAL_TYPE_SPS, RTSP_FRAME_MAGIC, findH264NaluType } from "@scrypted/common/src/rtsp-server";
import { parseSdp } from "@scrypted/common/src/sdp-utils";
import { sleep } from "@scrypted/common/src/sleep";
import { StreamChunk } from "@scrypted/common/src/stream-parser";
import { MediaStreamOptions, ResponseMediaStreamOptions } from "@scrypted/sdk";
import { parse as spsParse } from "h264-sps-parser";
import net from 'net';
import { EventEmitter, Readable } from "stream";
import { ParserSession, setupActivityTimer } from "./ffmpeg-rebroadcast";
import { getSpsResolution } from "./sps-resolution";

export function negotiateMediaStream(sdp: string, mediaStreamOptions: MediaStreamOptions, inputVideoCodec: string, inputAudioCodec: string, requestMediaStream: MediaStreamOptions) {
Expand Down
24 changes: 13 additions & 11 deletions plugins/prebuffer-mixin/src/rtsp-session.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { ParserSession, setupActivityTimer } from "@scrypted/common/src/ffmpeg-rebroadcast";
import { closeQuiet, createBindZero } from "@scrypted/common/src/listen-cluster";
import { findH264NaluType, H264_NAL_TYPE_SPS, parseSemicolonDelimited, RtspClient, RtspClientUdpSetupOptions, RTSP_FRAME_MAGIC } from "@scrypted/common/src/rtsp-server";
import { closeQuiet } from "@scrypted/common/src/listen-cluster";
import { H264_NAL_TYPE_SPS, RTSP_FRAME_MAGIC, RtspClient, RtspClientUdpSetupOptions, findH264NaluType, parseSemicolonDelimited } from "@scrypted/common/src/rtsp-server";
import { parseSdp } from "@scrypted/common/src/sdp-utils";
import { StreamChunk } from "@scrypted/common/src/stream-parser";
import { ResponseMediaStreamOptions } from "@scrypted/sdk";
import dgram from 'dgram';
import { parse as spsParse } from "h264-sps-parser";
import { EventEmitter } from "stream";
import { ParserSession, setupActivityTimer } from "./ffmpeg-rebroadcast";
import { negotiateMediaStream } from "./rfc4571";
import { getSpsResolution } from "./sps-resolution";

Expand Down Expand Up @@ -95,13 +95,15 @@ export async function startRtspSession(console: Console, url: string, mediaStrea
const setup: RtspClientUdpSetupOptions = {
path: control,
type: 'udp',
onRtp: (header, data) => {
const prefix = Buffer.alloc(4);
prefix.writeUInt8(RTSP_FRAME_MAGIC, 0);
prefix.writeUInt8(rtspChannel, 1);
prefix.writeUInt16BE(data.length, 2);
onRtp: (...headerBuffers) => {
const chunk: StreamChunk = {
chunks: [prefix, data],
chunks: headerBuffers.map(headerBuffer => headerBuffer[1]).map(data => {
const prefix = Buffer.alloc(4);
prefix.writeUInt8(RTSP_FRAME_MAGIC, 0);
prefix.writeUInt8(rtspChannel, 1);
prefix.writeUInt16BE(data.length, 2);
return [prefix, data];
}).flat(),
type: codec,
};
events.emit('rtsp', chunk);
Expand Down Expand Up @@ -129,9 +131,9 @@ export async function startRtspSession(console: Console, url: string, mediaStrea
path: control,
type: 'tcp',
port: channel,
onRtp: (header, data) => {
onRtp: (...headerBuffers) => {
const chunk: StreamChunk = {
chunks: [header, data],
chunks: headerBuffers.flat(),
type: codec,
};
events.emit('rtsp', chunk);
Expand Down

0 comments on commit f677cf7

Please sign in to comment.