Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds toDevice capability to matrix transport #2174

Merged
merged 9 commits into from
Oct 8, 2020
161 changes: 61 additions & 100 deletions raiden-ts/src/transport/epics/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import { Observable, of, EMPTY, fromEvent, timer, throwError, defer, merge } from 'rxjs';
import { Observable, of, fromEvent, timer, throwError, combineLatest } from 'rxjs';
import {
distinctUntilChanged,
filter,
map,
mergeMap,
withLatestFrom,
switchMap,
take,
mapTo,
pluck,
retryWhen,
startWith,
pluck,
} from 'rxjs/operators';
import curry from 'lodash/curry';
import { Room, MatrixClient, EventType, RoomMember, MatrixEvent } from 'matrix-js-sdk';
import { Room, MatrixClient, EventType } from 'matrix-js-sdk';

import { Capabilities } from '../../constants';
import { RaidenConfig } from '../../config';
import { RaidenEpicDeps } from '../../types';
import { isntNil, Address, Signed } from '../../utils/types';
import { RaidenError, ErrorCodes } from '../../utils/error';
import { pluckDistinct } from '../../utils/rx';
import { Message } from '../../messages/types';
import { decodeJsonMessage, getMessageSigner } from '../../messages/utils';

Expand Down Expand Up @@ -68,49 +68,26 @@ function waitMember$(
address: Address,
{ latest$ }: Pick<RaidenEpicDeps, 'latest$'>,
) {
return latest$.pipe(
map(({ state }) => state.transport.rooms?.[address]?.[0]),
// wait for a room to exist (created or invited) for address
filter(isntNil),
distinctUntilChanged(),
// this switchMap unsubscribes from previous "wait" if first room for address changes
switchMap((roomId) =>
// get/wait room object for roomId
// may wait for the room state to be populated (happens after createRoom resolves)
getRoom$(matrix, roomId).pipe(
mergeMap((room) =>
// wait for address to be monitored & online (after getting Room for address)
// latest$ ensures it happens immediatelly if all conditions are satisfied
latest$.pipe(
pluck('presences', address),
map((presence) =>
presence?.payload?.available ? presence.payload.userId : undefined,
),
distinctUntilChanged(),
map((userId) => ({ room, userId })),
),
),
// when user is online, get room member for partner's userId
// this switchMap unsubscribes from previous wait if userId changes or go offline
switchMap(({ room, userId }) => {
if (!userId) return EMPTY; // user not monitored or not available
const member = room.getMember(userId);
// if it already joined room, return its membership
if (member && member.membership === 'join') return of(member);
// else, wait for the user to join/accept invite
return fromEvent<[MatrixEvent, RoomMember]>(matrix, 'RoomMember.membership').pipe(
pluck(1),
filter(
(member) =>
member.roomId === room.roomId &&
member.userId === userId &&
member.membership === 'join',
),
);
}),
pluck('roomId'),
),
return combineLatest([
latest$.pipe(
pluckDistinct('presences', address),
filter((presence) => !!presence?.payload?.available),
),
latest$.pipe(
map(({ state }) => state.transport.rooms?.[address]?.[0]),
// wait for a room to exist (created or invited) for address
filter(isntNil),
distinctUntilChanged(),
),
fromEvent(matrix, 'Room').pipe(startWith(null)),
fromEvent(matrix, 'RoomMember.membership').pipe(startWith(null)),
]).pipe(
filter(
([presence, roomId]) =>
matrix.getRoom(roomId)?.getMember?.(presence.payload.userId)?.membership === 'join',
),
pluck(1),
take(1),
);
}

Expand All @@ -137,59 +114,43 @@ export function waitMemberAndSend$<C extends { msgtype: string; body: string }>(
allowRtc = false,
): Observable<string> {
const RETRY_COUNT = 3; // is this relevant enough to become a constant/setting?
return merge(
// if available & open, use channel
latest$.pipe(
filter(
({ presences, rtc }) =>
allowRtc &&
address in presences &&
presences[address].payload.available &&
rtc[address]?.readyState === 'open',
),
pluck('rtc', address),
),
// if available and Capabilities.TO_DEVICE enabled on both ends, use ToDevice messages
latest$.pipe(
pluck('presences', address),
withLatestFrom(config$),
filter(
([presence, { caps }]) =>
!!caps?.[Capabilities.TO_DEVICE] &&
!!presence?.payload?.available &&
!!presence.payload.caps?.[Capabilities.TO_DEVICE],
),
pluck('0', 'payload', 'userId'),
),
waitMember$(matrix, address, { latest$ }),
).pipe(
take(1), // use first room/user which meets all requirements/filters above
mergeMap((via) =>
defer<Promise<unknown>>(
async () =>
typeof via !== 'string'
? via.send(content.body) // via RTC channel
: via.startsWith('@')
? matrix.sendToDevice(type, { [via]: { '*': content } }) // via toDevice message
: matrix.sendEvent(via, type, content, ''), // via room
).pipe(
// this returned value is just for notification, and shouldn't be relayed on;
// all functionality is provided as side effects of the subscription
mapTo(typeof via === 'string' ? via : via.label),
retryWhen((err$) =>
// if sendEvent throws, omit & retry after pollingInterval
// up to RETRY_COUNT times; if it continues to error, throws down
err$.pipe(
withLatestFrom(config$),
mergeMap(([err, { pollingInterval }], i) => {
if (i < RETRY_COUNT - 1) {
log.warn(`messageSend error, retrying ${i + 1}/${RETRY_COUNT}`, err);
return timer(pollingInterval);
// give up
} else return throwError(err);
}),
),
),
return latest$.pipe(
filter(({ presences }) => !!presences[address]?.payload?.available),
take(1),
withLatestFrom(config$),
mergeMap(([{ presences, rtc }, { caps }]) => {
// if available & open, use channel
if (allowRtc && rtc[address]?.readyState === 'open') return of(rtc[address]);
// if available and Capabilities.TO_DEVICE enabled on both ends, use ToDevice messages
if (
caps?.[Capabilities.TO_DEVICE] &&
presences[address].payload.caps?.[Capabilities.TO_DEVICE]
)
return of(presences[address].payload.userId);
return waitMember$(matrix, address, { latest$ });
}),
mergeMap(async (via) => {
if (typeof via !== 'string') via.send(content.body);
// via RTC channel
else if (via.startsWith('@')) await matrix.sendToDevice(type, { [via]: { '*': content } });
// via toDevice message
else await matrix.sendEvent(via, type, content, ''); // via room
// this returned value is just for notification, and shouldn't be relayed on;
// all functionality is provided as side effects of the subscription
return typeof via !== 'string' ? via.label : via;
}),
retryWhen((err$) =>
// if sendEvent throws, omit & retry since first 'latest$' after pollingInterval
// up to RETRY_COUNT times; if it continues to error, throws down
err$.pipe(
withLatestFrom(config$),
mergeMap(([err, { pollingInterval }], i) => {
if (i < RETRY_COUNT - 1) {
log.warn(`messageSend error, retrying ${i + 1}/${RETRY_COUNT}`, err);
return timer(pollingInterval);
// give up
} else return throwError(err);
}),
),
),
);
Expand Down
88 changes: 47 additions & 41 deletions raiden-ts/src/transport/epics/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import {
merge,
defer,
AsyncSubject,
timer,
} from 'rxjs';
import {
catchError,
concatMap,
delay,
filter,
ignoreElements,
map,
Expand All @@ -28,12 +28,13 @@ import {
pluck,
throwIfEmpty,
retryWhen,
delayWhen,
} from 'rxjs/operators';
import { fromFetch } from 'rxjs/fetch';
import sortBy from 'lodash/sortBy';
import isEmpty from 'lodash/isEmpty';

import { createClient, MatrixClient, MatrixEvent, Filter } from 'matrix-js-sdk';
import { createClient, MatrixClient, MatrixEvent } from 'matrix-js-sdk';
import { logger as matrixLogger } from 'matrix-js-sdk/lib/logger';

import { assert } from '../../utils';
Expand All @@ -43,7 +44,8 @@ import { RaidenAction } from '../../actions';
import { RaidenConfig } from '../../config';
import { RaidenState } from '../../state';
import { getServerName } from '../../utils/matrix';
import { pluckDistinct, retryAsync$ } from '../../utils/rx';
import { pluckDistinct, retryAsync$, retryWaitWhile } from '../../utils/rx';
import { exponentialBackoff } from '../../transfers/epics/utils';
import { matrixSetup } from '../actions';
import { RaidenMatrixSetup } from '../state';
import { Caps } from '../types';
Expand Down Expand Up @@ -95,23 +97,21 @@ function joinGlobalRooms(config: RaidenConfig, matrix: MatrixClient): Observable
* @param roomIds - The ids of the rooms to filter out during sync.
* @returns Observable of the {@link Filter} that was created.
*/
function createFilter(matrix: MatrixClient, roomIds: string[]): Observable<Filter> {
return defer(() => {
const roomFilter = {
not_rooms: roomIds,
ephemeral: {
not_types: ['m.receipt', 'm.typing'],
},
timeline: {
limit: 0,
not_senders: [matrix.getUserId()!],
},
};
const filterDefinition = {
room: roomFilter,
};
return matrix.createFilter(filterDefinition);
});
async function createFilter(matrix: MatrixClient, roomIds: string[]) {
weilbith marked this conversation as resolved.
Show resolved Hide resolved
andrevmatos marked this conversation as resolved.
Show resolved Hide resolved
const roomFilter = {
not_rooms: roomIds,
ephemeral: {
not_types: ['m.receipt', 'm.typing'],
},
timeline: {
limit: 0,
not_senders: [matrix.getUserId()!],
},
};
const filterDefinition = {
room: roomFilter,
};
return matrix.createFilter(filterDefinition);
}

function startMatrixSync(
Expand All @@ -127,12 +127,17 @@ function startMatrixSync(
matrix$.next(matrix);
matrix$.complete();
}),
delay(1e3), // wait 1s before starting matrix, so event listeners can be registered
withLatestFrom(config$),
// wait 1s before starting matrix, so event listeners can be registered
delayWhen(([, { pollingInterval }]) => timer(Math.ceil(pollingInterval / 5))),
mergeMap(([, config]) =>
joinGlobalRooms(config, matrix).pipe(
mergeMap((roomIds) => createFilter(matrix, roomIds)),
mergeMap((filter) => matrix.startClient({ filter })),
retryWaitWhile(
exponentialBackoff(config.pollingInterval, config.httpTimeout),
(err) => err?.httpStatus !== 429, // retry rate-limit errors only
),
),
),
ignoreElements(),
Expand Down Expand Up @@ -246,7 +251,7 @@ function setupMatrixClient$(
accessToken: setup.accessToken,
deviceId: setup.deviceId,
});
return of({ matrix, server, setup });
return of({ matrix, server, setup, pollingInterval });
} else {
const matrix = createClient({ baseUrl: server });
const userName = address.toLowerCase(),
Expand Down Expand Up @@ -291,31 +296,32 @@ function setupMatrixClient$(
accessToken: access_token,
deviceId: device_id,
displayName: signedUserId,
} as RaidenMatrixSetup,
},
pollingInterval,
})),
);
}),
// the APIs below are authenticated, and therefore also act as validator
mergeMap(({ matrix, server, setup }) =>
// set these properties before starting sync
merge(
retryAsync$(
() => matrix.setDisplayName(setup.displayName),
pollingInterval,
(err) => err?.httpStatus !== 429,
),
retryAsync$(
() => matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''),
pollingInterval,
(err) => err?.httpStatus !== 429,
),
).pipe(
mapTo({ matrix, server, setup }), // return triplet again
),
),
);
}
}),
// the APIs below are authenticated, and therefore also act as validator
mergeMap(({ matrix, server, setup, pollingInterval }) =>
// set these properties before starting sync
combineLatest([
retryAsync$(
() => matrix.setDisplayName(setup.displayName),
pollingInterval,
(err) => err?.httpStatus !== 429,
),
retryAsync$(
() => matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''),
pollingInterval,
(err) => err?.httpStatus !== 429,
),
]).pipe(
mapTo({ matrix, server, setup }), // return triplet again
),
),
);
}

Expand Down
Loading