diff --git a/src/matrixrtc/MatrixRTCSession.ts b/src/matrixrtc/MatrixRTCSession.ts index 52258ea53a2..a317495a95d 100644 --- a/src/matrixrtc/MatrixRTCSession.ts +++ b/src/matrixrtc/MatrixRTCSession.ts @@ -34,7 +34,7 @@ import { randomString, secureRandomBase64Url } from "../randomstring.ts"; import { EncryptionKeysEventContent } from "./types.ts"; import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts"; import { KnownMembership } from "../@types/membership.ts"; -import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts"; +import { HTTPError, MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts"; import { MatrixEvent } from "../models/event.ts"; import { isLivekitFocusActive } from "./LivekitFocus.ts"; import { ExperimentalGroupCallRoomMemberState } from "../webrtc/groupCall.ts"; @@ -1031,39 +1031,39 @@ export class MatrixRTCSession extends TypedEventEmitter => { try { // TODO: If delayed event times out, re-join! - const res = await this.client._unstable_sendDelayedStateEvent( - this.room.roomId, - { - delay: 8000, - }, - EventType.GroupCallMemberPrefix, - {}, // leave event - stateKey, + const res = await resendIfRateLimited(() => + this.client._unstable_sendDelayedStateEvent( + this.room.roomId, + { + delay: 8000, + }, + EventType.GroupCallMemberPrefix, + {}, // leave event + stateKey, + ), ); this.disconnectDelayId = res.delay_id; } catch (e) { - // TODO: Retry if rate-limited logger.error("Failed to prepare delayed disconnection event:", e); } }; await prepareDelayedDisconnection(); // Send join event _after_ preparing the delayed disconnection event - await this.client.sendStateEvent( - this.room.roomId, - EventType.GroupCallMemberPrefix, - newContent, - stateKey, + await resendIfRateLimited(() => + this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey), ); // If sending state cancels your own delayed state, prepare another delayed state // TODO: Remove this once MSC4140 is stable & doesn't cancel own delayed state if (this.disconnectDelayId !== undefined) { try { - await this.client._unstable_updateDelayedEvent( - this.disconnectDelayId, - UpdateDelayedEventAction.Restart, + const knownDisconnectDelayId = this.disconnectDelayId; + await resendIfRateLimited(() => + this.client._unstable_updateDelayedEvent( + knownDisconnectDelayId, + UpdateDelayedEventAction.Restart, + ), ); } catch (e) { - // TODO: Make embedded client include errcode, and retry only if not M_NOT_FOUND (or rate-limited) logger.warn("Failed to update delayed disconnection event, prepare it again:", e); this.disconnectDelayId = undefined; await prepareDelayedDisconnection(); @@ -1076,23 +1076,27 @@ export class MatrixRTCSession extends TypedEventEmitter + this.client._unstable_updateDelayedEvent( + knownDisconnectDelayId, + UpdateDelayedEventAction.Send, + ), ); sentDelayedDisconnect = true; } catch (e) { - // TODO: Retry if rate-limited logger.error("Failed to send our delayed disconnection event:", e); } this.disconnectDelayId = undefined; } if (!sentDelayedDisconnect) { - await this.client.sendStateEvent( - this.room.roomId, - EventType.GroupCallMemberPrefix, - {}, - this.makeMembershipStateKey(localUserId, localDeviceId), + await resendIfRateLimited(() => + this.client.sendStateEvent( + this.room.roomId, + EventType.GroupCallMemberPrefix, + {}, + this.makeMembershipStateKey(localUserId, localDeviceId), + ), ); } } @@ -1111,10 +1115,12 @@ export class MatrixRTCSession extends TypedEventEmitter => { try { - await this.client._unstable_updateDelayedEvent(this.disconnectDelayId!, UpdateDelayedEventAction.Restart); + const knownDisconnectDelayId = this.disconnectDelayId!; + await resendIfRateLimited(() => + this.client._unstable_updateDelayedEvent(knownDisconnectDelayId, UpdateDelayedEventAction.Restart), + ); this.scheduleDelayDisconnection(); } catch (e) { - // TODO: Retry if rate-limited logger.error("Failed to delay our disconnection event:", e); } }; @@ -1162,3 +1168,31 @@ export class MatrixRTCSession extends TypedEventEmitter(func: () => Promise, numRetriesAllowed: number = 1): Promise { + // eslint-disable-next-line no-constant-condition + while (true) { + try { + return await func(); + } catch (e) { + if (numRetriesAllowed > 0 && e instanceof HTTPError && e.isRateLimitError()) { + numRetriesAllowed--; + let resendDelay: number; + const defaultMs = 5000; + try { + resendDelay = e.getRetryAfterMs() ?? defaultMs; + logger.info(`Rate limited by server, retrying in ${resendDelay}ms`); + } catch (e) { + logger.warn( + `Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`, + e, + ); + resendDelay = defaultMs; + } + await sleep(resendDelay); + } else { + throw e; + } + } + } +}