Skip to content

Commit

Permalink
Merge branches 'toger5/local-echo' and 'af/rtc-handle-ratelimiting' i…
Browse files Browse the repository at this point in the history
…nto element-call-nov-preview
  • Loading branch information
robintown committed Nov 9, 2024
2 parents e6fa25d + b96ae12 commit 6971e7b
Showing 1 changed file with 64 additions and 30 deletions.
94 changes: 64 additions & 30 deletions src/matrixrtc/MatrixRTCSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { randomString, secureRandomBase64Url } from "../randomstring.ts";
import { EncryptionKeysEventContent } from "./types.ts";
import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts";
import { KnownMembership } from "../@types/membership.ts";
import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
import { HTTPError, MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
import { MatrixEvent } from "../models/event.ts";
import { isLivekitFocusActive } from "./LivekitFocus.ts";
import { ExperimentalGroupCallRoomMemberState } from "../webrtc/groupCall.ts";
Expand Down Expand Up @@ -1031,39 +1031,39 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
const prepareDelayedDisconnection = async (): Promise<void> => {
try {
// TODO: If delayed event times out, re-join!
const res = await this.client._unstable_sendDelayedStateEvent(
this.room.roomId,
{
delay: 8000,
},
EventType.GroupCallMemberPrefix,
{}, // leave event
stateKey,
const res = await resendIfRateLimited(() =>
this.client._unstable_sendDelayedStateEvent(
this.room.roomId,
{
delay: 8000,
},
EventType.GroupCallMemberPrefix,
{}, // leave event
stateKey,
),
);
this.disconnectDelayId = res.delay_id;
} catch (e) {
// TODO: Retry if rate-limited
logger.error("Failed to prepare delayed disconnection event:", e);
}
};
await prepareDelayedDisconnection();
// Send join event _after_ preparing the delayed disconnection event
await this.client.sendStateEvent(
this.room.roomId,
EventType.GroupCallMemberPrefix,
newContent,
stateKey,
await resendIfRateLimited(() =>
this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey),
);
// If sending state cancels your own delayed state, prepare another delayed state
// TODO: Remove this once MSC4140 is stable & doesn't cancel own delayed state
if (this.disconnectDelayId !== undefined) {
try {
await this.client._unstable_updateDelayedEvent(
this.disconnectDelayId,
UpdateDelayedEventAction.Restart,
const knownDisconnectDelayId = this.disconnectDelayId;
await resendIfRateLimited(() =>
this.client._unstable_updateDelayedEvent(
knownDisconnectDelayId,
UpdateDelayedEventAction.Restart,
),
);
} catch (e) {
// TODO: Make embedded client include errcode, and retry only if not M_NOT_FOUND (or rate-limited)
logger.warn("Failed to update delayed disconnection event, prepare it again:", e);
this.disconnectDelayId = undefined;
await prepareDelayedDisconnection();
Expand All @@ -1076,23 +1076,27 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
let sentDelayedDisconnect = false;
if (this.disconnectDelayId !== undefined) {
try {
await this.client._unstable_updateDelayedEvent(
this.disconnectDelayId,
UpdateDelayedEventAction.Send,
const knownDisconnectDelayId = this.disconnectDelayId;
await resendIfRateLimited(() =>
this.client._unstable_updateDelayedEvent(
knownDisconnectDelayId,
UpdateDelayedEventAction.Send,
),
);
sentDelayedDisconnect = true;
} catch (e) {
// TODO: Retry if rate-limited
logger.error("Failed to send our delayed disconnection event:", e);
}
this.disconnectDelayId = undefined;
}
if (!sentDelayedDisconnect) {
await this.client.sendStateEvent(
this.room.roomId,
EventType.GroupCallMemberPrefix,
{},
this.makeMembershipStateKey(localUserId, localDeviceId),
await resendIfRateLimited(() =>
this.client.sendStateEvent(
this.room.roomId,
EventType.GroupCallMemberPrefix,
{},
this.makeMembershipStateKey(localUserId, localDeviceId),
),
);
}
}
Expand All @@ -1111,10 +1115,12 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M

private readonly delayDisconnection = async (): Promise<void> => {
try {
await this.client._unstable_updateDelayedEvent(this.disconnectDelayId!, UpdateDelayedEventAction.Restart);
const knownDisconnectDelayId = this.disconnectDelayId!;
await resendIfRateLimited(() =>
this.client._unstable_updateDelayedEvent(knownDisconnectDelayId, UpdateDelayedEventAction.Restart),
);
this.scheduleDelayDisconnection();
} catch (e) {
// TODO: Retry if rate-limited
logger.error("Failed to delay our disconnection event:", e);
}
};
Expand Down Expand Up @@ -1162,3 +1168,31 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
this.sendEncryptionKeysEvent(newKeyIndex);
};
}

async function resendIfRateLimited<T>(func: () => Promise<T>, numRetriesAllowed: number = 1): Promise<T> {
// eslint-disable-next-line no-constant-condition
while (true) {
try {
return await func();
} catch (e) {
if (numRetriesAllowed > 0 && e instanceof HTTPError && e.isRateLimitError()) {
numRetriesAllowed--;
let resendDelay: number;
const defaultMs = 5000;
try {
resendDelay = e.getRetryAfterMs() ?? defaultMs;
logger.info(`Rate limited by server, retrying in ${resendDelay}ms`);
} catch (e) {
logger.warn(
`Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`,
e,
);
resendDelay = defaultMs;
}
await sleep(resendDelay);
} else {
throw e;
}
}
}
}

0 comments on commit 6971e7b

Please sign in to comment.