From 0f8e8d98ca285c1982a2b0688fd1e063d84c345a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Fri, 25 Sep 2020 12:52:08 -0300 Subject: [PATCH 1/9] sdk: implement toDevice messages capability --- raiden-ts/src/config.ts | 1 + raiden-ts/src/constants.ts | 1 + raiden-ts/src/transport/epics/helpers.ts | 23 ++++++++++--- raiden-ts/src/transport/epics/messages.ts | 41 +++++++++++++---------- raiden-ts/src/transport/epics/rooms.ts | 37 ++++++++++---------- raiden-ts/src/transport/epics/webrtc.ts | 6 ++-- 6 files changed, 66 insertions(+), 43 deletions(-) diff --git a/raiden-ts/src/config.ts b/raiden-ts/src/config.ts index ed92d0a7d9..c43960f196 100644 --- a/raiden-ts/src/config.ts +++ b/raiden-ts/src/config.ts @@ -135,6 +135,7 @@ export function makeDefaultConfig( [Capabilities.NO_DELIVERY]: true, [Capabilities.NO_MEDIATE]: true, [Capabilities.WEBRTC]: true, + [Capabilities.TO_DEVICE]: true, }, fallbackIceServers: [{ urls: 'stun:stun.l.google.com:19302' }], rateToSvt: {}, diff --git a/raiden-ts/src/constants.ts b/raiden-ts/src/constants.ts index 99f6981f09..aab795430d 100644 --- a/raiden-ts/src/constants.ts +++ b/raiden-ts/src/constants.ts @@ -18,4 +18,5 @@ export enum Capabilities { NO_MEDIATE = 'noMediate', // can't mediate transfers; mediating requires receiving NO_DELIVERY = 'noDelivery', // don't need Delivery messages WEBRTC = 'webRTC', // use WebRTC channels for p2p messaging + TO_DEVICE = 'toDevice', // use ToDevice messages instead of rooms } diff --git a/raiden-ts/src/transport/epics/helpers.ts b/raiden-ts/src/transport/epics/helpers.ts index cedb6350be..acb1b190f3 100644 --- a/raiden-ts/src/transport/epics/helpers.ts +++ b/raiden-ts/src/transport/epics/helpers.ts @@ -14,6 +14,7 @@ import { import curry from 'lodash/curry'; import { Room, MatrixClient, EventType, RoomMember, MatrixEvent } from 'matrix-js-sdk'; +import { Capabilities } from '../../constants'; import { RaidenConfig } from '../../config'; import { RaidenEpicDeps } from '../../types'; import { isntNil, Address, Signed } from '../../utils/types'; @@ -148,15 +149,29 @@ export function waitMemberAndSend$( ), pluck('rtc', address), ), + // if available and Capabilities.TO_DEVICE enabled on both ends, use ToDevice messages + latest$.pipe( + pluck('presences', address), + withLatestFrom(config$), + filter( + ([presence, { caps }]) => + !!caps?.[Capabilities.TO_DEVICE] && + !!presence?.payload?.available && + !!presence.payload.caps?.[Capabilities.TO_DEVICE], + ), + pluck('0', 'payload', 'userId'), + ), waitMember$(matrix, address, { latest$ }), ).pipe( take(1), // use first room/user which meets all requirements/filters above mergeMap((via) => - defer>( + defer>( async () => - typeof via === 'string' - ? matrix.sendEvent(via, type, content, '') // via room - : via.send(content.body), // via RTC channel + typeof via !== 'string' + ? via.send(content.body) // via RTC channel + : via.startsWith('@') + ? matrix.sendToDevice(type, { [via]: { '*': content } }) // via toDevice message + : matrix.sendEvent(via, type, content, ''), // via room ).pipe( // this returned value is just for notification, and shouldn't be relayed on; // all functionality is provided as side effects of the subscription diff --git a/raiden-ts/src/transport/epics/messages.ts b/raiden-ts/src/transport/epics/messages.ts index cf559f9835..13ed303846 100644 --- a/raiden-ts/src/transport/epics/messages.ts +++ b/raiden-ts/src/transport/epics/messages.ts @@ -1,4 +1,4 @@ -import { Observable, of, EMPTY, fromEvent, timer, defer, from } from 'rxjs'; +import { Observable, of, EMPTY, fromEvent, timer, defer, from, merge } from 'rxjs'; import { catchError, concatMap, @@ -13,9 +13,8 @@ import { takeUntil, tap, } from 'rxjs/operators'; -import find from 'lodash/find'; -import { MatrixClient, MatrixEvent, Room } from 'matrix-js-sdk'; +import { MatrixEvent, Room } from 'matrix-js-sdk'; import { Capabilities } from '../../constants'; import { Signed } from '../../utils/types'; @@ -170,10 +169,13 @@ export const matrixMessageReceivedEpic = ( matrix$.pipe( // when matrix finishes initialization, register to matrix timeline events switchMap((matrix) => - fromEvent<{ event: MatrixEvent; room: Room; matrix: MatrixClient }>( - matrix, - 'Room.timeline', - (event, room) => ({ matrix, event, room }), + merge( + fromEvent<[MatrixEvent, Room]>(matrix, 'Room.timeline').pipe( + map(([event, room]) => ({ matrix, event, room })), + ), + fromEvent(matrix, 'toDeviceEvent').pipe( + map((event) => ({ matrix, event, room: undefined })), + ), ), ), withLatestFrom(config$), @@ -183,27 +185,32 @@ export const matrixMessageReceivedEpic = ( event.getType() === 'm.room.message' && event.event?.content?.msgtype === 'm.text' && event.getSender() !== matrix.getUserId() && - !globalRoomNames(config).some((g) => - // generate an alias for global room of given name, and check if room matches - roomMatch(`#${g}:${getServerName(matrix.getHomeserverUrl())}`, room), - ), + ((room && + !globalRoomNames(config).some((g) => + // generate an alias for global room of given name, and check if room matches + roomMatch(`#${g}:${getServerName(matrix.getHomeserverUrl())}`, room), + )) || + (!room && !!config.caps?.[Capabilities.TO_DEVICE])), // toDevice message ), mergeMap(([{ event, room }, { httpTimeout }]) => latest$.pipe( filter(({ presences, state }) => { - const presence = find(presences, ['payload.userId', event.getSender()]); + const presence = Object.values(presences).find( + (presence) => presence.payload.userId === event.getSender(), + ); if (!presence) return false; const rooms = state.transport.rooms?.[presence.meta.address] ?? []; - if (!rooms.includes(room.roomId)) return false; - return true; + return !room || rooms.includes(room.roomId); }), take(1), // take up to an arbitrary timeout to presence status for the sender // AND the room in which this message was sent to be in sender's address room queue takeUntil(timer(httpTimeout)), mergeMap(function* ({ presences }) { - const presence = find(presences, ['payload.userId', event.getSender()])!; - for (const line of (event.event.content.body || '').split('\n')) { + const presence = Object.values(presences).find( + (presence) => presence.payload.userId === event.getSender(), + )!; + for (const line of (event.getContent().body ?? '').split('\n')) { const message = parseMessage(line, presence.meta.address, { log }); yield messageReceived( { @@ -211,7 +218,7 @@ export const matrixMessageReceivedEpic = ( message, ts: event.event.origin_server_ts ?? Date.now(), userId: presence.payload.userId, - roomId: room.roomId, + ...(room ? { roomId: room.roomId } : {}), }, presence.meta, ); diff --git a/raiden-ts/src/transport/epics/rooms.ts b/raiden-ts/src/transport/epics/rooms.ts index 4d820c411a..5bebdf0aa5 100644 --- a/raiden-ts/src/transport/epics/rooms.ts +++ b/raiden-ts/src/transport/epics/rooms.ts @@ -19,11 +19,12 @@ import { exhaustMap, distinct, delayWhen, + pluck, } from 'rxjs/operators'; -import find from 'lodash/find'; import { MatrixClient, MatrixEvent, Room, RoomMember } from 'matrix-js-sdk'; +import { Capabilities } from '../../constants'; import { Address, isntNil } from '../../utils/types'; import { isActionOf } from '../../utils/actions'; import { RaidenEpicDeps } from '../../types'; @@ -82,11 +83,8 @@ function inviteLoop$( repeatWhen((completed$) => completed$.pipe(delay(httpTimeout))), takeUntil( // stop repeat+defer loop above when user joins - fromEvent( - matrix, - 'RoomMember.membership', - ({}: MatrixEvent, member: RoomMember) => member, - ).pipe( + fromEvent<[MatrixEvent, RoomMember]>(matrix, 'RoomMember.membership').pipe( + pluck(1), filter( (member) => member.roomId === roomId && @@ -164,6 +162,12 @@ export const matrixCreateRoomEpic = ( // wait for user to be monitored filter(({ presences }) => address in presences), take(1), + // skip room creation/invite if both partner and us have ToDevice capability set + filter( + ({ presences, config }) => + !config.caps?.[Capabilities.TO_DEVICE] || + !presences[address].payload.caps?.[Capabilities.TO_DEVICE], + ), // if there's already a room in state for address, skip filter(({ state }) => !state.transport.rooms?.[address]?.[0]), // else, create a room, invite known user and persist roomId in state @@ -223,11 +227,8 @@ export const matrixInviteEpic = ( !roomId ? EMPTY : // re-trigger invite loop if user leaves - fromEvent( - matrix, - 'RoomMember.membership', - ({}: MatrixEvent, member: RoomMember) => member, - ).pipe( + fromEvent<[MatrixEvent, RoomMember]>(matrix, 'RoomMember.membership').pipe( + pluck(1), filter( (member) => member.roomId === roomId && @@ -273,10 +274,8 @@ export const matrixHandleInvitesEpic = ( matrix$.pipe( // when matrix finishes initialization, register to matrix invite events switchMap((matrix) => - fromEvent<{ event: MatrixEvent; member: RoomMember; matrix: MatrixClient }>( - matrix, - 'RoomMember.membership', - (event, member) => ({ event, member, matrix }), + fromEvent<[MatrixEvent, RoomMember]>(matrix, 'RoomMember.membership').pipe( + map(([event, member]) => ({ event, member, matrix })), ), ), filter( @@ -289,7 +288,7 @@ export const matrixHandleInvitesEpic = ( const sender = event.getSender(), senderPresence$ = latest$.pipe( pluckDistinct('presences'), - map((presences) => find(presences, (p) => p.payload.userId === sender)), + map((presences) => Object.values(presences).find((p) => p.payload.userId === sender)), filter(isntNil), take(1), // Don't wait more than some arbitrary time for this sender presence update to show @@ -417,10 +416,8 @@ export const matrixCleanLeftRoomsEpic = ( matrix$.pipe( // when matrix finishes initialization, register to matrix invite events switchMap((matrix) => - fromEvent<{ room: Room; membership: string; matrix: MatrixClient }>( - matrix, - 'Room.myMembership', - (room, membership) => ({ room, membership, matrix }), + fromEvent<[Room, string]>(matrix, 'Room.myMembership').pipe( + map(([room, membership]) => ({ room, membership, matrix })), ), ), // filter for leave events to us diff --git a/raiden-ts/src/transport/epics/webrtc.ts b/raiden-ts/src/transport/epics/webrtc.ts index 8b8e87f382..86477577df 100644 --- a/raiden-ts/src/transport/epics/webrtc.ts +++ b/raiden-ts/src/transport/epics/webrtc.ts @@ -135,8 +135,10 @@ function matrixWebrtcEvents$( type: T, sender: string, ) { - return fromEvent<[MatrixEvent]>(matrix, 'Room.timeline').pipe( - pluck(0), + return merge( + fromEvent<[MatrixEvent]>(matrix, 'Room.timeline').pipe(pluck(0)), + fromEvent(matrix, 'toDeviceEvent'), + ).pipe( filter( (event): event is ExtMatrixEvent => event.getType() === 'm.room.message' && From e946bb67ddd81646dbc306633ce3663458adc3d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Tue, 6 Oct 2020 09:41:18 -0300 Subject: [PATCH 2/9] sdk: some types improvements --- raiden-ts/src/messages/actions.ts | 2 +- raiden-ts/src/polyfills.ts | 7 +++++++ raiden-ts/src/raiden.ts | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/raiden-ts/src/messages/actions.ts b/raiden-ts/src/messages/actions.ts index 650a6f2f3a..aeb28719de 100644 --- a/raiden-ts/src/messages/actions.ts +++ b/raiden-ts/src/messages/actions.ts @@ -12,7 +12,7 @@ export const messageSend = createAsyncAction( 'message/send/success', 'message/send/failure', t.type({ message: t.union([t.string, Signed(Message)]) }), - t.union([t.undefined, t.type({ via: t.string })]), + t.union([t.undefined, t.partial({ via: t.string })]), ); export namespace messageSend { export interface request extends ActionType {} diff --git a/raiden-ts/src/polyfills.ts b/raiden-ts/src/polyfills.ts index 080dbeb3b2..21cd4e8e34 100644 --- a/raiden-ts/src/polyfills.ts +++ b/raiden-ts/src/polyfills.ts @@ -9,6 +9,13 @@ import { logger as matrixLogger } from 'matrix-js-sdk/lib/logger'; Object.assign(logging, { methodFactory }); // revert matrixLogger.setLevel(logging.levels.DEBUG); // apply +declare module 'matrix-js-sdk' { + // augment MatrixEvent interface/class + export interface MatrixEvent { + getContent(): any; // eslint-disable-line @typescript-eslint/no-explicit-any + } +} + // request.abort() is called when shutting down matrix; this patch clears some timeouts left behind import { getRequest, request } from 'matrix-js-sdk'; const origRequest = getRequest(); diff --git a/raiden-ts/src/raiden.ts b/raiden-ts/src/raiden.ts index 6a6d91b3a4..ee406478b5 100644 --- a/raiden-ts/src/raiden.ts +++ b/raiden-ts/src/raiden.ts @@ -281,7 +281,7 @@ export class Raiden { raidenReducer, // workaround for redux's PreloadedState issues with branded values state as any, // eslint-disable-line @typescript-eslint/no-explicit-any - applyMiddleware(loggerMiddleware, this.epicMiddleware, persisterMiddleware), + applyMiddleware(loggerMiddleware, persisterMiddleware, this.epicMiddleware), ); // populate deps.latest$, to ensure config, logger && pollingInterval are setup before start From c02e0293a24bce9362d1e33605823d0b6f470e06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Tue, 6 Oct 2020 09:42:57 -0300 Subject: [PATCH 3/9] sdk: better handle matrix rate-limit errors --- raiden-ts/src/transport/epics/helpers.ts | 161 ++++++++------------- raiden-ts/src/transport/epics/init.ts | 88 ++++++------ raiden-ts/src/transport/epics/presence.ts | 45 ++++-- raiden-ts/src/transport/epics/rooms.ts | 128 +++++++---------- raiden-ts/src/transport/epics/webrtc.ts | 168 +++++++++++++--------- raiden-ts/src/utils/rx.ts | 60 ++++++-- 6 files changed, 343 insertions(+), 307 deletions(-) diff --git a/raiden-ts/src/transport/epics/helpers.ts b/raiden-ts/src/transport/epics/helpers.ts index acb1b190f3..f2359fc858 100644 --- a/raiden-ts/src/transport/epics/helpers.ts +++ b/raiden-ts/src/transport/epics/helpers.ts @@ -1,24 +1,24 @@ -import { Observable, of, EMPTY, fromEvent, timer, throwError, defer, merge } from 'rxjs'; +import { Observable, of, fromEvent, timer, throwError, combineLatest } from 'rxjs'; import { distinctUntilChanged, filter, map, mergeMap, withLatestFrom, - switchMap, take, - mapTo, - pluck, retryWhen, + startWith, + pluck, } from 'rxjs/operators'; import curry from 'lodash/curry'; -import { Room, MatrixClient, EventType, RoomMember, MatrixEvent } from 'matrix-js-sdk'; +import { Room, MatrixClient, EventType } from 'matrix-js-sdk'; import { Capabilities } from '../../constants'; import { RaidenConfig } from '../../config'; import { RaidenEpicDeps } from '../../types'; import { isntNil, Address, Signed } from '../../utils/types'; import { RaidenError, ErrorCodes } from '../../utils/error'; +import { pluckDistinct } from '../../utils/rx'; import { Message } from '../../messages/types'; import { decodeJsonMessage, getMessageSigner } from '../../messages/utils'; @@ -68,49 +68,26 @@ function waitMember$( address: Address, { latest$ }: Pick, ) { - return latest$.pipe( - map(({ state }) => state.transport.rooms?.[address]?.[0]), - // wait for a room to exist (created or invited) for address - filter(isntNil), - distinctUntilChanged(), - // this switchMap unsubscribes from previous "wait" if first room for address changes - switchMap((roomId) => - // get/wait room object for roomId - // may wait for the room state to be populated (happens after createRoom resolves) - getRoom$(matrix, roomId).pipe( - mergeMap((room) => - // wait for address to be monitored & online (after getting Room for address) - // latest$ ensures it happens immediatelly if all conditions are satisfied - latest$.pipe( - pluck('presences', address), - map((presence) => - presence?.payload?.available ? presence.payload.userId : undefined, - ), - distinctUntilChanged(), - map((userId) => ({ room, userId })), - ), - ), - // when user is online, get room member for partner's userId - // this switchMap unsubscribes from previous wait if userId changes or go offline - switchMap(({ room, userId }) => { - if (!userId) return EMPTY; // user not monitored or not available - const member = room.getMember(userId); - // if it already joined room, return its membership - if (member && member.membership === 'join') return of(member); - // else, wait for the user to join/accept invite - return fromEvent<[MatrixEvent, RoomMember]>(matrix, 'RoomMember.membership').pipe( - pluck(1), - filter( - (member) => - member.roomId === room.roomId && - member.userId === userId && - member.membership === 'join', - ), - ); - }), - pluck('roomId'), - ), + return combineLatest([ + latest$.pipe( + pluckDistinct('presences', address), + filter((presence) => !!presence?.payload?.available), + ), + latest$.pipe( + map(({ state }) => state.transport.rooms?.[address]?.[0]), + // wait for a room to exist (created or invited) for address + filter(isntNil), + distinctUntilChanged(), + ), + fromEvent(matrix, 'Room').pipe(startWith(null)), + fromEvent(matrix, 'RoomMember.membership').pipe(startWith(null)), + ]).pipe( + filter( + ([presence, roomId]) => + matrix.getRoom(roomId)?.getMember?.(presence.payload.userId)?.membership === 'join', ), + pluck(1), + take(1), ); } @@ -137,59 +114,43 @@ export function waitMemberAndSend$( allowRtc = false, ): Observable { const RETRY_COUNT = 3; // is this relevant enough to become a constant/setting? - return merge( - // if available & open, use channel - latest$.pipe( - filter( - ({ presences, rtc }) => - allowRtc && - address in presences && - presences[address].payload.available && - rtc[address]?.readyState === 'open', - ), - pluck('rtc', address), - ), - // if available and Capabilities.TO_DEVICE enabled on both ends, use ToDevice messages - latest$.pipe( - pluck('presences', address), - withLatestFrom(config$), - filter( - ([presence, { caps }]) => - !!caps?.[Capabilities.TO_DEVICE] && - !!presence?.payload?.available && - !!presence.payload.caps?.[Capabilities.TO_DEVICE], - ), - pluck('0', 'payload', 'userId'), - ), - waitMember$(matrix, address, { latest$ }), - ).pipe( - take(1), // use first room/user which meets all requirements/filters above - mergeMap((via) => - defer>( - async () => - typeof via !== 'string' - ? via.send(content.body) // via RTC channel - : via.startsWith('@') - ? matrix.sendToDevice(type, { [via]: { '*': content } }) // via toDevice message - : matrix.sendEvent(via, type, content, ''), // via room - ).pipe( - // this returned value is just for notification, and shouldn't be relayed on; - // all functionality is provided as side effects of the subscription - mapTo(typeof via === 'string' ? via : via.label), - retryWhen((err$) => - // if sendEvent throws, omit & retry after pollingInterval - // up to RETRY_COUNT times; if it continues to error, throws down - err$.pipe( - withLatestFrom(config$), - mergeMap(([err, { pollingInterval }], i) => { - if (i < RETRY_COUNT - 1) { - log.warn(`messageSend error, retrying ${i + 1}/${RETRY_COUNT}`, err); - return timer(pollingInterval); - // give up - } else return throwError(err); - }), - ), - ), + return latest$.pipe( + filter(({ presences }) => !!presences[address]?.payload?.available), + take(1), + withLatestFrom(config$), + mergeMap(([{ presences, rtc }, { caps }]) => { + // if available & open, use channel + if (allowRtc && rtc[address]?.readyState === 'open') return of(rtc[address]); + // if available and Capabilities.TO_DEVICE enabled on both ends, use ToDevice messages + if ( + caps?.[Capabilities.TO_DEVICE] && + presences[address].payload.caps?.[Capabilities.TO_DEVICE] + ) + return of(presences[address].payload.userId); + return waitMember$(matrix, address, { latest$ }); + }), + mergeMap(async (via) => { + if (typeof via !== 'string') via.send(content.body); + // via RTC channel + else if (via.startsWith('@')) await matrix.sendToDevice(type, { [via]: { '*': content } }); + // via toDevice message + else await matrix.sendEvent(via, type, content, ''); // via room + // this returned value is just for notification, and shouldn't be relayed on; + // all functionality is provided as side effects of the subscription + return typeof via !== 'string' ? via.label : via; + }), + retryWhen((err$) => + // if sendEvent throws, omit & retry since first 'latest$' after pollingInterval + // up to RETRY_COUNT times; if it continues to error, throws down + err$.pipe( + withLatestFrom(config$), + mergeMap(([err, { pollingInterval }], i) => { + if (i < RETRY_COUNT - 1) { + log.warn(`messageSend error, retrying ${i + 1}/${RETRY_COUNT}`, err); + return timer(pollingInterval); + // give up + } else return throwError(err); + }), ), ), ); diff --git a/raiden-ts/src/transport/epics/init.ts b/raiden-ts/src/transport/epics/init.ts index 284e106c3c..bf0b5bbe1a 100644 --- a/raiden-ts/src/transport/epics/init.ts +++ b/raiden-ts/src/transport/epics/init.ts @@ -8,11 +8,11 @@ import { merge, defer, AsyncSubject, + timer, } from 'rxjs'; import { catchError, concatMap, - delay, filter, ignoreElements, map, @@ -28,12 +28,13 @@ import { pluck, throwIfEmpty, retryWhen, + delayWhen, } from 'rxjs/operators'; import { fromFetch } from 'rxjs/fetch'; import sortBy from 'lodash/sortBy'; import isEmpty from 'lodash/isEmpty'; -import { createClient, MatrixClient, MatrixEvent, Filter } from 'matrix-js-sdk'; +import { createClient, MatrixClient, MatrixEvent } from 'matrix-js-sdk'; import { logger as matrixLogger } from 'matrix-js-sdk/lib/logger'; import { assert } from '../../utils'; @@ -43,7 +44,8 @@ import { RaidenAction } from '../../actions'; import { RaidenConfig } from '../../config'; import { RaidenState } from '../../state'; import { getServerName } from '../../utils/matrix'; -import { pluckDistinct, retryAsync$ } from '../../utils/rx'; +import { pluckDistinct, retryAsync$, retryWaitWhile } from '../../utils/rx'; +import { exponentialBackoff } from '../../transfers/epics/utils'; import { matrixSetup } from '../actions'; import { RaidenMatrixSetup } from '../state'; import { Caps } from '../types'; @@ -95,23 +97,21 @@ function joinGlobalRooms(config: RaidenConfig, matrix: MatrixClient): Observable * @param roomIds - The ids of the rooms to filter out during sync. * @returns Observable of the {@link Filter} that was created. */ -function createFilter(matrix: MatrixClient, roomIds: string[]): Observable { - return defer(() => { - const roomFilter = { - not_rooms: roomIds, - ephemeral: { - not_types: ['m.receipt', 'm.typing'], - }, - timeline: { - limit: 0, - not_senders: [matrix.getUserId()!], - }, - }; - const filterDefinition = { - room: roomFilter, - }; - return matrix.createFilter(filterDefinition); - }); +async function createFilter(matrix: MatrixClient, roomIds: string[]) { + const roomFilter = { + not_rooms: roomIds, + ephemeral: { + not_types: ['m.receipt', 'm.typing'], + }, + timeline: { + limit: 0, + not_senders: [matrix.getUserId()!], + }, + }; + const filterDefinition = { + room: roomFilter, + }; + return matrix.createFilter(filterDefinition); } function startMatrixSync( @@ -127,12 +127,17 @@ function startMatrixSync( matrix$.next(matrix); matrix$.complete(); }), - delay(1e3), // wait 1s before starting matrix, so event listeners can be registered withLatestFrom(config$), + // wait 1s before starting matrix, so event listeners can be registered + delayWhen(([, { pollingInterval }]) => timer(Math.ceil(pollingInterval / 5))), mergeMap(([, config]) => joinGlobalRooms(config, matrix).pipe( mergeMap((roomIds) => createFilter(matrix, roomIds)), mergeMap((filter) => matrix.startClient({ filter })), + retryWaitWhile( + exponentialBackoff(config.pollingInterval, config.httpTimeout), + (err) => err?.httpStatus !== 429, // retry rate-limit errors only + ), ), ), ignoreElements(), @@ -246,7 +251,7 @@ function setupMatrixClient$( accessToken: setup.accessToken, deviceId: setup.deviceId, }); - return of({ matrix, server, setup }); + return of({ matrix, server, setup, pollingInterval }); } else { const matrix = createClient({ baseUrl: server }); const userName = address.toLowerCase(), @@ -291,31 +296,32 @@ function setupMatrixClient$( accessToken: access_token, deviceId: device_id, displayName: signedUserId, - } as RaidenMatrixSetup, + }, + pollingInterval, })), ); }), - // the APIs below are authenticated, and therefore also act as validator - mergeMap(({ matrix, server, setup }) => - // set these properties before starting sync - merge( - retryAsync$( - () => matrix.setDisplayName(setup.displayName), - pollingInterval, - (err) => err?.httpStatus !== 429, - ), - retryAsync$( - () => matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''), - pollingInterval, - (err) => err?.httpStatus !== 429, - ), - ).pipe( - mapTo({ matrix, server, setup }), // return triplet again - ), - ), ); } }), + // the APIs below are authenticated, and therefore also act as validator + mergeMap(({ matrix, server, setup, pollingInterval }) => + // set these properties before starting sync + combineLatest([ + retryAsync$( + () => matrix.setDisplayName(setup.displayName), + pollingInterval, + (err) => err?.httpStatus !== 429, + ), + retryAsync$( + () => matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''), + pollingInterval, + (err) => err?.httpStatus !== 429, + ), + ]).pipe( + mapTo({ matrix, server, setup }), // return triplet again + ), + ), ); } diff --git a/raiden-ts/src/transport/epics/presence.ts b/raiden-ts/src/transport/epics/presence.ts index 1e07ad1b97..3b7d3ccc02 100644 --- a/raiden-ts/src/transport/epics/presence.ts +++ b/raiden-ts/src/transport/epics/presence.ts @@ -19,9 +19,11 @@ import { import minBy from 'lodash/minBy'; import isEmpty from 'lodash/isEmpty'; import isEqual from 'lodash/isEqual'; +import pick from 'lodash/pick'; import { getAddress, verifyMessage } from 'ethers/utils'; import { MatrixClient, MatrixEvent } from 'matrix-js-sdk'; +import { exponentialBackoff } from '../../transfers/epics/utils'; import { assert } from '../../utils'; import { RaidenError, ErrorCodes } from '../../utils/error'; import { Address, isntNil } from '../../utils/types'; @@ -30,7 +32,7 @@ import { RaidenEpicDeps } from '../../types'; import { RaidenAction } from '../../actions'; import { RaidenState } from '../../state'; import { getUserPresence } from '../../utils/matrix'; -import { pluckDistinct } from '../../utils/rx'; +import { pluckDistinct, retryWaitWhile } from '../../utils/rx'; import { matrixPresence } from '../actions'; import { channelMonitored } from '../../channels/actions'; import { parseCaps, stringifyCaps } from '../utils'; @@ -146,6 +148,8 @@ export const matrixMonitorPresenceEpic = ( ), ); +const comparePresencesFields = ['userId', 'available', 'caps'] as const; + /** * Monitor peers matrix presence from User.presence events * We aggregate all users of interest (i.e. for which a monitor request was emitted at some point) @@ -157,12 +161,13 @@ export const matrixMonitorPresenceEpic = ( * @param deps.log - Logger instance * @param deps.matrix$ - MatrixClient async subject * @param deps.latest$ - Latest values + * @param deps.config$ - Config observable * @returns Observable of presence updates */ export const matrixPresenceUpdateEpic = ( action$: Observable, {}: Observable, - { log, matrix$, latest$ }: RaidenEpicDeps, + { log, matrix$, latest$, config$ }: RaidenEpicDeps, ): Observable => matrix$ .pipe( @@ -170,21 +175,23 @@ export const matrixPresenceUpdateEpic = ( switchMap((matrix) => // matrix's 'User.presence' sometimes fail to fire, but generic 'event' is always fired, // and User (retrieved via matrix.getUser) is up-to-date before 'event' emits - fromEvent(matrix, 'event').pipe(map((event) => ({ event, matrix }))), + fromEvent(matrix, 'event').pipe( + filter((event) => event.getType() === 'm.presence'), + map((event) => ({ event, matrix })), + ), ), - filter(({ event }) => event.getType() === 'm.presence'), // parse peer address from userId map(({ event, matrix }) => { - // as 'event' is emitted after user is (created and) updated, getUser always returns it - const user = matrix.getUser(event.getSender()); try { + // as 'event' is emitted after user is (created and) updated, getUser always returns it + const user = matrix.getUser(event.getSender()); assert(user?.presence); const peerAddress = userRe.exec(user.userId)?.[1]; assert(peerAddress); // getAddress will convert any valid address into checksummed-format const address = getAddress(peerAddress) as Address | undefined; assert(address); - return { matrix, user, address }; + return { matrix, user, address, event }; } catch (err) {} }), // filter out events without userId in the right format (startWith hex-address) @@ -196,11 +203,12 @@ export const matrixPresenceUpdateEpic = ( scan((toMonitor, request) => toMonitor.add(request.meta.address), new Set
()), startWith(new Set
()), ), + config$, ), // filter out events from users we don't care about // i.e.: presence monitoring never requested filter(([{ address }, toMonitor]) => toMonitor.has(address)), - mergeMap(([{ matrix, user, address }]) => { + mergeMap(([{ matrix, user, address }, , { pollingInterval, httpTimeout }]) => { // first filter can't tell typescript this property will always be set! const userId = user.userId, presence = user.presence!, @@ -227,21 +235,30 @@ export const matrixPresenceUpdateEpic = ( { address: recovered }, ); }), + retryWaitWhile( + exponentialBackoff(pollingInterval, httpTimeout), + (err) => err?.httpStatus !== 429, // retry rate-limit errors only + ), catchError( - (err) => (log.debug('Error validating presence event, ignoring', err), EMPTY), + (err) => (log.warn('Error validating presence event, ignoring', userId, err), EMPTY), ), ); }), ) .pipe( withLatestFrom(latest$), - // filter out if presence update is to offline, and address became online in another user filter( ([action, { presences }]) => - action.payload.available || - !(action.meta.address in presences) || - !presences[action.meta.address].payload.available || - action.payload.userId === presences[action.meta.address].payload.userId, + // filter out if presence update is to offline, and address became online in another user + (action.payload.available || + !(action.meta.address in presences) || + !presences[action.meta.address].payload.available || + action.payload.userId === presences[action.meta.address].payload.userId) && + // pass only if some relevant field changed + !isEqual( + pick(action.payload, comparePresencesFields), + pick(presences[action.meta.address]?.payload, comparePresencesFields), + ), ), pluck(0), ); diff --git a/raiden-ts/src/transport/epics/rooms.ts b/raiden-ts/src/transport/epics/rooms.ts index 5bebdf0aa5..5ca8afa465 100644 --- a/raiden-ts/src/transport/epics/rooms.ts +++ b/raiden-ts/src/transport/epics/rooms.ts @@ -1,4 +1,4 @@ -import { Observable, from, of, EMPTY, fromEvent, timer, defer, concat } from 'rxjs'; +import { Observable, from, of, EMPTY, fromEvent, timer, defer, combineLatest } from 'rxjs'; import { catchError, delay, @@ -20,22 +20,25 @@ import { distinct, delayWhen, pluck, + startWith, } from 'rxjs/operators'; import { MatrixClient, MatrixEvent, Room, RoomMember } from 'matrix-js-sdk'; import { Capabilities } from '../../constants'; +import { RaidenConfig } from '../../config'; import { Address, isntNil } from '../../utils/types'; import { isActionOf } from '../../utils/actions'; import { RaidenEpicDeps } from '../../types'; import { RaidenAction } from '../../actions'; +import { RaidenState } from '../../state'; import { channelMonitored } from '../../channels/actions'; import { messageSend, messageReceived } from '../../messages/actions'; import { transferSigned } from '../../transfers/actions'; -import { RaidenState } from '../../state'; -import { pluckDistinct } from '../../utils/rx'; +import { pluckDistinct, retryWaitWhile, takeIf } from '../../utils/rx'; import { getServerName } from '../../utils/matrix'; import { Direction } from '../../transfers/state'; +import { exponentialBackoff } from '../../transfers/epics/utils'; import { matrixRoom, matrixRoomLeave, matrixPresence } from '../actions'; import { globalRoomNames, getRoom$, roomMatch } from './helpers'; @@ -47,7 +50,9 @@ import { globalRoomNames, getRoom$, roomMatch } from './helpers'; * @param matrix - client instance * @param roomId - room to invite user to * @param userId - user to be invited - * @param config$ - Observable of config object containing httpTimeout used as iteration delay + * @param config - Config object + * @param config.pollingInterval - wait this interval before first invite + * @param config.httpTimeout - wait this interval between retries * @param opts - Options object * @param opts.log - Logger instance * @returns Cold observable which keep inviting user if needed and then completes. @@ -56,43 +61,26 @@ function inviteLoop$( matrix: MatrixClient, roomId: string, userId: string, - config$: Observable<{ httpTimeout: number }>, - { log }: { log: RaidenEpicDeps['log'] }, + { pollingInterval, httpTimeout }: RaidenConfig, + { log }: Pick, ) { - return defer(() => { - const room = matrix.getRoom(roomId); - return room - ? // use room already present in matrix instance - of(room) - : // wait for room - fromEvent(matrix, 'Room').pipe( - filter((room) => room.roomId === roomId), - take(1), - ); - }).pipe( - // stop if user already a room member - filter((room) => { - const member = room.getMember(userId); - return !member || member.membership !== 'join'; - }), - withLatestFrom(config$), - mergeMap(([, { httpTimeout }]) => - // defer here ensures invite is re-done on repeat (re-subscription) - defer(() => matrix.invite(roomId, userId).catch(log.warn.bind(log, 'Error inviting'))).pipe( + // defer here ensures invite is re-done on repeat (re-subscription) + return timer(pollingInterval).pipe( + mergeMap(() => + defer(() => matrix.invite(roomId, userId)).pipe( + catchError((err) => (log.warn('Error inviting', err), EMPTY)), // while shouldn't stop (by unsubscribe or takeUntil) repeatWhen((completed$) => completed$.pipe(delay(httpTimeout))), - takeUntil( - // stop repeat+defer loop above when user joins - fromEvent<[MatrixEvent, RoomMember]>(matrix, 'RoomMember.membership').pipe( - pluck(1), - filter( - (member) => - member.roomId === roomId && - member.userId === userId && - member.membership === 'join', - ), - ), - ), + ), + ), + // takeIf will unsubscribe from repeat loop if user joins room, re-subscribe while/if not + takeIf( + fromEvent<[MatrixEvent, RoomMember]>(matrix, 'RoomMember.membership').pipe( + pluck(1), + filter((member) => member.roomId === roomId && member.userId === userId), + startWith(matrix.getRoom(roomId)?.getMember?.(userId)), + map((member) => member?.membership !== 'join'), + distinctUntilChanged(), ), ), ); @@ -109,12 +97,13 @@ function inviteLoop$( * @param deps.log - Logger instance * @param deps.matrix$ - MatrixClient async subject * @param deps.latest$ - Latest values + * @param deps.config$ - Config observable * @returns Observable of matrixRoom actions */ export const matrixCreateRoomEpic = ( action$: Observable, {}: Observable, - { address, log, matrix$, latest$ }: RaidenEpicDeps, + { address, log, matrix$, latest$, config$ }: RaidenEpicDeps, ): Observable => // actual output observable, selects addresses of interest from actions action$.pipe( @@ -153,10 +142,11 @@ export const matrixCreateRoomEpic = ( grouped$.pipe( // this mergeMap is like withLatestFrom, but waits until matrix$ emits its only value mergeMap((address) => matrix$.pipe(map((matrix) => ({ address, matrix })))), + withLatestFrom(config$), // exhaustMap is used to prevent bursts of actions for a given address (eg. on startup) // of creating multiple rooms for same address, so we ignore new address items while // previous is being processed. If user roams, matrixInviteEpic will re-invite - exhaustMap(({ address, matrix }) => + exhaustMap(([{ address, matrix }, { pollingInterval, httpTimeout }]) => // presencesStateReplay$+take(1) acts like withLatestFrom with cached result latest$.pipe( // wait for user to be monitored @@ -178,6 +168,10 @@ export const matrixCreateRoomEpic = ( }), ), map(({ room_id: roomId }) => matrixRoom({ roomId }, { address })), + retryWaitWhile( + exponentialBackoff(pollingInterval, httpTimeout), + (err) => err?.httpStatus !== 429, // retry rate-limit errors only + ), catchError((err) => (log.error('Error creating room, ignoring', err), EMPTY)), ), ), @@ -210,44 +204,21 @@ export const matrixInviteEpic = ( groupBy((a) => a.meta.address), mergeMap((grouped$) => // grouped$ is one observable of presence actions per partners address - grouped$.pipe( + combineLatest([ + grouped$, + latest$.pipe( + map(({ state }) => state.transport.rooms?.[grouped$.key]?.[0]), + distinctUntilChanged(), + ), + ]).pipe( // action comes only after matrix$ is started, so it's safe to use withLatestFrom - withLatestFrom(matrix$), + withLatestFrom(matrix$, config$), // switchMap on new presence action for address - switchMap(([action, matrix]) => + switchMap(([[action, roomId], matrix, config]) => // if not available, do nothing (and unsubscribe from previous observable) - !action.payload.available + !action.payload.available || !roomId ? EMPTY - : latest$.pipe( - map(({ state }) => state.transport.rooms?.[action.meta.address]?.[0]), - distinctUntilChanged(), - switchMap((roomId) => - concat( - of(roomId), - !roomId - ? EMPTY - : // re-trigger invite loop if user leaves - fromEvent<[MatrixEvent, RoomMember]>(matrix, 'RoomMember.membership').pipe( - pluck(1), - filter( - (member) => - member.roomId === roomId && - member.userId === action.payload.userId && - member.membership === 'leave', - ), - mapTo(roomId), - ), - ), - ), - // switchMap on main roomId change - switchMap((roomId) => - !roomId - ? // if roomId not set, do nothing and unsubscribe - EMPTY - : // while subscribed and user didn't join, invite every httpTimeout=30s - inviteLoop$(matrix, roomId, action.payload.userId, config$, { log }), - ), - ), + : inviteLoop$(matrix, roomId, action.payload.userId, config, { log }), ), ), ), @@ -300,10 +271,15 @@ export const matrixHandleInvitesEpic = ( ); return senderPresence$.pipe(map((senderPresence) => ({ matrix, member, senderPresence }))); }), - mergeMap(({ matrix, member, senderPresence }) => + withLatestFrom(config$), + mergeMap(([{ matrix, member, senderPresence }, { pollingInterval, httpTimeout }]) => // join room and emit MatrixRoomAction to make it default/first option for sender address - from(matrix.joinRoom(member.roomId, { syncRoom: true })).pipe( + defer(() => matrix.joinRoom(member.roomId, { syncRoom: true })).pipe( mapTo(matrixRoom({ roomId: member.roomId }, { address: senderPresence.meta.address })), + retryWaitWhile( + exponentialBackoff(pollingInterval, httpTimeout), + (err) => err?.httpStatus !== 429, // retry rate-limit errors only + ), catchError((err) => (log.error('Error joining invited room, ignoring', err), EMPTY)), ), ), diff --git a/raiden-ts/src/transport/epics/webrtc.ts b/raiden-ts/src/transport/epics/webrtc.ts index 86477577df..bc962bc284 100644 --- a/raiden-ts/src/transport/epics/webrtc.ts +++ b/raiden-ts/src/transport/epics/webrtc.ts @@ -25,9 +25,7 @@ import { take, takeUntil, tap, - mapTo, finalize, - timeout, pluck, repeatWhen, delayWhen, @@ -35,6 +33,7 @@ import { bufferTime, endWith, mergeMapTo, + startWith, } from 'rxjs/operators'; import * as t from 'io-ts'; @@ -43,8 +42,10 @@ import { MatrixClient, MatrixEvent } from 'matrix-js-sdk'; import { Capabilities } from '../../constants'; import { Address, decode, isntNil } from '../../utils/types'; import { jsonParse, jsonStringify } from '../../utils/data'; +import { timeoutFirst } from '../../utils/rx'; import { RaidenEpicDeps } from '../../types'; -import { RaidenAction, raidenShutdown } from '../../actions'; +import { RaidenAction } from '../../actions'; +import { exponentialBackoff } from '../../transfers/epics/utils'; import { RaidenConfig } from '../../config'; import { messageReceived } from '../../messages/actions'; import { RaidenState } from '../../state'; @@ -100,7 +101,7 @@ const _matrixIceServersCache = new WeakMap { const cached = _matrixIceServersCache.get(matrix); if (cached && Date.now() < cached[0]) return cached[1]; - const fetched = ((await matrix.turnServer()) as unknown) as + const fetched = ((await matrix.turnServer().catch(() => undefined)) as unknown) as | { uris: string | string[]; ttl: number; @@ -125,7 +126,6 @@ async function getMatrixIceServers(matrix: MatrixClient): Promise string; getContent: () => { msgtype: string; body: string }; }; @@ -154,6 +154,8 @@ function matrixWebrtcEvents$( ); } +type RtcConnPair = readonly [RTCPeerConnection, RTCDataChannel]; + // setup candidates$ handlers function handleCandidates$( connection: RTCPeerConnection, @@ -209,23 +211,27 @@ function setupCallerDataChannel$( info: CallInfo, { fallbackIceServers }: RaidenConfig, deps: Pick, -): Observable { +): Observable { const { callId, peerId, peerAddress } = info; const { log, latest$, config$ } = deps; - return from(getMatrixIceServers(matrix)).pipe( + return defer(() => getMatrixIceServers(matrix)).pipe( mergeMap((matrixTurnServers) => { const connection = new RTCPeerConnection({ iceServers: [...matrixTurnServers, ...fallbackIceServers], }); // we relay on retries, no need to enforce ordered const dataChannel = connection.createDataChannel(callId, { ordered: false }); + Object.assign(dataChannel, { connection }); return merge( // despite 'never' emitting, candidates$ have side-effects while/when subscribed handleCandidates$(connection, matrix, start$, info, deps), - defer(() => connection.createOffer()).pipe( + defer(async () => connection.createOffer()).pipe( + mergeMap(async (offer) => { + await connection.setLocalDescription(offer); + return offer; + }), mergeMap((offer) => { - connection.setLocalDescription(offer); const body: t.TypeOf = { type: offer.type as RtcEventType.offer, sdp: offer.sdp!, @@ -248,19 +254,21 @@ function setupCallerDataChannel$( ); }), take(1), - tap(() => log.info('RTC: got answer', callId)), - map((event) => { + tap((event) => { + log.info('RTC: got answer', callId); if (event.call_id !== callId) log.warn( `RTC: callId mismatch, continuing: we="${callId}", them="${event.call_id}"`, ); - connection.setRemoteDescription(event); + }), + mergeMap(async (event) => connection.setRemoteDescription(event)), + tap(() => { start$.next(null); start$.complete(); }), ignoreElements(), ), - of(dataChannel), // output created channel + of([connection, dataChannel] as const), // output created channel ); }), ); @@ -273,7 +281,7 @@ function setupCalleeDataChannel$( info: CallInfo, {}: RaidenConfig, deps: Pick, -): Observable { +): Observable { const { callId, peerId, peerAddress } = info; const { log, latest$, config$ } = deps; return matrixWebrtcEvents$(matrix, RtcEventType.offer, peerId).pipe( @@ -289,13 +297,16 @@ function setupCalleeDataChannel$( const connection = new RTCPeerConnection({ iceServers: [...matrixTurnServers, ...fallbackIceServers], }); - connection.setRemoteDescription(event); return merge( // despite 'never' emitting, candidates$ have side-effects while/when subscribed handleCandidates$(connection, matrix, start$, info, deps), - defer(() => connection.createAnswer()).pipe( + defer(async () => connection.setRemoteDescription(event)).pipe( + mergeMap(async () => connection.createAnswer()), + mergeMap(async (answer) => { + await connection.setLocalDescription(answer); + return answer; + }), mergeMap((answer) => { - connection.setLocalDescription(answer); const body: t.TypeOf = { type: answer.type as RtcEventType.answer, sdp: answer.sdp!, @@ -317,53 +328,69 @@ function setupCalleeDataChannel$( }), ignoreElements(), ), - fromEvent(connection, 'datachannel').pipe(pluck('channel')), + fromEvent(connection, 'datachannel').pipe( + pluck('channel'), + tap((channel) => Object.assign(channel, { connection })), + map((channel) => [connection, channel] as const), + ), ); }), take(1), ); } +const hangUpError = 'RTC: peer hung up'; +const closedError = 'RTC: dataChannel closed'; +const pingMsg = 'ping'; +const failedConnectionStates = ['failed', 'closed', 'disconnected']; + // setup listeners & events for a data channel, when it gets opened, and teardown when closed -function listenDataChannel$( - stop$: Subject, +function listenDataChannel( + openTimeout: number, { callId, peerId, peerAddress }: CallInfo, - { httpTimeout }: RaidenConfig, { log }: Pick, -): OperatorFunction { - return (dataChannel$) => - dataChannel$.pipe( - mergeMap((dataChannel) => +): OperatorFunction { + return (pair$) => + pair$.pipe( + mergeMap(([connection, dataChannel]) => merge( + fromEvent(connection, 'connectionstatechange').pipe( + startWith(null), + mergeMap(() => + failedConnectionStates.includes(connection.connectionState) + ? throwError(new Error('RTC: connection failed')) + : EMPTY, + ), + ), fromEvent(dataChannel, 'close').pipe( - tap(() => { - log.info('RTC: dataChannel close', callId); - stop$.next(null); - stop$.complete(); - }), - ignoreElements(), + mergeMapTo(throwError(new Error(closedError))), ), fromEvent(dataChannel, 'error').pipe( mergeMap((ev) => throwError(ev.error)), ), fromEvent(dataChannel, 'open').pipe( - take(1), - // this timeout ensures the whole dataChannel$ observable throws if it can't be set up, - // so it can be retried/repeated/restarted. - // notice it only starts after dataChannel$ emits, i.e. on subscription for caller (as - // it createDataChannel object, then responsible for hanging up call and retrying), - // and after 'datachannel' event for callee (passive listener) - timeout(httpTimeout / 3), - tap(() => log.info('RTC: dataChannel open', callId)), - // output rtcChannel action with defined channel instance to have it set in latest$ - mapTo(rtcChannel(dataChannel, { address: peerAddress })), + tap(() => { + log.info('RTC: dataChannel open', callId); + // when connected, sends a first message + dataChannel.send(pingMsg); + }), + ignoreElements(), ), + // 'race+throwError' is like timeout operator, but applies only once + // i.e. times out to retry whole connection if no first message is received on time; + // emits rtcChannel action on first message, instead of on 'open' event fromEvent(dataChannel, 'message').pipe( + timeoutFirst(openTimeout), tap((e) => log.debug('RTC: dataChannel message', callId, e)), pluck('data'), filter((d: unknown): d is string => typeof d === 'string'), - map((line) => - messageReceived( + mergeMap(function* (line, msgCount) { + // output rtcChannel action with defined channel instance to have it set in latest$ + // on (and only on) first received message; if no message is received, it'll + // timeout and retry channel + if (msgCount === 0) yield rtcChannel(dataChannel, { address: peerAddress }); + if (line === pingMsg) return; // ignore pingMsg, used only to succeed rtcChannel + yield messageReceived( { text: line, message: parseMessage(line, peerAddress, { log }), @@ -371,20 +398,11 @@ function listenDataChannel$( userId: peerId, }, { address: peerAddress }, - ), - ), + ); + }), ), - ).pipe(finalize(() => dataChannel.close())), + ).pipe(finalize(() => (dataChannel.close(), connection.close()))), ), - takeUntil(stop$), - catchError((err) => { - stop$.next(null); - stop$.complete(); - log.info("Couldn't set up WebRTC dataChannel, retrying", callId, err?.message ?? err); - return EMPTY; - }), - // if it ends by takeUntil or catchError, output rtcChannel to reset latest$ mapping - endWith(rtcChannel(undefined, { address: peerAddress })), ); } @@ -397,7 +415,9 @@ function handlePresenceChange$( return presence$.pipe( distinctUntilChanged( (a, b) => - a.payload.userId === b.payload.userId && a.payload.available === b.payload.available, + a.payload.userId === b.payload.userId && + a.payload.available === b.payload.available && + a.payload.caps?.[Capabilities.WEBRTC] === b.payload.caps?.[Capabilities.WEBRTC], ), withLatestFrom(matrix$, config$), filter( @@ -414,6 +434,8 @@ function handlePresenceChange$( .sort((a, b) => a.localeCompare(b)) .join('|'); const isCaller = callId.startsWith(address.toLowerCase()); + const timeoutGen = exponentialBackoff(config.pollingInterval, 2 * config.httpTimeout); + return defer(() => { const info: CallInfo = { callId, @@ -424,9 +446,9 @@ function handlePresenceChange$( // start$ indicates invite/offer/answer cycle completed, and candidates can be exchanged const start$ = new AsyncSubject(); // stop$ indicates dataChannel closed (maybe by peer), and teardown should take place - const stop$ = new AsyncSubject(); + const stop$ = new AsyncSubject(); - let dataChannel$: Observable; + let dataChannel$; if (isCaller) { // caller dataChannel$ = setupCallerDataChannel$(matrix, start$, info, config, deps); @@ -437,6 +459,7 @@ function handlePresenceChange$( stop$ .pipe( + filter((errored) => errored), mergeMap(() => { const body: t.TypeOf = { type: RtcEventType.hangup, @@ -450,20 +473,35 @@ function handlePresenceChange$( { log, latest$, config$ }, ).pipe(takeUntil(timer(config.httpTimeout / 10))); }), - takeUntil(action$.pipe(filter(raidenShutdown.is))), + takeUntil(action$.pipe(ignoreElements(), endWith(null))), ) .subscribe(); // when stopping, if not shutting down, send hangup - // listenDataChannel$ needs channel$:Observable, but we must include/merge - // setup and monitoring Observable's to get things moving on subscription + const { value: timeoutValue } = timeoutGen.next(); + if (!timeoutValue) return EMPTY; // shouldn't happen with exponentialBackoff + + // listenDataChannel$ needs channel$:Observable<[RTCDataChannel]>, but we must include/ + // merge setup and monitoring Observable's to get things moving on subscription return merge( dataChannel$, - // throws nad restart if peer hangs up + // throws and restart if peer hangs up matrixWebrtcEvents$(matrix, RtcEventType.hangup, info.peerId).pipe( - // no need for specific error since this is just logged and ignored in listenDataChannel$ - mergeMapTo(throwError(new Error('RTC: peer hung up'))), + // no need for specific error since this is just logged and ignored + mergeMapTo(throwError(new Error(hangUpError))), ), - ).pipe(listenDataChannel$(stop$, info, config, deps)); + ).pipe( + listenDataChannel(timeoutValue, info, deps), + takeUntil(stop$), + catchError((err) => { + // emit false for these errors, to prevent delayed hangup event to be sent + stop$.next(![hangUpError, closedError].includes(err?.message)); + stop$.complete(); + log.info("Couldn't set up WebRTC dataChannel, retrying", callId, err?.message ?? err); + return EMPTY; + }), + // if it ends by takeUntil or catchError, output rtcChannel to reset latest$ mapping + endWith(rtcChannel(undefined, { address: action.meta.address })), + ); }).pipe( // if it disconnects for any reason, but partner is still online, // try to reconnect by repeating from 'defer' diff --git a/raiden-ts/src/utils/rx.ts b/raiden-ts/src/utils/rx.ts index 06abcf3364..7016370fcf 100644 --- a/raiden-ts/src/utils/rx.ts +++ b/raiden-ts/src/utils/rx.ts @@ -6,6 +6,7 @@ import { defer, throwError, timer, + race, } from 'rxjs'; import { pluck, @@ -19,6 +20,7 @@ import { takeWhile, map, switchMap, + mergeMapTo, } from 'rxjs/operators'; import { isntNil } from './types'; @@ -98,6 +100,39 @@ export function repeatUntil( ); } +/** + * Operator to retry/re-subscribe input$ until a stopPredicate returns truthy or delayMs iterator + * completes, waiting delayMs milliseconds between retries. + * Input observable must be re-subscribable/retriable. + * + * @param delayMs - Interval or iterator of intervals to wait between retries + * @param stopPredicate - Receives error and count, stop retry and throw if returns truthy + * @returns Operator function to retry if stopPredicate not truthy waiting between retries + */ +export function retryWaitWhile( + delayMs: number | Iterator, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + stopPredicate: (err: any, count: number) => boolean | undefined = (_, count) => count >= 10, +): MonoTypeOperatorFunction { + return (input$) => + input$.pipe( + retryWhen((error$) => + error$.pipe( + mergeMap((error, count) => { + let interval; + if (typeof delayMs === 'number') interval = delayMs; + else { + const next = delayMs.next(); + interval = !next.done ? next.value : -1; + } + if (stopPredicate(error, count) || interval < 0) return throwError(error); + return timer(interval); + }), + ), + ), + ); +} + /** * Receives an async function and returns an observable which will retry it every interval until it * resolves, or throw if it can't succeed after 10 retries. @@ -106,26 +141,18 @@ export function repeatUntil( * JsonRpcProvider._doPoll also catches, suppresses & retry * * @param func - An async function (e.g. a Promise factory, like a defer callback) - * @param interval - Interval to retry in case of rejection + * @param delayMs - Interval to retry in case of rejection, or iterator yielding intervals * @param stopPredicate - Stops retrying and throws if this function returns a truty value; * Receives error and retry count; Default: stops after 10 retries * @returns Observable version of async function, with retries */ export function retryAsync$( func: () => Promise, - interval = 1e3, + delayMs: number | Iterator = 1e3, // eslint-disable-next-line @typescript-eslint/no-explicit-any stopPredicate: (err: any, count: number) => boolean | undefined = (_, count) => count >= 10, ): Observable { - return defer(func).pipe( - retryWhen((error$) => - error$.pipe( - mergeMap((error, count) => - stopPredicate(error, count) ? throwError(error) : timer(interval), - ), - ), - ), - ); + return defer(func).pipe(retryWaitWhile(delayMs, stopPredicate)); } /** @@ -154,3 +181,14 @@ export function takeIf( repeatWhen(() => distinctCond$.pipe(filter((cond): cond is true => cond))), ); } + +/** + * Like timeout rxjs operator, but applies only on first emition + * + * @param timeout - Timeout to wait for an item flow through input + * @returns Operator function + */ +export function timeoutFirst(timeout: number): MonoTypeOperatorFunction { + return (input$) => + race(timer(timeout).pipe(mergeMapTo(throwError(new Error('timeout waiting first')))), input$); +} From c2d10573453ac7a8661f2231f475b8a567147752 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Tue, 6 Oct 2020 09:43:47 -0300 Subject: [PATCH 4/9] cli: fixes on missing params, enable toDevice by default --- raiden-cli/src/config.json | 3 ++- raiden-cli/src/routes/channels.ts | 2 +- raiden-cli/src/routes/payments.ts | 2 +- raiden-cli/src/utils/logging.ts | 3 +++ 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/raiden-cli/src/config.json b/raiden-cli/src/config.json index 6c696bc468..869699587e 100644 --- a/raiden-cli/src/config.json +++ b/raiden-cli/src/config.json @@ -6,7 +6,8 @@ "noDelivery": true, "noReceive": false, "noMediate": false, - "webRTC": true + "webRTC": true, + "toDevice": true }, "autoSettle": true } diff --git a/raiden-cli/src/routes/channels.ts b/raiden-cli/src/routes/channels.ts index 838545868b..ce39335fbb 100644 --- a/raiden-cli/src/routes/channels.ts +++ b/raiden-cli/src/routes/channels.ts @@ -106,7 +106,7 @@ async function openChannel(this: Cli, request: Request, response: Response, next // a better solution. await this.raiden.openChannel(token, partner, { settleTimeout: request.body.settle_timeout, - deposit: request.body.total_deposit.toString(), + deposit: request.body.total_deposit?.toString?.(), }); const channel = await this.raiden.channels$ .pipe(pluck(token, partner), first(isntNil)) diff --git a/raiden-cli/src/routes/payments.ts b/raiden-cli/src/routes/payments.ts index bcc48428c6..769b8392a8 100644 --- a/raiden-cli/src/routes/payments.ts +++ b/raiden-cli/src/routes/payments.ts @@ -81,7 +81,7 @@ async function doTransfer(this: Cli, request: Request, response: Response, next: request.params.tokenAddress, request.params.targetAddress, request.body.amount.toString(), - { paymentId: request.body.identifier.toString(), lockTimeout: request.body.lock_timeout }, + { paymentId: request.body.identifier?.toString?.(), lockTimeout: request.body.lock_timeout }, ); const transfer = await this.raiden.waitTransfer(transferKey); response.send(transformSdkTransferToApiPayment(transfer)); diff --git a/raiden-cli/src/utils/logging.ts b/raiden-cli/src/utils/logging.ts index 685bab88eb..d816a761be 100644 --- a/raiden-cli/src/utils/logging.ts +++ b/raiden-cli/src/utils/logging.ts @@ -1,7 +1,10 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import fs from 'fs'; +import util from 'util'; import logging, { LoggingMethod } from 'loglevel'; +util.inspect.defaultOptions.depth = 3; // +1 from default of 2 + export function setupLoglevel(output?: string): void { const originalFactory = logging.methodFactory; From 4653f6ece6f0f651a8acd14a379afe75048438f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Tue, 6 Oct 2020 10:52:01 -0300 Subject: [PATCH 5/9] sdk: improve startup log with config --- raiden-ts/src/raiden.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/raiden-ts/src/raiden.ts b/raiden-ts/src/raiden.ts index ee406478b5..a352de88ed 100644 --- a/raiden-ts/src/raiden.ts +++ b/raiden-ts/src/raiden.ts @@ -417,9 +417,10 @@ export class Raiden { prevBlockNumber: this.state.blockNumber, address: this.address, TokenNetworkRegistry: this.deps.contractsInfo.TokenNetworkRegistry.address, - network: this.deps.network, + network: { name: this.deps.network.name, chainId: this.deps.network.chainId }, 'raiden-ts': Raiden.version, 'raiden-contracts': Raiden.contractVersion, + config: this.config, }); // Set `epicMiddleware` to `null`, this indicates the instance is not running. From 1fcb3f2fa3f91cdc7829beb97408bb1f1298d8ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Tue, 6 Oct 2020 12:17:03 -0300 Subject: [PATCH 6/9] cli: workaround wrtc segfault on pc.close --- raiden-cli/src/index.ts | 4 ++-- raiden-ts/src/transport/epics/webrtc.ts | 8 +++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/raiden-cli/src/index.ts b/raiden-cli/src/index.ts index e31756e8f9..8f57c1e2bd 100644 --- a/raiden-cli/src/index.ts +++ b/raiden-cli/src/index.ts @@ -205,8 +205,8 @@ function shutdownRaiden(this: Cli): void { if (this.raiden.started) { this.log.info('Stopping raiden...'); this.raiden.stop(); - // force-exit at most 5s after stopping raiden - unrefTimeout(setTimeout(() => process.exit(0), 5000)); + // force-exit at most 10s after stopping raiden + unrefTimeout(setTimeout(() => process.exit(0), 10000)); } else { process.exit(1); } diff --git a/raiden-ts/src/transport/epics/webrtc.ts b/raiden-ts/src/transport/epics/webrtc.ts index bc962bc284..433904c67c 100644 --- a/raiden-ts/src/transport/epics/webrtc.ts +++ b/raiden-ts/src/transport/epics/webrtc.ts @@ -401,7 +401,13 @@ function listenDataChannel( ); }), ), - ).pipe(finalize(() => (dataChannel.close(), connection.close()))), + ).pipe( + finalize(() => { + dataChannel.close(); + // FIXME: https://github.com/node-webrtc/node-webrtc/issues/636 + // connection.close(); + }), + ), ), ); } From 6f9af204a4af9c18c55d2f385812048b65c86ddf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Tue, 6 Oct 2020 12:23:23 -0300 Subject: [PATCH 7/9] sdk: tests --- raiden-ts/tests/unit/epics/transport.spec.ts | 168 +++++++++++++++---- raiden-ts/tests/unit/mocks.ts | 29 +++- 2 files changed, 165 insertions(+), 32 deletions(-) diff --git a/raiden-ts/tests/unit/epics/transport.spec.ts b/raiden-ts/tests/unit/epics/transport.spec.ts index 49fb063980..737ffeef45 100644 --- a/raiden-ts/tests/unit/epics/transport.spec.ts +++ b/raiden-ts/tests/unit/epics/transport.spec.ts @@ -597,21 +597,24 @@ describe('transport epic', () => { expect(matrix.invite).not.toHaveBeenCalled(); }); - test('invite if there is room for user', () => { + test('invite if there is room for user', async () => { expect.assertions(2); const roomId = partnerRoomId; action$.next(matrixRoom({ roomId }, { address: partner })); - matrix.invite.mockResolvedValueOnce(Promise.resolve()); // partner joins when they're invited the second time - matrix.invite.mockImplementationOnce(async () => { - matrix.emit( - 'RoomMember.membership', - {}, - { roomId, userId: partnerUserId, membership: 'join' }, - ); - }); + const promise = new Promise((resolve) => + matrix.invite.mockImplementation(async () => { + matrix.emit( + 'RoomMember.membership', + {}, + { roomId, userId: partnerUserId, membership: 'join' }, + ); + resolve(); + }), + ); + matrix.invite.mockResolvedValueOnce(Promise.resolve()); // epic needs to wait for the room to become available matrix.getRoom.mockReturnValueOnce(null); @@ -625,13 +628,7 @@ describe('transport epic', () => { ), ); - matrix.emit('Room', { roomId, getMember: jest.fn(() => ({ membership: 'leave' })) }); - matrix.emit( - 'RoomMember.membership', - {}, - { roomId, userId: partnerUserId, membership: 'leave' }, - ); - + await promise; expect(matrix.invite).toHaveBeenCalledTimes(2); expect(matrix.invite).toHaveBeenCalledWith(roomId, partnerUserId); @@ -894,7 +891,7 @@ describe('transport epic', () => { action$.next(matrixRoom({ roomId }, { address: partner })); - matrix.getRoom.mockReturnValueOnce({ + matrix.getRoom.mockReturnValue({ roomId, name: roomId, getMember: jest.fn( @@ -933,10 +930,13 @@ describe('transport epic', () => { setTimeout(() => action$.complete(), 100); await expect(promise).resolves.toMatchObject( - messageSend.success(expect.anything(), { - address: partner, - msgId: signed.message_identifier.toString(), - }), + messageSend.success( + { via: expect.stringMatching(/^!/) }, + { + address: partner, + msgId: signed.message_identifier.toString(), + }, + ), ); expect(matrix.sendEvent).toHaveBeenCalledTimes(2); expect(matrix.sendEvent).toHaveBeenCalledWith( @@ -955,7 +955,7 @@ describe('transport epic', () => { action$.next(matrixRoom({ roomId }, { address: partner })); - matrix.getRoom.mockReturnValueOnce(null); + matrix.getRoom.mockReturnValue(null); const sub = matrixMessageSendEpic(action$, state$, depsMock).subscribe(); @@ -970,14 +970,28 @@ describe('transport epic', () => { expect(matrix.sendEvent).not.toHaveBeenCalled(); // a wild Room appears - matrix.emit('Room', { + matrix.getRoom.mockReturnValue({ roomId, name: roomId, - getMember: jest.fn(), - getJoinedMembers: jest.fn(), + getMember: jest.fn( + (userId) => + ({ + roomId, + userId, + name: userId, + membership: 'join', + user: null, + } as any), + ), + getJoinedMembers: jest.fn(() => []), getCanonicalAlias: jest.fn(() => roomId), getAliases: jest.fn(() => []), - }); + currentState: { + roomId, + setStateEvents: jest.fn(), + members: {}, + } as any, + } as any); // user joins later matrix.emit( @@ -1005,7 +1019,7 @@ describe('transport epic', () => { action$.next(matrixRoom({ roomId }, { address: partner })); - matrix.getRoom.mockReturnValueOnce({ + matrix.getRoom.mockReturnValue({ roomId, name: roomId, getMember: jest.fn( @@ -1056,6 +1070,53 @@ describe('transport epic', () => { expect.anything(), ); }); + + test('send: toDevice', async () => { + expect.assertions(4); + + const message = processed; + const signed = await signMessage(depsMock.signer, message); + + action$.next(raidenConfigUpdate({ caps: { [Capabilities.TO_DEVICE]: true } })); + // fail once, succeed on retry + matrix.sendToDevice.mockRejectedValueOnce(new Error('Failed')); + + const promise = matrixMessageSendEpic(action$, state$, depsMock).toPromise(); + + [ + matrixPresence.success( + { + userId: partnerUserId, + available: true, + ts: Date.now(), + caps: { [Capabilities.TO_DEVICE]: true }, + }, + { address: partner }, + ), + messageSend.request( + { message: signed }, + { address: partner, msgId: signed.message_identifier.toString() }, + ), + ].forEach((a) => action$.next(a)); + setTimeout(() => action$.complete(), 100); + + await expect(promise).resolves.toMatchObject( + messageSend.success( + { via: expect.stringMatching(/^@0x/) }, + { + address: partner, + msgId: signed.message_identifier.toString(), + }, + ), + ); + expect(matrix.sendEvent).not.toHaveBeenCalled(); + expect(matrix.sendToDevice).toHaveBeenCalledTimes(2); + expect(matrix.sendToDevice).toHaveBeenCalledWith('m.room.message', { + [partnerUserId]: { + '*': { body: expect.stringMatching('"Processed"'), msgtype: 'm.text' }, + }, + }); + }); }); describe('matrixMessageReceivedEpic', () => { @@ -1083,6 +1144,7 @@ describe('transport epic', () => { { getType: () => 'm.room.message', getSender: () => partnerUserId, + getContent: () => ({ msgtype: 'm.text', body: message }), event: { content: { msgtype: 'm.text', body: message }, origin_server_ts: 123, @@ -1138,6 +1200,7 @@ describe('transport epic', () => { { getType: () => 'm.room.message', getSender: () => partnerUserId, + getContent: () => ({ msgtype: 'm.text', body: message }), event: { content: { msgtype: 'm.text', body: message }, origin_server_ts: 123, @@ -1190,6 +1253,7 @@ describe('transport epic', () => { { getType: () => 'm.room.message', getSender: () => partnerUserId, + getContent: () => ({ msgtype: 'm.text', body: message }), event: { content: { msgtype: 'm.text', body: message }, origin_server_ts: 123, @@ -1212,6 +1276,51 @@ describe('transport epic', () => { ), ); }); + + test('receive: toDevice', async () => { + expect.assertions(1); + + const message = 'test message', + content = { msgtype: 'm.text', body: message }; + + action$.next(raidenConfigUpdate({ caps: { [Capabilities.TO_DEVICE]: true } })); + + const promise = matrixMessageReceivedEpic(action$, state$, depsMock) + .pipe(first()) + .toPromise(); + + matrix.emit('toDeviceEvent', { + getType: jest.fn(() => 'm.room.message'), + getSender: jest.fn(() => partnerUserId), + getContent: jest.fn(() => content), + event: { type: 'm.room.message', sender: partnerUserId, content }, + }); + + // actions sees presence update for partner only later + action$.next( + matrixPresence.success( + { + userId: partnerUserId, + available: true, + ts: Date.now(), + caps: { [Capabilities.TO_DEVICE]: true }, + }, + { address: partner }, + ), + ); + + // then it resolves + await expect(promise).resolves.toEqual( + messageReceived( + { + text: message, + ts: expect.any(Number), + userId: partnerUserId, + }, + { address: partner }, + ), + ); + }); }); describe('matrixMessageReceivedUpdateRoomEpic', () => { @@ -1534,7 +1643,7 @@ describe('transport epic', () => { msgtype: 'm.notice', body: jsonStringify({ type: 'offer', - call_id: `${partner}|${depsMock.address}`, + call_id: `${partner}|${depsMock.address}`.toLowerCase(), sdp: 'offerSdp', }), }), @@ -1564,7 +1673,7 @@ describe('transport epic', () => { { address: partner }, ), ); - }, 50); + }, 100); action$.next(matrixRoom({ roomId: partnerRoomId }, { address: partner })); action$.next( @@ -1663,6 +1772,7 @@ describe('transport epic', () => { ); Object.assign(rtcDataChannel, { readyState: 'open' }); rtcDataChannel.emit('open', true); + rtcDataChannel.emit('message', { data: 'ping' }); }, 40); setTimeout(() => { rtcDataChannel.emit('close', true); diff --git a/raiden-ts/tests/unit/mocks.ts b/raiden-ts/tests/unit/mocks.ts index 39cfecb14d..2f1083fcb6 100644 --- a/raiden-ts/tests/unit/mocks.ts +++ b/raiden-ts/tests/unit/mocks.ts @@ -251,6 +251,7 @@ export function makeMatrix(userId: string, server: string): jest.Mocked true), leave: jest.fn(async () => true), sendEvent: jest.fn(async () => true), + sendToDevice: jest.fn(async () => true), _http: { opts: {}, // mock request done by raiden/utils::getUserPresence @@ -277,6 +278,7 @@ export function makeMatrix(userId: string, server: string): jest.Mocked; const rtcConnection = (Object.assign(new EventEmitter(), { @@ -290,6 +292,7 @@ export function mockRTC() { /* remote */ }), addIceCandidate: jest.fn(), + close: jest.fn(), }) as unknown) as jest.Mocked; const RTCPeerConnection = jest @@ -739,9 +742,10 @@ function mockedMatrixCreateClient({ baseUrl }: { baseUrl: string }): jest.Mocked matrix.emit( 'Room.timeline', { - getType: () => type, - getSender: () => userId, - event: { content }, + getType: jest.fn(() => type), + getSender: jest.fn(() => userId), + getContent: jest.fn(() => content), + event: { type, sender: userId, content }, }, matrix.getRoom(roomId), ); @@ -749,6 +753,25 @@ function mockedMatrixCreateClient({ baseUrl }: { baseUrl: string }): jest.Mocked logging.info('__sendEvent', address, roomId, type, content); return true; }), + sendToDevice: jest.fn( + async (type: string, contentMap: { [userId: string]: { [deviceID: string]: any } }) => { + for (const [partnerId, map] of Object.entries(contentMap)) { + for (const client of mockedClients) { + const matrix = await client.deps.matrix$.toPromise(); + if (partnerId !== matrix.getUserId()) continue; + for (const content of Object.values(map)) { + matrix.emit('toDeviceEvent', { + getType: jest.fn(() => type), + getSender: jest.fn(() => userId), + getContent: jest.fn(() => content), + event: { type, sender: userId, content }, + }); + logging.info('__sendToDevice', address, type, content); + } + } + } + }, + ), _http: { opts: {}, // mock request done by raiden/utils::getUserPresence From 16770401a91d5c6328b2d357364fae404253b8ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Tue, 6 Oct 2020 12:30:24 -0300 Subject: [PATCH 8/9] sdk: add changelog entry --- raiden-ts/CHANGELOG.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/raiden-ts/CHANGELOG.md b/raiden-ts/CHANGELOG.md index cf4fb38608..7cf0252615 100644 --- a/raiden-ts/CHANGELOG.md +++ b/raiden-ts/CHANGELOG.md @@ -3,9 +3,11 @@ ## [Unreleased] ### Fixed - [#2094] Fix TransferState's timestamps missing +- [#2174] Fix a few transport issues triggered on high-load scenarios ### Added - [#2044] Introduce PouchDB (IndexedDB/leveldown) as new persistent state storage backend +- [#2204] Implement toDevice capability and messaging ### Changed - [#2158] Adapt WebRTC to new protocol compatible with python client @@ -13,6 +15,8 @@ [#2044]: https://github.com/raiden-network/light-client/issues/2044 [#2094]: https://github.com/raiden-network/light-client/issues/2094 [#2158]: https://github.com/raiden-network/light-client/issues/2158 +[#2174]: https://github.com/raiden-network/light-client/pull/2174 +[#2204]: https://github.com/raiden-network/light-client/issues/2204 ## [0.11.1] - 2020-08-18 ### Changed @@ -20,7 +24,7 @@ - [#2054] Update to Raiden contracts `v0.37.1` [#2049]: https://github.com/raiden-network/light-client/issues/2049 -[#2054]: https://github.com/raiden-network/light-client/pulls/2054 +[#2054]: https://github.com/raiden-network/light-client/pull/2054 ## [0.11.0] - 2020-08-04 From 4eb8e6b8fdc757b74e8804ccd90094a531a570ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Thu, 8 Oct 2020 09:50:46 -0300 Subject: [PATCH 9/9] sdk: PR review --- raiden-ts/src/transport/epics/init.ts | 6 +-- raiden-ts/src/transport/epics/messages.ts | 45 +++++++++++++---------- raiden-ts/src/transport/utils.ts | 12 ++++++ 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/raiden-ts/src/transport/epics/init.ts b/raiden-ts/src/transport/epics/init.ts index bf0b5bbe1a..b41f208d2f 100644 --- a/raiden-ts/src/transport/epics/init.ts +++ b/raiden-ts/src/transport/epics/init.ts @@ -34,7 +34,7 @@ import { fromFetch } from 'rxjs/fetch'; import sortBy from 'lodash/sortBy'; import isEmpty from 'lodash/isEmpty'; -import { createClient, MatrixClient, MatrixEvent } from 'matrix-js-sdk'; +import { createClient, MatrixClient, MatrixEvent, Filter } from 'matrix-js-sdk'; import { logger as matrixLogger } from 'matrix-js-sdk/lib/logger'; import { assert } from '../../utils'; @@ -97,7 +97,7 @@ function joinGlobalRooms(config: RaidenConfig, matrix: MatrixClient): Observable * @param roomIds - The ids of the rooms to filter out during sync. * @returns Observable of the {@link Filter} that was created. */ -async function createFilter(matrix: MatrixClient, roomIds: string[]) { +async function createMatrixFilter(matrix: MatrixClient, roomIds: string[]): Promise { const roomFilter = { not_rooms: roomIds, ephemeral: { @@ -132,7 +132,7 @@ function startMatrixSync( delayWhen(([, { pollingInterval }]) => timer(Math.ceil(pollingInterval / 5))), mergeMap(([, config]) => joinGlobalRooms(config, matrix).pipe( - mergeMap((roomIds) => createFilter(matrix, roomIds)), + mergeMap((roomIds) => createMatrixFilter(matrix, roomIds)), mergeMap((filter) => matrix.startClient({ filter })), retryWaitWhile( exponentialBackoff(config.pollingInterval, config.httpTimeout), diff --git a/raiden-ts/src/transport/epics/messages.ts b/raiden-ts/src/transport/epics/messages.ts index 13ed303846..2b5bddd61b 100644 --- a/raiden-ts/src/transport/epics/messages.ts +++ b/raiden-ts/src/transport/epics/messages.ts @@ -14,8 +14,9 @@ import { tap, } from 'rxjs/operators'; -import { MatrixEvent, Room } from 'matrix-js-sdk'; +import { MatrixClient, MatrixEvent, Room } from 'matrix-js-sdk'; +import { RaidenConfig } from '../../config'; import { Capabilities } from '../../constants'; import { Signed } from '../../utils/types'; import { isActionOf } from '../../utils/actions'; @@ -33,6 +34,7 @@ import { messageSend, messageReceived, messageGlobalSend } from '../../messages/ import { RaidenState } from '../../state'; import { getServerName } from '../../utils/matrix'; import { LruCache } from '../../utils/lru'; +import { getPresenceByUserId } from '../utils'; import { globalRoomNames, roomMatch, getRoom$, waitMemberAndSend$, parseMessage } from './helpers'; /** @@ -147,6 +149,25 @@ export const matrixMessageGlobalSendEpic = ( ignoreElements(), ); +// filter for text messages not from us and not from global rooms +function isValidMessage([{ matrix, event, room }, config]: [ + { matrix: MatrixClient; event: MatrixEvent; room: Room | undefined }, + RaidenConfig, +]): boolean { + const isTextMessage = + event.getType() === 'm.room.message' && + event.getContent().msgtype === 'm.text' && + event.getSender() !== matrix.getUserId(); + const isPrivateRoom = + !!room && + !globalRoomNames(config).some((g) => + // generate an alias for global room of given name, and check if room matches + roomMatch(`#${g}:${getServerName(matrix.getHomeserverUrl())}`, room), + ); + const isToDevice = !room && !!config.caps?.[Capabilities.TO_DEVICE]; // toDevice message + return isTextMessage && (isPrivateRoom || isToDevice); +} + /** * Subscribe to matrix messages and emits MessageReceivedAction upon receiving a valid message from * an user of interest (one valid signature from an address we monitor) in a room we have for them @@ -179,25 +200,11 @@ export const matrixMessageReceivedEpic = ( ), ), withLatestFrom(config$), - // filter for text messages not from us and not from global rooms - filter( - ([{ matrix, event, room }, config]) => - event.getType() === 'm.room.message' && - event.event?.content?.msgtype === 'm.text' && - event.getSender() !== matrix.getUserId() && - ((room && - !globalRoomNames(config).some((g) => - // generate an alias for global room of given name, and check if room matches - roomMatch(`#${g}:${getServerName(matrix.getHomeserverUrl())}`, room), - )) || - (!room && !!config.caps?.[Capabilities.TO_DEVICE])), // toDevice message - ), + filter(isValidMessage), mergeMap(([{ event, room }, { httpTimeout }]) => latest$.pipe( filter(({ presences, state }) => { - const presence = Object.values(presences).find( - (presence) => presence.payload.userId === event.getSender(), - ); + const presence = getPresenceByUserId(presences, event.getSender()); if (!presence) return false; const rooms = state.transport.rooms?.[presence.meta.address] ?? []; return !room || rooms.includes(room.roomId); @@ -207,9 +214,7 @@ export const matrixMessageReceivedEpic = ( // AND the room in which this message was sent to be in sender's address room queue takeUntil(timer(httpTimeout)), mergeMap(function* ({ presences }) { - const presence = Object.values(presences).find( - (presence) => presence.payload.userId === event.getSender(), - )!; + const presence = getPresenceByUserId(presences, event.getSender())!; for (const line of (event.getContent().body ?? '').split('\n')) { const message = parseMessage(line, presence.meta.address, { log }); yield messageReceived( diff --git a/raiden-ts/src/transport/utils.ts b/raiden-ts/src/transport/utils.ts index 5ef4174a03..f707c6c6eb 100644 --- a/raiden-ts/src/transport/utils.ts +++ b/raiden-ts/src/transport/utils.ts @@ -32,6 +32,18 @@ export const getPresences$: (action$: Observable) => Observable presence.payload.userId === userId); +} + /** * Stringify a caps mapping *