From 687e0e8ca4d6a22d0a46b9b50e1992e9a776f9fe Mon Sep 17 00:00:00 2001 From: Amish Shah Date: Tue, 10 Aug 2021 11:37:33 +0100 Subject: [PATCH] feat(VoiceReceive)!: improve usability (#136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor(VoiceReceiver): begin refactor * feat(SSRCMap): resolve function * feat(VoiceConnection): close all streams in non-ready state * refactor(VoiceReceiver): map by user ID * feat(VoiceReceiver): allow specifying end type for streams * feat(VoiceReceiver): add SpeakingMap * refactor(SSRCMap): remove unused resolve method * test(VoiceReceiver): add test for SpeakingMap * test(VoiceReceiver): add tests for AudioReceiveStream * test(AudioReceiver): strengthen AudioReceiveStream tests * test(VoiceReceiver): remove inapplicable tests * test(VoiceConnection): fix test errors * test(VoiceConnection): test receiver bindings tracking * test(VoiceReceiver): decrypt * chore: remove unused code * fix(AudioReceiveStream): close normally * feat(Examples): update receiver example * feat(Examples): create recorder example * docs(VoiceReceiver): add docs for receive classes * refactor: suggestions from code review Co-authored-by: Antonio Román Co-authored-by: Antonio Román --- examples/README.md | 3 +- examples/recorder/.eslintrc.json | 7 + examples/recorder/.gitignore | 4 + examples/recorder/README.md | 23 +++ examples/recorder/auth.example.json | 3 + examples/recorder/package.json | 28 ++++ examples/recorder/recordings/.gitkeep | 0 examples/recorder/src/bot.ts | 46 ++++++ .../recorder/src/createListeningStream.ts | 42 ++++++ examples/recorder/src/deploy.ts | 26 ++++ examples/recorder/src/interactions.ts | 92 ++++++++++++ examples/recorder/tsconfig.eslint.json | 3 + examples/recorder/tsconfig.json | 13 ++ src/VoiceConnection.ts | 57 ++++++- src/__tests__/VoiceConnection.test.ts | 93 ++++++++++++ src/receive/AudioReceiveStream.ts | 72 ++++++++- src/receive/SSRCMap.ts | 2 + src/receive/SpeakingMap.ts | 60 ++++++++ src/receive/VoiceReceiver.ts | 140 +++++------------- .../__tests__/AudioReceiveStream.test.ts | 71 +++++++++ src/receive/__tests__/SpeakingMap.test.ts | 32 ++++ src/receive/__tests__/VoiceReceiver.test.ts | 128 +++++++--------- src/receive/__tests__/fixtures/states.ts | 70 --------- 23 files changed, 764 insertions(+), 251 deletions(-) create mode 100644 examples/recorder/.eslintrc.json create mode 100644 examples/recorder/.gitignore create mode 100644 examples/recorder/README.md create mode 100644 examples/recorder/auth.example.json create mode 100644 examples/recorder/package.json create mode 100644 examples/recorder/recordings/.gitkeep create mode 100644 examples/recorder/src/bot.ts create mode 100644 examples/recorder/src/createListeningStream.ts create mode 100644 examples/recorder/src/deploy.ts create mode 100644 examples/recorder/src/interactions.ts create mode 100644 examples/recorder/tsconfig.eslint.json create mode 100644 examples/recorder/tsconfig.json create mode 100644 src/receive/SpeakingMap.ts create mode 100644 src/receive/__tests__/AudioReceiveStream.test.ts create mode 100644 src/receive/__tests__/SpeakingMap.test.ts delete mode 100644 src/receive/__tests__/fixtures/states.ts diff --git a/examples/README.md b/examples/README.md index 95fd6e0c..74b69d22 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,7 +1,8 @@ # Examples | Example | Description | -| ------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------| | [Basic](./basic) | A simple "Hello World" TypeScript example that plays an mp3 file. Notably, it works with discord.js v12 and so it also contains an example of creating an adapter | | [Radio Bot](./radio-bot) | A fun JavaScript example of what you can create using @discordjs/voice. A radio bot that plays output from your speakers in a Discord voice channel | | [Music Bot](./music-bot) | A TypeScript example of a YouTube music bot. Demonstrates how queues can be implemented and how to implement "good" disconnect/reconnection logic | +| [Recorder](./recorder) | An example of using voice receive to create a bot that can record audio from users | diff --git a/examples/recorder/.eslintrc.json b/examples/recorder/.eslintrc.json new file mode 100644 index 00000000..ab8cc9fa --- /dev/null +++ b/examples/recorder/.eslintrc.json @@ -0,0 +1,7 @@ +{ + "root": true, + "extends": "../../.eslintrc.json", + "parserOptions": { + "project": "./tsconfig.eslint.json" + } +} diff --git a/examples/recorder/.gitignore b/examples/recorder/.gitignore new file mode 100644 index 00000000..789c0675 --- /dev/null +++ b/examples/recorder/.gitignore @@ -0,0 +1,4 @@ +package-lock.json +auth.json +tsconfig.tsbuildinfo +recordings/*.ogg diff --git a/examples/recorder/README.md b/examples/recorder/README.md new file mode 100644 index 00000000..397542d5 --- /dev/null +++ b/examples/recorder/README.md @@ -0,0 +1,23 @@ +# 👂 Recorder Bot + +This example shows how you can use the voice receive functionality in @discordjs/voice to record users in voice channels +and save the audio to local Ogg files. + +## Usage + +```sh-session +# Clone the main repository, and then run: +$ npm install +$ npm run build + +# Open this example and install dependencies +$ cd examples/recorder +$ npm install + +# Set a bot token (see auth.example.json) +$ cp auth.example.json auth.json +$ nano auth.json + +# Start the bot! +$ npm start +``` diff --git a/examples/recorder/auth.example.json b/examples/recorder/auth.example.json new file mode 100644 index 00000000..34e3fca0 --- /dev/null +++ b/examples/recorder/auth.example.json @@ -0,0 +1,3 @@ +{ + "token": "Your Discord bot token here" +} diff --git a/examples/recorder/package.json b/examples/recorder/package.json new file mode 100644 index 00000000..06a49df7 --- /dev/null +++ b/examples/recorder/package.json @@ -0,0 +1,28 @@ +{ + "name": "receiver-bot", + "version": "0.0.1", + "description": "An example receiver bot written using @discordjs/voice", + "scripts": { + "start": "npm run build && node -r tsconfig-paths/register dist/bot", + "test": "echo \"Error: no test specified\" && exit 1", + "lint": "eslint src --ext .ts", + "lint:fix": "eslint src --ext .ts --fix", + "prettier": "prettier --write **/*.{ts,js,json,yml,yaml}", + "build": "tsc", + "build:check": "tsc --noEmit --incremental false" + }, + "author": "Amish Shah ", + "license": "MIT", + "dependencies": { + "@discordjs/opus": "^0.5.3", + "discord-api-types": "^0.22.0", + "discord.js": "^13.0.1", + "libsodium-wrappers": "^0.7.9", + "node-crc": "^1.3.2", + "prism-media": "^2.0.0-alpha.0" + }, + "devDependencies": { + "tsconfig-paths": "^3.10.1", + "typescript": "~4.3.5" + } +} diff --git a/examples/recorder/recordings/.gitkeep b/examples/recorder/recordings/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/examples/recorder/src/bot.ts b/examples/recorder/src/bot.ts new file mode 100644 index 00000000..498dacaf --- /dev/null +++ b/examples/recorder/src/bot.ts @@ -0,0 +1,46 @@ +import Discord, { Interaction } from 'discord.js'; +import { getVoiceConnection } from '@discordjs/voice'; +import { deploy } from './deploy'; +import { interactionHandlers } from './interactions'; + +// eslint-disable-next-line @typescript-eslint/no-var-requires, @typescript-eslint/no-require-imports +const { token } = require('../auth.json'); + +const client = new Discord.Client({ intents: ['GUILD_VOICE_STATES', 'GUILD_MESSAGES', 'GUILDS'] }); + +client.on('ready', () => console.log('Ready!')); + +client.on('messageCreate', async (message) => { + if (!message.guild) return; + if (!client.application?.owner) await client.application?.fetch(); + + if (message.content.toLowerCase() === '!deploy' && message.author.id === client.application?.owner?.id) { + await deploy(message.guild); + await message.reply('Deployed!'); + } +}); + +/** + * The IDs of the users that can be recorded by the bot. + */ +const recordable = new Set(); + +client.on('interactionCreate', async (interaction: Interaction) => { + if (!interaction.isCommand() || !interaction.guildId) return; + + const handler = interactionHandlers.get(interaction.commandName); + + try { + if (handler) { + await handler(interaction, recordable, client, getVoiceConnection(interaction.guildId)); + } else { + await interaction.reply('Unknown command'); + } + } catch (error) { + console.warn(error); + } +}); + +client.on('error', console.warn); + +void client.login(token); diff --git a/examples/recorder/src/createListeningStream.ts b/examples/recorder/src/createListeningStream.ts new file mode 100644 index 00000000..4ae3162a --- /dev/null +++ b/examples/recorder/src/createListeningStream.ts @@ -0,0 +1,42 @@ +import { EndBehaviorType, VoiceReceiver } from '@discordjs/voice'; +import { User } from 'discord.js'; +import { createWriteStream } from 'fs'; +import { opus } from 'prism-media'; +import { pipeline } from 'stream'; + +function getDisplayName(userId: string, user?: User) { + return user ? `${user.username}_${user.discriminator}` : userId; +} + +export function createListeningStream(receiver: VoiceReceiver, userId: string, user?: User) { + const opusStream = receiver.subscribe(userId, { + end: { + behavior: EndBehaviorType.AfterSilence, + duration: 100, + }, + }); + + const oggStream = new opus.OggLogicalBitstream({ + opusHead: new opus.OpusHead({ + channelCount: 2, + sampleRate: 48000, + }), + pageSizeControl: { + maxPackets: 10, + }, + }); + + const filename = `./recordings/${Date.now()}-${getDisplayName(userId, user)}.ogg`; + + const out = createWriteStream(filename); + + console.log(`👂 Started recording ${filename}`); + + pipeline(opusStream, oggStream, out, (err) => { + if (err) { + console.warn(`❌ Error recording file ${filename} - ${err.message}`); + } else { + console.log(`✅ Recorded ${filename}`); + } + }); +} diff --git a/examples/recorder/src/deploy.ts b/examples/recorder/src/deploy.ts new file mode 100644 index 00000000..b8c414e7 --- /dev/null +++ b/examples/recorder/src/deploy.ts @@ -0,0 +1,26 @@ +import { Guild } from 'discord.js'; + +export const deploy = async (guild: Guild) => { + await guild.commands.set([ + { + name: 'join', + description: 'Joins the voice channel that you are in', + }, + { + name: 'record', + description: 'Enables recording for a user', + options: [ + { + name: 'speaker', + type: 'USER' as const, + description: 'The user to record', + required: true, + }, + ], + }, + { + name: 'leave', + description: 'Leave the voice channel', + }, + ]); +}; diff --git a/examples/recorder/src/interactions.ts b/examples/recorder/src/interactions.ts new file mode 100644 index 00000000..d27bcd38 --- /dev/null +++ b/examples/recorder/src/interactions.ts @@ -0,0 +1,92 @@ +import { entersState, joinVoiceChannel, VoiceConnection, VoiceConnectionStatus } from '@discordjs/voice'; +import { Client, CommandInteraction, GuildMember, Snowflake } from 'discord.js'; +import { createListeningStream } from './createListeningStream'; + +async function join( + interaction: CommandInteraction, + recordable: Set, + client: Client, + connection?: VoiceConnection, +) { + await interaction.deferReply(); + if (!connection) { + if (interaction.member instanceof GuildMember && interaction.member.voice.channel) { + const channel = interaction.member.voice.channel; + connection = joinVoiceChannel({ + channelId: channel.id, + guildId: channel.guild.id, + selfDeaf: false, + selfMute: true, + adapterCreator: channel.guild.voiceAdapterCreator, + }); + } else { + await interaction.followUp('Join a voice channel and then try that again!'); + return; + } + } + + try { + await entersState(connection, VoiceConnectionStatus.Ready, 20e3); + const receiver = connection.receiver; + + receiver.speaking.on('start', (userId) => { + if (recordable.has(userId)) { + createListeningStream(receiver, userId, client.users.cache.get(userId)); + } + }); + } catch (error) { + console.warn(error); + await interaction.followUp('Failed to join voice channel within 20 seconds, please try again later!'); + } + + await interaction.followUp('Ready!'); +} + +async function record( + interaction: CommandInteraction, + recordable: Set, + client: Client, + connection?: VoiceConnection, +) { + if (connection) { + const userId = interaction.options.get('speaker')!.value! as Snowflake; + recordable.add(userId); + + const receiver = connection.receiver; + if (connection.receiver.speaking.users.has(userId)) { + createListeningStream(receiver, userId, client.users.cache.get(userId)); + } + + await interaction.reply({ ephemeral: true, content: 'Listening!' }); + } else { + await interaction.reply({ ephemeral: true, content: 'Join a voice channel and then try that again!' }); + } +} + +async function leave( + interaction: CommandInteraction, + recordable: Set, + client: Client, + connection?: VoiceConnection, +) { + if (connection) { + connection.destroy(); + recordable.clear(); + await interaction.reply({ ephemeral: true, content: 'Left the channel!' }); + } else { + await interaction.reply({ ephemeral: true, content: 'Not playing in this server!' }); + } +} + +export const interactionHandlers = new Map< + string, + ( + interaction: CommandInteraction, + recordable: Set, + client: Client, + connection?: VoiceConnection, + ) => Promise +>(); +interactionHandlers.set('join', join); +interactionHandlers.set('record', record); +interactionHandlers.set('leave', leave); diff --git a/examples/recorder/tsconfig.eslint.json b/examples/recorder/tsconfig.eslint.json new file mode 100644 index 00000000..ea6be8e9 --- /dev/null +++ b/examples/recorder/tsconfig.eslint.json @@ -0,0 +1,3 @@ +{ + "extends": "./tsconfig.json" +} diff --git a/examples/recorder/tsconfig.json b/examples/recorder/tsconfig.json new file mode 100644 index 00000000..99c05985 --- /dev/null +++ b/examples/recorder/tsconfig.json @@ -0,0 +1,13 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "baseUrl": ".", + "outDir": "dist", + "paths": { + "@discordjs/voice": ["../../"], + "libsodium-wrappers": ["./node_modules/libsodium-wrappers"] + } + }, + "include": ["src/*.ts"], + "exclude": [""] +} diff --git a/src/VoiceConnection.ts b/src/VoiceConnection.ts index de9ca079..ea1de2c8 100644 --- a/src/VoiceConnection.ts +++ b/src/VoiceConnection.ts @@ -13,6 +13,8 @@ import { DiscordGatewayAdapterImplementerMethods } from './util/adapter'; import { Networking, NetworkingState, NetworkingStatusCode } from './networking/Networking'; import { Awaited, noop } from './util/util'; import { TypedEmitter } from 'tiny-typed-emitter'; +import { VoiceReceiver } from './receive'; +import type { VoiceWebSocket, VoiceUDPSocket } from './networking'; /** * The various status codes a voice connection can hold at any one time. @@ -192,6 +194,12 @@ export class VoiceConnection extends TypedEmitter { state: GatewayVoiceStateUpdateDispatchData | undefined; }; + /** + * The receiver of this voice connection. You should join the voice channel with `selfDeaf` set + * to false for this feature to work properly. + */ + public readonly receiver: VoiceReceiver; + /** * The debug logger function, if debugging is enabled. */ @@ -209,6 +217,8 @@ export class VoiceConnection extends TypedEmitter { this.debug = debug ? (message: string) => this.emit('debug', message) : null; this.rejoinAttempts = 0; + this.receiver = new VoiceReceiver(this); + this.onNetworkingClose = this.onNetworkingClose.bind(this); this.onNetworkingStateChange = this.onNetworkingStateChange.bind(this); this.onNetworkingError = this.onNetworkingError.bind(this); @@ -248,17 +258,24 @@ export class VoiceConnection extends TypedEmitter { const oldSubscription: PlayerSubscription | undefined = Reflect.get(oldState, 'subscription'); const newSubscription: PlayerSubscription | undefined = Reflect.get(newState, 'subscription'); - if (oldNetworking && oldNetworking !== newNetworking) { - oldNetworking.off('debug', this.onNetworkingDebug); - oldNetworking.on('error', noop); - oldNetworking.off('error', this.onNetworkingError); - oldNetworking.off('close', this.onNetworkingClose); - oldNetworking.off('stateChange', this.onNetworkingStateChange); - oldNetworking.destroy(); + if (oldNetworking !== newNetworking) { + if (oldNetworking) { + oldNetworking.on('error', noop); + oldNetworking.off('debug', this.onNetworkingDebug); + oldNetworking.off('error', this.onNetworkingError); + oldNetworking.off('close', this.onNetworkingClose); + oldNetworking.off('stateChange', this.onNetworkingStateChange); + oldNetworking.destroy(); + } + if (newNetworking) this.updateReceiveBindings(newNetworking.state, oldNetworking?.state); } if (newState.status === VoiceConnectionStatus.Ready) { this.rejoinAttempts = 0; + } else if (newState.status === VoiceConnectionStatus.Destroyed) { + for (const stream of this.receiver.subscriptions.values()) { + if (!stream.destroyed) stream.destroy(); + } } // If destroyed, the adapter can also be destroyed so it can be cleaned up by the user @@ -316,6 +333,31 @@ export class VoiceConnection extends TypedEmitter { */ } + /** + * Called when the networking state changes, and the new ws/udp packet/message handlers need to be rebound + * to the new instances. + * @param newState - The new networking state + * @param oldState - The old networking state, if there is one + */ + private updateReceiveBindings(newState: NetworkingState, oldState?: NetworkingState) { + const oldWs = Reflect.get(oldState ?? {}, 'ws') as VoiceWebSocket | undefined; + const newWs = Reflect.get(newState, 'ws') as VoiceWebSocket | undefined; + const oldUdp = Reflect.get(oldState ?? {}, 'udp') as VoiceUDPSocket | undefined; + const newUdp = Reflect.get(newState, 'udp') as VoiceUDPSocket | undefined; + + if (oldWs !== newWs) { + oldWs?.off('packet', this.receiver.onWsPacket); + newWs?.on('packet', this.receiver.onWsPacket); + } + + if (oldUdp !== newUdp) { + oldUdp?.off('message', this.receiver.onUdpMessage); + newUdp?.on('message', this.receiver.onUdpMessage); + } + + this.receiver.connectionData = Reflect.get(newState, 'connectionData') ?? {}; + } + /** * Attempts to configure a networking instance for this voice connection using the received packets. * Both packets are required, and any existing networking instance will be destroyed. @@ -400,6 +442,7 @@ export class VoiceConnection extends TypedEmitter { * @param newState - The new state */ private onNetworkingStateChange(oldState: NetworkingState, newState: NetworkingState) { + this.updateReceiveBindings(newState, oldState); if (oldState.code === newState.code) return; if (this.state.status !== VoiceConnectionStatus.Connecting && this.state.status !== VoiceConnectionStatus.Ready) return; diff --git a/src/__tests__/VoiceConnection.test.ts b/src/__tests__/VoiceConnection.test.ts index 296ae9b4..9b293fb7 100644 --- a/src/__tests__/VoiceConnection.test.ts +++ b/src/__tests__/VoiceConnection.test.ts @@ -8,11 +8,14 @@ import { VoiceConnectionSignallingState, VoiceConnectionStatus, } from '../VoiceConnection'; + import * as _DataStore from '../DataStore'; import * as _Networking from '../networking/Networking'; import * as _AudioPlayer from '../audio/AudioPlayer'; import { PlayerSubscription as _PlayerSubscription } from '../audio/PlayerSubscription'; import type { DiscordGatewayAdapterLibraryMethods } from '../util/adapter'; +import EventEmitter from 'events'; + jest.mock('../audio/AudioPlayer'); jest.mock('../audio/PlayerSubscription'); jest.mock('../DataStore'); @@ -23,6 +26,11 @@ const Networking = _Networking as unknown as jest.Mocked; const AudioPlayer = _AudioPlayer as unknown as jest.Mocked; const PlayerSubscription = _PlayerSubscription as unknown as jest.Mock<_PlayerSubscription>; +Networking.Networking.mockImplementation(function mockedConstructor() { + this.state = {}; + return this; +}); + function createFakeAdapter() { const sendPayload = jest.fn(); sendPayload.mockReturnValue(true); @@ -647,6 +655,91 @@ describe('VoiceConnection#onSubscriptionRemoved', () => { }); expect(subscription.unsubscribe).toHaveBeenCalledTimes(1); }); + + describe('updateReceiveBindings', () => { + test('Applies and removes udp listeners', () => { + // Arrange + const ws = new EventEmitter() as any; + + const oldNetworking = new Networking.Networking({} as any, false); + oldNetworking.state = { + code: _Networking.NetworkingStatusCode.Ready, + connectionData: {} as any, + connectionOptions: {} as any, + udp: new EventEmitter() as any, + ws, + }; + + const newNetworking = new Networking.Networking({} as any, false); + newNetworking.state = { + ...oldNetworking.state, + udp: new EventEmitter() as any, + }; + + const { voiceConnection } = createFakeVoiceConnection(); + + // Act + voiceConnection['updateReceiveBindings'](newNetworking.state, oldNetworking.state); + + // Assert + expect(oldNetworking.state.udp.listenerCount('message')).toBe(0); + expect(newNetworking.state.udp.listenerCount('message')).toBe(1); + expect(voiceConnection.receiver.connectionData).toBe(newNetworking.state.connectionData); + }); + + test('Applies and removes ws listeners', () => { + // Arrange + const udp = new EventEmitter() as any; + + const oldNetworking = new Networking.Networking({} as any, false); + oldNetworking.state = { + code: _Networking.NetworkingStatusCode.Ready, + connectionData: {} as any, + connectionOptions: {} as any, + udp, + ws: new EventEmitter() as any, + }; + + const newNetworking = new Networking.Networking({} as any, false); + newNetworking.state = { + ...oldNetworking.state, + ws: new EventEmitter() as any, + }; + + const { voiceConnection } = createFakeVoiceConnection(); + + // Act + voiceConnection['updateReceiveBindings'](newNetworking.state, oldNetworking.state); + + // Assert + expect(oldNetworking.state.ws.listenerCount('packet')).toBe(0); + expect(newNetworking.state.ws.listenerCount('packet')).toBe(1); + expect(voiceConnection.receiver.connectionData).toBe(newNetworking.state.connectionData); + }); + + test('Applies initial listeners', () => { + // Arrange + + const newNetworking = new Networking.Networking({} as any, false); + newNetworking.state = { + code: _Networking.NetworkingStatusCode.Ready, + connectionData: {} as any, + connectionOptions: {} as any, + udp: new EventEmitter() as any, + ws: new EventEmitter() as any, + }; + + const { voiceConnection } = createFakeVoiceConnection(); + + // Act + voiceConnection['updateReceiveBindings'](newNetworking.state, undefined); + + // Assert + expect(newNetworking.state.ws.listenerCount('packet')).toBe(1); + expect(newNetworking.state.udp.listenerCount('message')).toBe(1); + expect(voiceConnection.receiver.connectionData).toBe(newNetworking.state.connectionData); + }); + }); }); describe('Adapter', () => { diff --git a/src/receive/AudioReceiveStream.ts b/src/receive/AudioReceiveStream.ts index 581878f9..623dc4de 100644 --- a/src/receive/AudioReceiveStream.ts +++ b/src/receive/AudioReceiveStream.ts @@ -1,15 +1,85 @@ import { Readable, ReadableOptions } from 'stream'; +import { SILENCE_FRAME } from '../audio/AudioPlayer'; + +/** + * The different behaviors an audio receive stream can have for deciding when to end. + */ +export enum EndBehaviorType { + /** + * The stream will only end when manually destroyed. + */ + Manual, + /** + * The stream will end after a given time period of silence/no audio packets. + */ + AfterSilence, + /** + * The stream will end after a given time period of no audio packets. + */ + AfterInactivity, +} + +export type EndBehavior = + | { + behavior: EndBehaviorType.Manual; + } + | { + behavior: EndBehaviorType.AfterSilence | EndBehaviorType.AfterInactivity; + duration: number; + }; + +export interface AudioReceiveStreamOptions extends ReadableOptions { + end: EndBehavior; +} + +export function createDefaultAudioReceiveStreamOptions(): AudioReceiveStreamOptions { + return { + end: { + behavior: EndBehaviorType.Manual, + }, + }; +} /** * A readable stream of Opus packets received from a specific entity * in a Discord voice connection. */ export class AudioReceiveStream extends Readable { - public constructor(options?: ReadableOptions) { + /** + * The end behavior of the receive stream. + */ + public readonly end: EndBehavior; + + private endTimeout?: NodeJS.Timeout; + + public constructor({ end, ...options }: AudioReceiveStreamOptions) { super({ ...options, objectMode: true, }); + + this.end = end; + } + + public push(buffer: Buffer | null) { + if (buffer) { + if ( + this.end.behavior === EndBehaviorType.AfterInactivity || + (this.end.behavior === EndBehaviorType.AfterSilence && + (buffer.compare(SILENCE_FRAME) !== 0 || typeof this.endTimeout === 'undefined')) + ) { + this.renewEndTimeout(this.end); + } + } + + return super.push(buffer); + } + + private renewEndTimeout(end: EndBehavior & { duration: number }) { + if (this.endTimeout) { + clearTimeout(this.endTimeout); + } + this.endTimeout = setTimeout(() => this.push(null), end.duration); } // eslint-disable-next-line @typescript-eslint/no-empty-function diff --git a/src/receive/SSRCMap.ts b/src/receive/SSRCMap.ts index 94d96970..5601ee95 100644 --- a/src/receive/SSRCMap.ts +++ b/src/receive/SSRCMap.ts @@ -24,6 +24,7 @@ export interface VoiceUserData { * The events that an SSRCMap may emit. */ export interface SSRCMapEvents { + create: (newData: VoiceUserData) => Awaited; update: (oldData: VoiceUserData | undefined, newData: VoiceUserData) => Awaited; delete: (deletedData: VoiceUserData) => Awaited; } @@ -56,6 +57,7 @@ export class SSRCMap extends TypedEmitter { }; this.map.set(data.audioSSRC, newValue); + if (!existing) this.emit('create', newValue); this.emit('update', existing, newValue); } diff --git a/src/receive/SpeakingMap.ts b/src/receive/SpeakingMap.ts new file mode 100644 index 00000000..ba700f26 --- /dev/null +++ b/src/receive/SpeakingMap.ts @@ -0,0 +1,60 @@ +import { TypedEmitter } from 'tiny-typed-emitter'; +import { Awaited } from '../util/util'; + +/** + * The events that a SpeakingMap can emit + */ +export interface SpeakingMapEvents { + /** + * Emitted when a user starts speaking. + */ + start: (userId: string) => Awaited; + /** + * Emitted when a user stops speaking. + */ + end: (userId: string) => Awaited; +} + +/** + * Tracks the speaking states of users in a voice channel. + */ +export class SpeakingMap extends TypedEmitter { + /** + * The delay after a packet is received from a user until they're marked as not speaking anymore. + */ + public static readonly DELAY = 100; + /** + * The currently speaking users, mapped to the milliseconds since UNIX epoch at which they started speaking. + */ + public readonly users: Map; + + private readonly speakingTimeouts: Map; + + public constructor() { + super(); + this.users = new Map(); + this.speakingTimeouts = new Map(); + } + + public onPacket(userId: string) { + const timeout = this.speakingTimeouts.get(userId); + if (timeout) { + clearTimeout(timeout); + } else { + this.users.set(userId, Date.now()); + this.emit('start', userId); + } + this.startTimeout(userId); + } + + private startTimeout(userId: string) { + this.speakingTimeouts.set( + userId, + setTimeout(() => { + this.emit('end', userId); + this.speakingTimeouts.delete(userId); + this.users.delete(userId); + }, SpeakingMap.DELAY), + ); + } +} diff --git a/src/receive/VoiceReceiver.ts b/src/receive/VoiceReceiver.ts index aadd233c..0b5b2cf9 100644 --- a/src/receive/VoiceReceiver.ts +++ b/src/receive/VoiceReceiver.ts @@ -1,11 +1,13 @@ import { VoiceOpcodes } from 'discord-api-types/voice/v4'; -import { SILENCE_FRAME } from '../audio/AudioPlayer'; -import { ConnectionData, Networking, NetworkingState } from '../networking/Networking'; -import { VoiceUDPSocket } from '../networking/VoiceUDPSocket'; -import { VoiceWebSocket } from '../networking/VoiceWebSocket'; +import { ConnectionData } from '../networking/Networking'; import { methods } from '../util/Secretbox'; import type { VoiceConnection } from '../VoiceConnection'; -import { AudioReceiveStream } from './AudioReceiveStream'; +import { + AudioReceiveStream, + AudioReceiveStreamOptions, + createDefaultAudioReceiveStreamOptions, +} from './AudioReceiveStream'; +import { SpeakingMap } from './SpeakingMap'; import { SSRCMap } from './SSRCMap'; /** @@ -28,91 +30,37 @@ export class VoiceReceiver { /** * The current audio subscriptions of this receiver. */ - public readonly subscriptions: Map; + public readonly subscriptions: Map; /** - * The connection information for this receiver. Used to decrypt incoming packets. + * The connection data of the receiver. + * @internal */ - private connectionData: Partial; + public connectionData: Partial; + + /** + * The speaking map of the receiver. + */ + public readonly speaking: SpeakingMap; public constructor(voiceConnection: VoiceConnection) { this.voiceConnection = voiceConnection; this.ssrcMap = new SSRCMap(); + this.speaking = new SpeakingMap(); this.subscriptions = new Map(); this.connectionData = {}; - const onWsPacket = (packet: any) => this.onWsPacket(packet); - const onUdpMessage = (msg: Buffer) => this.onUdpMessage(msg); - - const applyConnectionData = (connectionData: Partial) => { - this.connectionData = { - ...this.connectionData, - ...connectionData, - }; - if (connectionData.packetsPlayed === 0) { - this.voiceConnection.playOpusPacket(SILENCE_FRAME); - } - }; - - // Bind listeners for updates - const onNetworkingChange = (oldState: NetworkingState, newState: NetworkingState) => { - const oldWs = Reflect.get(oldState, 'ws') as VoiceWebSocket | undefined; - const oldUdp = Reflect.get(oldState, 'udp') as VoiceUDPSocket | undefined; - const newWs = Reflect.get(newState, 'ws') as VoiceWebSocket | undefined; - const newUdp = Reflect.get(newState, 'udp') as VoiceUDPSocket | undefined; - - const connectionData = Reflect.get(newState, 'connectionData') as Partial | undefined; - if (connectionData) applyConnectionData(connectionData); - - if (newWs !== oldWs) { - oldWs?.off('packet', onWsPacket); - newWs?.on('packet', onWsPacket); - } - - if (newUdp !== oldUdp) { - oldUdp?.off('message', onUdpMessage); - newUdp?.on('message', onUdpMessage); - } - }; - - this.voiceConnection.on('stateChange', (oldState, newState) => { - const oldNetworking: Networking | undefined = Reflect.get(oldState, 'networking'); - const newNetworking: Networking | undefined = Reflect.get(newState, 'networking'); - - if (newNetworking !== oldNetworking) { - oldNetworking?.off('stateChange', onNetworkingChange); - newNetworking?.on('stateChange', onNetworkingChange); - if (newNetworking) { - const ws = Reflect.get(newNetworking.state, 'ws') as VoiceWebSocket | undefined; - const udp = Reflect.get(newNetworking.state, 'udp') as VoiceUDPSocket | undefined; - const connectionData = Reflect.get(newNetworking.state, 'connectionData') as - | Partial - | undefined; - ws?.on('packet', onWsPacket); - udp?.on('message', onUdpMessage); - if (connectionData) applyConnectionData(connectionData); - } - } - }); - - // Bind listeners for the existing state - const networking: Networking | undefined = Reflect.get(voiceConnection.state, 'networking'); - if (networking) { - const ws = Reflect.get(networking.state, 'ws') as VoiceWebSocket | undefined; - const udp = Reflect.get(networking.state, 'udp') as VoiceUDPSocket | undefined; - const connectionData = Reflect.get(networking.state, 'connectionData') as Partial | undefined; - ws?.on('packet', onWsPacket); - udp?.on('message', onUdpMessage); - if (connectionData) applyConnectionData(connectionData); - } + this.onWsPacket = this.onWsPacket.bind(this); + this.onUdpMessage = this.onUdpMessage.bind(this); } /** * Called when a packet is received on the attached connection's WebSocket. * * @param packet The received packet + * @internal */ - private onWsPacket(packet: any) { + public onWsPacket(packet: any) { if (packet.op === VoiceOpcodes.ClientDisconnect && typeof packet.d?.user_id === 'string') { this.ssrcMap.delete(packet.d.user_id); } else if ( @@ -190,16 +138,20 @@ export class VoiceReceiver { * Called when the UDP socket of the attached connection receives a message. * * @param msg The received message + * @internal */ - private onUdpMessage(msg: Buffer) { + public onUdpMessage(msg: Buffer) { if (msg.length <= 8) return; const ssrc = msg.readUInt32BE(8); - const stream = this.subscriptions.get(ssrc); - if (!stream) return; const userData = this.ssrcMap.get(ssrc); if (!userData) return; + this.speaking.onPacket(userData.userId); + + const stream = this.subscriptions.get(userData.userId); + if (!stream) return; + if (this.connectionData.encryptionMode && this.connectionData.nonceBuffer && this.connectionData.secretKey) { const packet = this.parsePacket( msg, @@ -216,36 +168,22 @@ export class VoiceReceiver { } /** - * Creates a subscription for the given target, specified either by their SSRC or user ID. + * Creates a subscription for the given user ID. * - * @param target The audio SSRC or user ID to subscribe to + * @param target The ID of the user to subscribe to * @returns A readable stream of Opus packets received from the target */ - public subscribe(target: string | number) { - const ssrc = this.ssrcMap.get(target)?.audioSSRC; - if (!ssrc) { - throw new Error(`No known SSRC for ${target}`); - } - - const existing = this.subscriptions.get(ssrc); + public subscribe(userId: string, options?: Partial) { + const existing = this.subscriptions.get(userId); if (existing) return existing; - const stream = new AudioReceiveStream(); - stream.once('close', () => this.subscriptions.delete(ssrc)); - this.subscriptions.set(ssrc, stream); + const stream = new AudioReceiveStream({ + ...createDefaultAudioReceiveStreamOptions(), + ...options, + }); + + stream.once('close', () => this.subscriptions.delete(userId)); + this.subscriptions.set(userId, stream); return stream; } } - -/** - * Creates a new voice receiver for the given voice connection. - * - * @param voiceConnection The voice connection to attach to - * @beta - * @remarks - * Voice receive is an undocumented part of the Discord API - voice receive is not guaranteed - * to be stable and may break without notice. - */ -export function createVoiceReceiver(voiceConnection: VoiceConnection) { - return new VoiceReceiver(voiceConnection); -} diff --git a/src/receive/__tests__/AudioReceiveStream.test.ts b/src/receive/__tests__/AudioReceiveStream.test.ts new file mode 100644 index 00000000..eb3ce5ea --- /dev/null +++ b/src/receive/__tests__/AudioReceiveStream.test.ts @@ -0,0 +1,71 @@ +import { SILENCE_FRAME } from '../../audio/AudioPlayer'; +import { AudioReceiveStream, EndBehaviorType } from '../AudioReceiveStream'; + +const DUMMY_BUFFER = Buffer.allocUnsafe(16); + +function wait(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function stepSilence(stream: AudioReceiveStream, increment: number) { + stream.push(SILENCE_FRAME); + await wait(increment); + expect(stream.readable).toBe(true); +} + +describe('AudioReceiveStream', () => { + test('Manual end behavior', async () => { + const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.Manual } }); + stream.push(DUMMY_BUFFER); + expect(stream.readable).toBe(true); + await wait(200); + stream.push(DUMMY_BUFFER); + expect(stream.readable).toBe(true); + }); + + test('AfterSilence end behavior', async () => { + const duration = 100; + const increment = 20; + + const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterSilence, duration: 100 } }); + stream.resume(); + + for (let i = increment; i < duration / 2; i += increment) { + await stepSilence(stream, increment); + } + + stream.push(DUMMY_BUFFER); + + for (let i = increment; i < duration; i += increment) { + await stepSilence(stream, increment); + } + + await wait(increment); + expect(stream.readableEnded).toBe(true); + }); + + test('AfterInactivity end behavior', async () => { + const duration = 100; + const increment = 20; + + const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterInactivity, duration: 100 } }); + stream.resume(); + + for (let i = increment; i < duration / 2; i += increment) { + await stepSilence(stream, increment); + } + + stream.push(DUMMY_BUFFER); + + for (let i = increment; i < duration; i += increment) { + await stepSilence(stream, increment); + } + + await wait(increment); + expect(stream.readableEnded).toBe(false); + + await wait(duration - increment); + + expect(stream.readableEnded).toBe(true); + }); +}); diff --git a/src/receive/__tests__/SpeakingMap.test.ts b/src/receive/__tests__/SpeakingMap.test.ts new file mode 100644 index 00000000..cb867c83 --- /dev/null +++ b/src/receive/__tests__/SpeakingMap.test.ts @@ -0,0 +1,32 @@ +import { noop } from '../../util/util'; +import { SpeakingMap } from '../SpeakingMap'; + +jest.useFakeTimers(); + +describe('SpeakingMap', () => { + test('Emits start and end', () => { + const speaking = new SpeakingMap(); + const userId = '123'; + + const starts: string[] = []; + const ends: string[] = []; + + speaking.on('start', (userId) => void starts.push(userId)); + speaking.on('end', (userId) => void ends.push(userId)); + + for (let i = 0; i < 10; i++) { + speaking.onPacket(userId); + setTimeout(noop, SpeakingMap.DELAY / 2); + jest.advanceTimersToNextTimer(); + + expect(starts).toEqual([userId]); + expect(ends).toEqual([]); + } + jest.advanceTimersToNextTimer(); + expect(ends).toEqual([userId]); + + speaking.onPacket(userId); + jest.advanceTimersToNextTimer(); + expect(starts).toEqual([userId, userId]); + }); +}); diff --git a/src/receive/__tests__/VoiceReceiver.test.ts b/src/receive/__tests__/VoiceReceiver.test.ts index 4fce40ca..d06ff536 100644 --- a/src/receive/__tests__/VoiceReceiver.test.ts +++ b/src/receive/__tests__/VoiceReceiver.test.ts @@ -2,20 +2,33 @@ import { VoiceReceiver } from '../VoiceReceiver'; import { VoiceConnection as _VoiceConnection, VoiceConnectionStatus } from '../../VoiceConnection'; import { RTP_PACKET_DESKTOP, RTP_PACKET_CHROME, RTP_PACKET_ANDROID } from './fixtures/rtp'; -import EventEmitter, { once } from 'events'; -import { createVoiceReceiver } from '..'; +import { once } from 'events'; import { VoiceOpcodes } from 'discord-api-types/voice/v4'; -import * as fixtures from './fixtures/states'; +import { methods } from '../../util/Secretbox'; jest.mock('../../VoiceConnection'); jest.mock('../SSRCMap'); +const openSpy = jest.spyOn(methods, 'open'); + +openSpy.mockImplementation((buffer) => buffer); + const VoiceConnection = _VoiceConnection as unknown as jest.Mocked; function nextTick() { return new Promise((resolve) => process.nextTick(resolve)); } +function* rangeIter(start: number, end: number) { + for (let i = start; i <= end; i++) { + yield i; + } +} + +function range(start: number, end: number) { + return Buffer.from([...rangeIter(start, end)]); +} + describe('VoiceReceiver', () => { let voiceConnection: _VoiceConnection; let receiver: VoiceReceiver; @@ -25,7 +38,7 @@ describe('VoiceReceiver', () => { voiceConnection.state = { status: VoiceConnectionStatus.Signalling, } as any; - receiver = createVoiceReceiver(voiceConnection); + receiver = new VoiceReceiver(voiceConnection); receiver['connectionData'] = { encryptionMode: 'dummy', nonceBuffer: Buffer.alloc(0), @@ -46,7 +59,7 @@ describe('VoiceReceiver', () => { userId: '123', })); - const stream = receiver.subscribe(RTP_PACKET.ssrc); + const stream = receiver.subscribe('123'); receiver['onUdpMessage'](RTP_PACKET.packet); await nextTick(); @@ -66,7 +79,7 @@ describe('VoiceReceiver', () => { userId: '123', })); - const stream = receiver.subscribe(RTP_PACKET_DESKTOP.ssrc); + const stream = receiver.subscribe('123'); const errorEvent = once(stream, 'error'); @@ -83,12 +96,8 @@ describe('VoiceReceiver', () => { userId: '123', })); - const stream = receiver.subscribe(RTP_PACKET_DESKTOP.ssrc); - expect(receiver.subscribe(RTP_PACKET_DESKTOP.ssrc)).toBe(stream); - }); - - test('subscribe: refuses unknown SSRC or user IDs', () => { - expect(() => receiver.subscribe(RTP_PACKET_DESKTOP.ssrc)).toThrow(); + const stream = receiver.subscribe('123'); + expect(receiver.subscribe('123')).toBe(stream); }); describe('onWsPacket', () => { @@ -149,74 +158,51 @@ describe('VoiceReceiver', () => { }); }); }); -}); - -test('Receiver tracks state changes', () => { - const voiceConnection: any = new EventEmitter(); - voiceConnection.playOpusPacket = jest.fn(); - - voiceConnection.state = fixtures.state1; - - const receiver = createVoiceReceiver(voiceConnection); - - const onWsPacketSpy = jest.fn(); - receiver['onWsPacket'] = onWsPacketSpy; - - const onUdpMessageSpy = jest.fn(); - receiver['onUdpMessage'] = onUdpMessageSpy; - const networking1 = fixtures.state2.vc.networking; - voiceConnection.state = fixtures.state2.vc; + describe('decrypt', () => { + const secretKey = new Uint8Array([1, 2, 3, 4]); - voiceConnection.emit('stateChange', fixtures.state1.vc, voiceConnection.state); - fixtures.state2.networking.ws.emit('packet', Symbol('message')); - expect(onWsPacketSpy).toHaveBeenCalled(); - onWsPacketSpy.mockClear(); - - networking1.state = fixtures.state3.networking; - networking1.emit('stateChange', fixtures.state2.networking, fixtures.state3.networking); - - fixtures.state3.networking.ws.emit('packet', Symbol('message')); - expect(onWsPacketSpy).toHaveBeenCalled(); - - fixtures.state3.networking.udp.emit('message', Symbol('message')); - expect(onUdpMessageSpy).toHaveBeenCalled(); - - onWsPacketSpy.mockClear(); - onUdpMessageSpy.mockClear(); - - voiceConnection.state = fixtures.state4.vc; - expect(voiceConnection.playOpusPacket).not.toHaveBeenCalled(); - - voiceConnection.emit('stateChange', fixtures.state3.vc, voiceConnection.state); - - expect(voiceConnection.playOpusPacket).toHaveBeenCalled(); + beforeEach(() => { + openSpy.mockClear(); + }); - fixtures.state4.networking.ws.emit('packet', Symbol('message')); - expect(onWsPacketSpy).toHaveBeenCalled(); + test('decrypt: xsalsa20_poly1305_lite', () => { + // Arrange + const buffer = range(1, 32); + const nonce = Buffer.alloc(4); - fixtures.state4.networking.udp.emit('message', Symbol('message')); - expect(onUdpMessageSpy).toHaveBeenCalled(); -}); + // Act + const decrypted = receiver['decrypt'](buffer, 'xsalsa20_poly1305_lite', nonce, secretKey); -test('Receiver binds to immediately ready voice connection', () => { - const voiceConnection: any = new EventEmitter(); - voiceConnection.state = fixtures.state4.vc; - voiceConnection.playOpusPacket = jest.fn(); + // Assert + expect(nonce.equals(range(29, 32))).toBe(true); + expect(decrypted.equals(range(13, 28))).toBe(true); + }); - const receiver = createVoiceReceiver(voiceConnection); + test('decrypt: xsalsa20_poly1305_suffix', () => { + // Arrange + const buffer = range(1, 64); + const nonce = Buffer.alloc(24); - expect(voiceConnection.playOpusPacket).toHaveBeenCalled(); + // Act + const decrypted = receiver['decrypt'](buffer, 'xsalsa20_poly1305_suffix', nonce, secretKey); - const onWsPacketSpy = jest.fn(); - receiver['onWsPacket'] = onWsPacketSpy; + // Assert + expect(nonce.equals(range(41, 64))).toBe(true); + expect(decrypted.equals(range(13, 40))).toBe(true); + }); - const onUdpMessageSpy = jest.fn(); - receiver['onUdpMessage'] = onUdpMessageSpy; + test('decrypt: xsalsa20_poly1305', () => { + // Arrange + const buffer = range(1, 64); + const nonce = Buffer.alloc(12); - fixtures.state4.networking.ws.emit('packet', Symbol('message')); - expect(onWsPacketSpy).toHaveBeenCalled(); + // Act + const decrypted = receiver['decrypt'](buffer, 'xsalsa20_poly1305', nonce, secretKey); - fixtures.state4.networking.udp.emit('message', Symbol('message')); - expect(onUdpMessageSpy).toHaveBeenCalled(); + // Assert + expect(nonce.equals(range(1, 12))).toBe(true); + expect(decrypted.equals(range(13, 64))).toBe(true); + }); + }); }); diff --git a/src/receive/__tests__/fixtures/states.ts b/src/receive/__tests__/fixtures/states.ts deleted file mode 100644 index c1376e0b..00000000 --- a/src/receive/__tests__/fixtures/states.ts +++ /dev/null @@ -1,70 +0,0 @@ -import EventEmitter from 'events'; -import { NetworkingState, NetworkingStatusCode } from '../../../networking'; -import { VoiceConnectionState, VoiceConnectionStatus } from '../../../VoiceConnection'; - -// State 1 -const vcState1: VoiceConnectionState = { - status: VoiceConnectionStatus.Signalling, - adapter: {} as any, -}; - -export const state1 = { - vc: vcState1, -}; - -// State 2 -const networkingState2: NetworkingState = { - code: NetworkingStatusCode.Identifying, - connectionOptions: {} as any, - ws: new EventEmitter() as any, -}; -const networking1 = new EventEmitter() as any; -networking1.state = networkingState2; -const vcState2: VoiceConnectionState = { - status: VoiceConnectionStatus.Ready, - networking: networking1, - adapter: {} as any, -}; - -export const state2 = { - vc: vcState2, - networking: networkingState2, -}; - -// State 3 -const networkingState3 = { - code: NetworkingStatusCode.Ready, - connectionData: {} as any, - connectionOptions: {} as any, - udp: new EventEmitter() as any, - ws: new EventEmitter() as any, -}; - -export const state3 = { - vc: vcState2, - networking: networkingState3, -}; - -// State 4 -const networking2 = new EventEmitter() as any; -const networkingState4 = { - code: NetworkingStatusCode.Ready, - connectionData: { - packetsPlayed: 0, - } as any, - connectionOptions: {} as any, - udp: new EventEmitter() as any, - ws: new EventEmitter() as any, -}; -networking2.state = networkingState4; - -const vcState4: VoiceConnectionState = { - status: VoiceConnectionStatus.Ready, - networking: networking2, - adapter: {} as any, -}; - -export const state4 = { - vc: vcState4, - networking: networkingState4, -};