Skip to content

Commit

Permalink
Add data stream support (#361)
Browse files Browse the repository at this point in the history
* Add data stream receiving

* updat lock file

* pass participant identity

* streams in both directions

* add optional override chunk id

* remove test fallback

* Add file send API

* add docs

* update README

* remove todos

* remove totalChunks

* update ffi protocol

* update rust

* update proto

* reuse

* nicer stream API

* include info in streamwriter

* allow to pass messageId

* fix remaining bugs

* update rust sdk

* Create many-suns-smoke.md

* add sendText method

* update naming

* add streamBytes method

* stream callbacks

* better example

* update example for interop

* add lfs for assets

* update example asset

* make CI happy

* point rust ffi to 0.12.8

* explicit remove handlers

* make example write to file

* add extension in file name
  • Loading branch information
lukasIO authored Jan 27, 2025
1 parent 473c46b commit 88ba6b3
Show file tree
Hide file tree
Showing 27 changed files with 2,154 additions and 108 deletions.
5 changes: 5 additions & 0 deletions .changeset/many-suns-smoke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

Add data stream support
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
**/*.jpg filter=lfs diff=lfs merge=lfs -text
2 changes: 1 addition & 1 deletion REUSE.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ SPDX-PackageDownloadLocation = "https://github.com/livekit/node-sdks"

# trivial files
[[annotations]]
path = [".gitignore", ".gitmodules", "flake.lock", ".github/**", "packages/livekit-rtc/.gitignore", ".changeset/**", "**/CHANGELOG.md", "NOTICE"]
path = [".gitignore", ".gitmodules", ".gitattributes", "flake.lock", ".github/**", "packages/livekit-rtc/.gitignore", ".changeset/**", "**/CHANGELOG.md", "NOTICE"]
SPDX-FileCopyrightText = "2024 LiveKit, Inc."
SPDX-License-Identifier = "Apache-2.0"

Expand Down
6 changes: 6 additions & 0 deletions examples/data-streams/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# 1. Copy this file and rename it to .env
# 2. Update the enviroment variables below.

LIVEKIT_API_KEY=mykey
LIVEKIT_API_SECRET=mysecret
LIVEKIT_URL=wss://myproject.livekit.cloud
36 changes: 36 additions & 0 deletions examples/data-streams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Data Streams Example

This example demonstrates how to use DataStreams to stream and receive both text and files from other LiveKit participants.

## Prerequisites

Before running this example, make sure you have:

1. Node.js installed on your machine.
2. A LiveKit server running (either locally or remotely).
3. LiveKit API key and secret.

## Setup

1. Install dependencies:

```
pnpm install
```

2. Create a `.env.local` file in the example directory with your LiveKit credentials:
```
LIVEKIT_API_KEY=your_api_key
LIVEKIT_API_SECRET=your_api_secret
LIVEKIT_URL=your_livekit_url
```

## Running the Example

To run the example, use the following command:

```
pnpm run start
```

The example will log to your terminal.
3 changes: 3 additions & 0 deletions examples/data-streams/assets/maybemexico.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
103 changes: 103 additions & 0 deletions examples/data-streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import {
type ByteStreamReader,
type RemoteParticipant,
Room,
RoomEvent,
type TextStreamReader,
} from '@livekit/rtc-node';
import { config } from 'dotenv';
import fs from 'fs';
import { AccessToken } from 'livekit-server-sdk';

config({ path: '.env.local', override: false });
const LIVEKIT_API_KEY = process.env.LIVEKIT_API_KEY;
const LIVEKIT_API_SECRET = process.env.LIVEKIT_API_SECRET;
const LIVEKIT_URL = process.env.LIVEKIT_URL;
if (!LIVEKIT_API_KEY || !LIVEKIT_API_SECRET || !LIVEKIT_URL) {
throw new Error('Missing required environment variables. Please check your .env.local file.');
}

const greetParticipant = async (room: Room, recipient: RemoteParticipant) => {
const greeting = 'Hi this is just a text sample';
const streamWriter = await room.localParticipant?.streamText({
destinationIdentities: [recipient.identity],
topic: 'chat',
});

for (const c of greeting) {
await streamWriter?.write(c);
}

streamWriter?.close();
};

const sendFile = async (room: Room, recipient: RemoteParticipant) => {
console.log('sending file');
await room.localParticipant?.sendFile('./assets/maybemexico.jpg', {
destinationIdentities: [recipient.identity],
name: 'mex.jpg',
topic: 'welcome',
mimeType: 'image/jpg',
});
console.log('done sending file');
};

const main = async () => {
const roomName = `dev`;
const identity = 'tester';
const token = await createToken(identity, roomName);

const room = new Room();

const finishedPromise = new Promise((resolve) => {
room.on(RoomEvent.ParticipantDisconnected, resolve);
});

room.setTextStreamHandler(async (reader: TextStreamReader, { identity }) => {
console.log(`chat message from ${identity}: ${await reader.readAll()}`);
// for await (const { collected } of reader) {
// console.log(collected);
// }
}, 'chat');

room.setByteStreamHandler(async (reader: ByteStreamReader, { identity }) => {
console.log(`welcome image received from ${identity}: ${reader.info.name}`);

// create write stream and write received file to disk, make sure ./temp folder exists
const writer = fs.createWriteStream(`./temp/${reader.info.name}`, {});

for await (const chunk of reader) {
writer.write(chunk);
}
writer.close();
}, 'welcome');

room.on(RoomEvent.ParticipantConnected, async (participant) => {
await sendFile(room, participant);
await greetParticipant(room, participant);
});

await room.connect(LIVEKIT_URL, token);

for (const [, p] of room.remoteParticipants) {
await sendFile(room, p);
await greetParticipant(room, p);
}

await finishedPromise;
};

const createToken = async (identity: string, roomName: string) => {
const token = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
identity,
});
token.addGrant({
room: roomName,
roomJoin: true,
canPublish: true,
canSubscribe: true,
});
return await token.toJwt();
};

main();
23 changes: 23 additions & 0 deletions examples/data-streams/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "example-data-streams",
"author": "LiveKit",
"private": "true",
"description": "Example of using data streams in LiveKit",
"type": "module",
"main": "index.ts",
"scripts": {
"lint": "eslint -f unix \"**/*.ts\"",
"start": "tsx index.ts"
},
"keywords": [],
"license": "Apache-2.0",
"dependencies": {
"@livekit/rtc-node": "workspace:*",
"dotenv": "^16.4.5",
"livekit-server-sdk": "workspace:*"
},
"devDependencies": {
"@types/node": "^20.10.4",
"tsx": "^4.7.1"
}
}
2 changes: 2 additions & 0 deletions examples/data-streams/temp/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*
!.gitignore
3 changes: 2 additions & 1 deletion packages/livekit-rtc/generate_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ PATH=$PATH:$(pwd)/node_modules/.bin \
$FFI_PROTOCOL/video_frame.proto \
$FFI_PROTOCOL/e2ee.proto \
$FFI_PROTOCOL/stats.proto \
$FFI_PROTOCOL/rpc.proto
$FFI_PROTOCOL/rpc.proto \
$FFI_PROTOCOL/track_publication.proto
4 changes: 3 additions & 1 deletion packages/livekit-rtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
"dependencies": {
"@bufbuild/protobuf": "^1.10.0",
"@livekit/mutex": "^1.0.0",
"@livekit/typed-emitter": "^3.0.0"
"@livekit/typed-emitter": "^3.0.0",
"pino": "^8.19.0",
"pino-pretty": "^11.0.0"
},
"devDependencies": {
"@napi-rs/cli": "^2.18.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/rust-sdks
Submodule rust-sdks updated 40 files
+3 −0 .github/workflows/publish.yml
+7 −7 Cargo.lock
+4 −4 Cargo.toml
+72 −2 examples/rpc/src/main.rs
+1 −1 libwebrtc/.nanparc
+6 −0 libwebrtc/CHANGELOG.md
+1 −1 libwebrtc/Cargo.toml
+4 −0 libwebrtc/src/data_channel.rs
+4 −0 libwebrtc/src/native/data_channel.rs
+1 −1 livekit-api/Cargo.toml
+2 −0 livekit-api/src/services/ingress.rs
+42 −5 livekit-api/src/services/sip.rs
+1 −1 livekit-ffi/.nanparc
+26 −0 livekit-ffi/CHANGELOG.md
+1 −1 livekit-ffi/Cargo.toml
+19 −0 livekit-ffi/protocol/ffi.proto
+101 −17 livekit-ffi/protocol/room.proto
+2 −0 livekit-ffi/protocol/video_frame.proto
+73 −10 livekit-ffi/src/conversion/room.rs
+200 −34 livekit-ffi/src/livekit.proto.rs
+3 −2 livekit-ffi/src/server/mod.rs
+63 −1 livekit-ffi/src/server/requests.rs
+127 −5 livekit-ffi/src/server/room.rs
+1 −1 livekit-protocol/.nanparc
+7 −0 livekit-protocol/CHANGELOG.md
+1 −1 livekit-protocol/Cargo.toml
+1 −1 livekit-protocol/protocol
+253 −20 livekit-protocol/src/livekit.rs
+1,274 −203 livekit-protocol/src/livekit.serde.rs
+1 −1 livekit/.nanparc
+21 −0 livekit/CHANGELOG.md
+1 −1 livekit/Cargo.toml
+95 −19 livekit/src/room/mod.rs
+40 −28 livekit/src/room/participant/local_participant.rs
+30 −6 livekit/src/rtc_engine/mod.rs
+20 −3 livekit/src/rtc_engine/rtc_events.rs
+200 −21 livekit/src/rtc_engine/rtc_session.rs
+1 −0 webrtc-sys/include/livekit/data_channel.h
+4 −0 webrtc-sys/src/data_channel.cpp
+1 −0 webrtc-sys/src/data_channel.rs
7 changes: 7 additions & 0 deletions packages/livekit-rtc/src/data_streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0

export * from './stream_reader.js';
export * from './stream_writer.js';
export type * from './types.js';
166 changes: 166 additions & 0 deletions packages/livekit-rtc/src/data_streams/stream_reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { ReadableStream } from 'node:stream/web';
import { log } from '../log.js';
import type { DataStream_Chunk } from '../proto/room_pb.js';
import { bigIntToNumber } from '../utils.js';
import type { BaseStreamInfo, ByteStreamInfo, TextStreamChunk, TextStreamInfo } from './types.js';

abstract class BaseStreamReader<T extends BaseStreamInfo> {
protected reader: ReadableStream<DataStream_Chunk>;

protected totalByteSize?: number;

protected _info: T;

protected bytesReceived: number;

get info() {
return this._info;
}

constructor(info: T, stream: ReadableStream<DataStream_Chunk>, totalByteSize?: number) {
this.reader = stream;
this.totalByteSize = totalByteSize;
this._info = info;
this.bytesReceived = 0;
}

protected abstract handleChunkReceived(chunk: DataStream_Chunk): void;

onProgress?: (progress: number | undefined) => void;

abstract readAll(): Promise<string | Array<Uint8Array>>;
}

/**
* A class to read chunks from a ReadableStream and provide them in a structured format.
*/
export class ByteStreamReader extends BaseStreamReader<ByteStreamInfo> {
protected handleChunkReceived(chunk: DataStream_Chunk) {
this.bytesReceived += chunk.content!.byteLength;
const currentProgress = this.totalByteSize
? this.bytesReceived / this.totalByteSize
: undefined;
this.onProgress?.(currentProgress);
}

[Symbol.asyncIterator]() {
const reader = this.reader.getReader();

return {
next: async (): Promise<IteratorResult<Uint8Array>> => {
try {
const { done, value } = await reader.read();
if (done) {
return { done: true, value: undefined as any };
} else {
this.handleChunkReceived(value);
return { done: false, value: value.content! };
}
} catch (error) {
log.error('error processing stream update', error);
return { done: true, value: undefined };
}
},

return(): IteratorResult<Uint8Array> {
reader.releaseLock();
return { done: true, value: undefined };
},
};
}

async readAll(): Promise<Array<Uint8Array>> {
const chunks: Set<Uint8Array> = new Set();
for await (const chunk of this) {
chunks.add(chunk);
}
return Array.from(chunks);
}
}

/**
* A class to read chunks from a ReadableStream and provide them in a structured format.
*/
export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
private receivedChunks: Map<number, DataStream_Chunk>;

/**
* A TextStreamReader instance can be used as an AsyncIterator that returns the entire string
* that has been received up to the current point in time.
*/
constructor(
info: TextStreamInfo,
stream: ReadableStream<DataStream_Chunk>,
totalChunkCount?: number,
) {
super(info, stream, totalChunkCount);
this.receivedChunks = new Map();
}

protected handleChunkReceived(chunk: DataStream_Chunk) {
const index = bigIntToNumber(chunk.chunkIndex!);
const previousChunkAtIndex = this.receivedChunks.get(index!);
if (previousChunkAtIndex && previousChunkAtIndex.version! > chunk.version!) {
// we have a newer version already, dropping the old one
return;
}
this.receivedChunks.set(index, chunk);
const currentProgress = this.totalByteSize
? this.receivedChunks.size / this.totalByteSize
: undefined;
this.onProgress?.(currentProgress);
}

/**
* Async iterator implementation to allow usage of `for await...of` syntax.
* Yields structured chunks from the stream.
*
*/
[Symbol.asyncIterator]() {
const reader = this.reader.getReader();
const decoder = new TextDecoder();

return {
next: async (): Promise<IteratorResult<TextStreamChunk>> => {
try {
const { done, value } = await reader.read();
if (done) {
return { done: true, value: undefined };
} else {
this.handleChunkReceived(value);
return {
done: false,
value: {
index: bigIntToNumber(value.chunkIndex)!,
current: decoder.decode(value.content!),
collected: Array.from(this.receivedChunks.values())
.sort((a, b) => bigIntToNumber(a.chunkIndex!) - bigIntToNumber(b.chunkIndex!))
.map((chunk) => decoder.decode(chunk.content!))
.join(''),
},
};
}
} catch (error) {
log.error('error processing stream update', error);
return { done: true, value: undefined };
}
},

return(): IteratorResult<TextStreamChunk> {
reader.releaseLock();
return { done: true, value: undefined };
},
};
}

async readAll(): Promise<string> {
let latestString: string = '';
for await (const { collected } of this) {
latestString = collected;
}
return latestString;
}
}
Loading

0 comments on commit 88ba6b3

Please sign in to comment.