Skip to content

Commit

Permalink
Revert "rebroadcast: more parser refactor"
Browse files Browse the repository at this point in the history
This reverts commit 5432b5b.
  • Loading branch information
koush committed May 30, 2024
1 parent e3cdd43 commit 1e2fd46
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 30 deletions.
17 changes: 9 additions & 8 deletions common/src/rtsp-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ const quote = (str: string): string => `"${str.replace(/"/g, '\\"')}"`;
export interface RtspClientSetupOptions {
type: 'tcp' | 'udp';
path?: string;
onRtp: (headerBuffers: Buffer[]) => void;
onRtp: (...headerBuffers: [Buffer, Buffer][]) => void;
}

export interface RtspClientTcpSetupOptions extends RtspClientSetupOptions {
Expand Down Expand Up @@ -441,20 +441,21 @@ export class RtspClient extends RtspBase {
async readLoop() {
const deferred = new Deferred<void>();

let headerBuffers: Buffer[] = [];
let headerBuffers: [Buffer, Buffer][] = [];
let header: Buffer;
let channel: number;
let length: number;

const flush = (newChannel?: number) => {
const c = channel;
channel = newChannel;
if (!headerBuffers.length || newChannel === c)
const channelChange = newChannel !== c;
if (!channelChange || !headerBuffers.length)
return;
const hb = headerBuffers;
headerBuffers = [];
const options = this.setupOptions.get(c);
options?.onRtp?.(hb);
options?.onRtp?.(...hb);
}

const read = async () => {
Expand All @@ -480,11 +481,11 @@ export class RtspClient extends RtspBase {

// validate header once.
if (header[0] !== RTSP_FRAME_MAGIC) {
flush();

if (header.toString() !== 'RTSP')
throw this.createBadHeader(header);

flush();

this.client.unshift(header);
header = undefined;

Expand All @@ -506,18 +507,18 @@ export class RtspClient extends RtspBase {
length = header.readUInt16BE(2);
}

const currentChannel = channel;
const data = this.client.read(length);
if (!data) {
// flush if waiting for data, but restore the channel.
const currentChannel = channel;
flush();
channel = currentChannel;
return;
}

const h = header;
header = undefined;
headerBuffers.push(h, data);
headerBuffers.push([h, data]);
}
}
catch (e) {
Expand Down
22 changes: 10 additions & 12 deletions plugins/prebuffer-mixin/src/rtsp-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,15 @@ export async function startRtspSession(console: Console, url: string, mediaStrea
const setup: RtspClientUdpSetupOptions = {
path: control,
type: 'udp',
onRtp: (headerBuffers) => {
for (let i = 0; i < headerBuffers.length; i += 2) {
const data = headerBuffers[i + 1];
const prefix = Buffer.alloc(4);
prefix.writeUInt8(RTSP_FRAME_MAGIC, 0);
prefix.writeUInt8(rtspChannel, 1);
prefix.writeUInt16BE(data.length, 2);
headerBuffers[i] = prefix;
}
onRtp: (...headerBuffers) => {
const chunk: StreamChunk = {
chunks: headerBuffers,
chunks: headerBuffers.map(headerBuffer => headerBuffer[1]).map(data => {
const prefix = Buffer.alloc(4);
prefix.writeUInt8(RTSP_FRAME_MAGIC, 0);
prefix.writeUInt8(rtspChannel, 1);
prefix.writeUInt16BE(data.length, 2);
return [prefix, data];
}).flat(),
type: codec,
};
events.emit('rtsp', chunk);
Expand Down Expand Up @@ -133,9 +131,9 @@ export async function startRtspSession(console: Console, url: string, mediaStrea
path: control,
type: 'tcp',
port: channel,
onRtp: (headerBuffers) => {
onRtp: (...headerBuffers) => {
const chunk: StreamChunk = {
chunks: headerBuffers,
chunks: headerBuffers.flat(),
type: codec,
};
events.emit('rtsp', chunk);
Expand Down
12 changes: 2 additions & 10 deletions plugins/webrtc/src/rtp-forwarders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,7 @@ async function setupRtspClient(console: Console, rtspClient: RtspClient, channel
const result = await rtspClient.setup({
type: 'udp',
path: section.control,
onRtp: (headerBuffers) => {
for (let i = 1; i < headerBuffers.length; i += 2) {
deliver(headerBuffers[i]);
}
},
onRtp: (rtspHeader, rtp) => deliver(rtp),
});
// console.log('rtsp/udp', section.codec, result);
return false;
Expand All @@ -84,11 +80,7 @@ async function setupRtspClient(console: Console, rtspClient: RtspClient, channel
type: 'tcp',
port: channel,
path: section.control,
onRtp: (headerBuffers) => {
for (let i = 1; i < headerBuffers.length; i += 2) {
deliver(headerBuffers[i]);
}
},
onRtp: (rtspHeader, rtp) => deliver(rtp),
});
// console.log('rtsp/tcp', section.codec);
return true;
Expand Down

0 comments on commit 1e2fd46

Please sign in to comment.