From 1c400f28b36d2fb91a2a15b3a9697345da7f95f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Wed, 24 Jun 2020 22:58:14 -0300 Subject: [PATCH 1/3] sdk: Refactor channel event monitoring, rename channelMonitor[ed] action --- raiden-ts/src/channels/actions.ts | 9 +- raiden-ts/src/channels/epics.ts | 698 ++++++++++++---------- raiden-ts/src/raiden.ts | 8 +- raiden-ts/src/services/epics.ts | 2 +- raiden-ts/src/transport/epics/presence.ts | 4 +- raiden-ts/src/transport/epics/rooms.ts | 8 +- raiden-ts/src/utils/ethers.ts | 100 ++-- raiden-ts/src/utils/rx.ts | 4 +- 8 files changed, 468 insertions(+), 365 deletions(-) diff --git a/raiden-ts/src/channels/actions.ts b/raiden-ts/src/channels/actions.ts index 34afd3c56b..5bc82835d0 100644 --- a/raiden-ts/src/channels/actions.ts +++ b/raiden-ts/src/channels/actions.ts @@ -28,6 +28,7 @@ export const tokenMonitored = createAction( }), t.partial({ fromBlock: t.number, + toBlock: t.number, }), ]), ); @@ -60,12 +61,12 @@ export namespace channelOpen { } /* Channel with meta:ChannelId + payload.id should be monitored */ -export const channelMonitor = createAction( - 'channel/monitor', - t.intersection([t.type({ id: t.number }), t.partial({ fromBlock: t.number })]), +export const channelMonitored = createAction( + 'channel/monitored', + t.type({ id: t.number }), ChannelId, ); -export interface channelMonitor extends ActionType {} +export interface channelMonitored extends ActionType {} export const channelDeposit = createAsyncAction( ChannelId, diff --git a/raiden-ts/src/channels/epics.ts b/raiden-ts/src/channels/epics.ts index 7af62755cd..3a355abaac 100644 --- a/raiden-ts/src/channels/epics.ts +++ b/raiden-ts/src/channels/epics.ts @@ -10,6 +10,7 @@ import { combineLatest, timer, throwError, + AsyncSubject, } from 'rxjs'; import { catchError, @@ -17,9 +18,7 @@ import { map, mergeMap, withLatestFrom, - groupBy, exhaustMap, - first, take, mapTo, pluck, @@ -27,19 +26,19 @@ import { ignoreElements, skip, retryWhen, - takeUntil, - repeatWhen, - takeLast, mergeMapTo, + first, + delayWhen, + finalize, } from 'rxjs/operators'; import findKey from 'lodash/findKey'; +import sortBy from 'lodash/sortBy'; import isEmpty from 'lodash/isEmpty'; -import identity from 'lodash/identity'; import { BigNumber, concat, defaultAbiCoder } from 'ethers/utils'; import { Event } from 'ethers/contract'; import { Zero } from 'ethers/constants'; -import { Filter } from 'ethers/providers'; +import { Filter, Log } from 'ethers/providers'; import { RaidenEpicDeps } from '../types'; import { RaidenAction, raidenShutdown, ConfirmableAction } from '../actions'; @@ -48,19 +47,19 @@ import { ShutdownReason } from '../constants'; import { chooseOnchainAccount, getContractWithSigner } from '../helpers'; import { Address, Hash, UInt, Signature, isntNil, HexString } from '../utils/types'; import { isActionOf } from '../utils/actions'; -import { pluckDistinct } from '../utils/rx'; -import { fromEthersEvent, getEventsStream, getNetwork } from '../utils/ethers'; +import { pluckDistinct, distinctRecordValues } from '../utils/rx'; +import { fromEthersEvent, getNetwork, logToContractEvent } from '../utils/ethers'; import { encode } from '../utils/data'; import { RaidenError, ErrorCodes, assert } from '../utils/error'; import { createBalanceHash, MessageTypeId } from '../messages/utils'; import { TokenNetwork } from '../contracts/TokenNetwork'; import { HumanStandardToken } from '../contracts/HumanStandardToken'; import { findBalanceProofMatchingBalanceHash } from '../transfers/utils'; -import { ChannelState, Channel } from './state'; +import { ChannelState } from './state'; import { newBlock, tokenMonitored, - channelMonitor, + channelMonitored, channelOpen, channelDeposit, channelClose, @@ -68,7 +67,7 @@ import { channelSettleable, channelWithdrawn, } from './actions'; -import { assertTx, channelKey, groupChannel$ } from './utils'; +import { assertTx, channelKey, groupChannel$, channelUniqueKey } from './utils'; /** * Receives an async function and returns an observable which will retry it every interval until it @@ -110,9 +109,8 @@ export const initNewBlockEpic = ( ); /** - * On first run, scan registry and token networks for registered TokenNetworks of interest - * (ones which has/had channels with us) and monitors them. On next runs, just monitors the - * previously monitored ones. + * If state.tokens is empty (usually only on first run), scan registry and token networks for + * registered TokenNetworks of interest (ones which has/had channels with us) and monitors them. * * @param action$ - Observable of RaidenActions * @param state$ - Observable of RaidenStates @@ -130,95 +128,64 @@ export const initTokensRegistryEpic = ( ): Observable => state$.pipe( take(1), - mergeMap((state) => { - const encodedAddress = defaultAbiCoder.encode(['address'], [address]); - // if tokens are already initialized, use it - if (!isEmpty(state.tokens)) - return from( - (Object.entries(state.tokens) as [Address, Address][]).map(([token, tokenNetwork]) => - tokenMonitored({ token, tokenNetwork }), - ), - ); - // else, do an initial registry scan, from deploy to now - else - return retryAsync$( - () => - provider.getLogs({ - ...registryContract.filters.TokenNetworkCreated(null, null), - fromBlock: contractsInfo.TokenNetworkRegistry.block_number, - toBlock: 'latest', - }), - provider.pollingInterval, + filter((state) => isEmpty(state.tokens)), // proceed to scan only if state.tokens isEmpty + mergeMap(() => + retryAsync$( + () => + provider.getLogs({ + ...registryContract.filters.TokenNetworkCreated(null, null), + fromBlock: contractsInfo.TokenNetworkRegistry.block_number, + toBlock: 'latest', + }), + provider.pollingInterval, + ), + ), + mergeMap(from), + map((log) => ({ log, parsed: registryContract.interface.parseLog(log) })), + filter(({ parsed }) => !!parsed.values?.token_network_address), + // for each TokenNetwork found, scan for channels with us + mergeMap( + ({ log, parsed }) => { + const encodedAddress = defaultAbiCoder.encode(['address'], [address]); + return concat$( + // concat channels opened by us and to us separately + // take(1) won't subscribe the later if something is found on former + retryAsync$( + () => + provider.getLogs({ + // filter equivalent to tokenNetworkContract.filter.ChannelOpened() + address: parsed.values.token_network_address, + topics: [null, null, encodedAddress] as string[], // channels from us + fromBlock: log.blockNumber!, + toBlock: 'latest', + }), + provider.pollingInterval, + ).pipe(mergeMap(from)), + retryAsync$( + () => + provider.getLogs({ + address: parsed.values.token_network_address, + topics: [null, null, null, encodedAddress] as string[], // channels to us + fromBlock: log.blockNumber!, + toBlock: 'latest', + }), + provider.pollingInterval, + ).pipe(mergeMap(from)), ).pipe( - mergeMap((logs) => from(logs)), - map((log) => ({ log, parsed: registryContract.interface.parseLog(log) })), - filter(({ parsed }) => !!parsed.values?.token_network_address), - // for each TokenNetwork found, scan for channels with us - mergeMap( - ({ log, parsed }) => - concat$( - // concat channels opened by us and to us separately - // take(1) won't subscribe the later if something is found on former - retryAsync$( - () => - provider.getLogs({ - address: parsed.values.token_network_address, - topics: [null, null, encodedAddress] as string[], // channels from us - fromBlock: - log.blockNumber ?? contractsInfo.TokenNetworkRegistry.block_number, - toBlock: 'latest', - }), - provider.pollingInterval, - ).pipe(mergeMap(from)), - retryAsync$( - () => - provider.getLogs({ - address: parsed.values.token_network_address, - topics: [null, null, null, encodedAddress] as string[], // channels to us - fromBlock: log.blockNumber!, - toBlock: 'latest', - }), - provider.pollingInterval, - ).pipe(mergeMap(from)), - ).pipe( - // if found at least one, register this TokenNetwork as of interest - // else, do nothing - take(1), - mapTo( - tokenMonitored({ - token: parsed.values.token_address, - tokenNetwork: parsed.values.token_network_address, - fromBlock: log.blockNumber!, - }), - ), - ), - 5, // limit concurrency, don't hammer the node with hundreds of parallel getLogs + // if found at least one, register this TokenNetwork as of interest + // else, do nothing + take(1), + mapTo( + tokenMonitored({ + token: parsed.values.token_address, + tokenNetwork: parsed.values.token_network_address, + fromBlock: log.blockNumber!, + }), ), ); - }), - ); - -/** - * Monitor channels previously already on state - * - * @param action$ - Observable of RaidenActions - * @param state$ - Observable of RaidenStates - * @returns Observable of channelMonitor actions - */ -export const initMonitorChannelsEpic = ( - {}: Observable, - state$: Observable, -): Observable => - state$.pipe( - first(), - mergeMap(function* (state) { - for (const channel of Object.values(state.channels)) { - yield channelMonitor( - { id: channel.id }, - { tokenNetwork: channel.tokenNetwork, partner: channel.partner.address }, - ); - } - }), + }, + 5, // limit concurrency, don't hammer the node with hundreds of parallel getLogs + ), ); /** @@ -274,90 +241,9 @@ export const initMonitorProviderEpic = ( ), ); -/** - * Starts monitoring a token network for events - * When this action goes through (because a former or new token registry event was deteceted), - * subscribe to events and emit respective actions to the stream. Currently: - * - ChannelOpened events with us or by us - * - * @param action$ - Observable of tokenMonitored actions - * @param state$ - Observable of RaidenStates - * @param deps - RaidenEpicDeps members - * @param deps.address - Our address - * @param deps.getTokenNetworkContract - TokenNetwork contract instance getter - * @returns Observable of channelOpen.success actions - */ -export const tokenMonitoredEpic = ( - action$: Observable, - {}: Observable, - { address, getTokenNetworkContract }: RaidenEpicDeps, -): Observable => - action$.pipe( - filter(isActionOf(tokenMonitored)), - groupBy((action) => action.payload.tokenNetwork), - mergeMap((grouped$) => - grouped$.pipe( - exhaustMap((action) => { - const tokenNetworkContract = getTokenNetworkContract(action.payload.tokenNetwork); - - // type of elements emitted by getEventsStream (past and new events coming from - // contract): [channelId, partner1, partner2, settleTimeout, Event] - return getEventsStream<[BigNumber, Address, Address, BigNumber, Event]>( - tokenNetworkContract, - // it's cheaper for monitoring to fetch all channels and filter client-side, - // than to query/create/request 2 filters on every block (from and to us) - [tokenNetworkContract.filters.ChannelOpened(null, null, null, null)], - // if first time monitoring this token network, - // fetch TokenNetwork's pastEvents since registry deployment as fromBlock$ - action.payload.fromBlock ? of(action.payload.fromBlock) : undefined, - ).pipe( - filter(([, p1, p2]) => p1 === address || p2 === address), - map(([id, p1, p2, settleTimeout, event]) => - channelOpen.success( - { - id: id.toNumber(), - token: action.payload.token, - settleTimeout: settleTimeout.toNumber(), - isFirstParticipant: address === p1, - txHash: event.transactionHash! as Hash, - txBlock: event.blockNumber!, - confirmed: undefined, - }, - { - tokenNetwork: tokenNetworkContract.address as Address, - partner: address === p1 ? p2 : p1, - }, - ), - ), - ); - }), - ), - ), - ); - -/** - * When we see a new ChannelOpenedAction event, starts monitoring channel - * - * @param action$ - Observable of channelOpen.success actions - * @returns Observable of channelMonitor actions - */ -export const channelOpenedEpic = (action$: Observable): Observable => - action$.pipe( - filter(isActionOf(channelOpen.success)), - filter((action) => !!action.payload.confirmed), - map((action) => - channelMonitor( - { - id: action.payload.id, - // fetch past events as well, if needed, including events before confirmation - fromBlock: action.payload.txBlock, - }, - action.meta, - ), - ), - ); - -// type of elements emitted by getEventsStream (past and new events coming from contract): +// type of elements mapped from contract-emitted events/logs +// [channelId, participant1, participant2, settleTimeout, Event] +type ChannelOpenedEvent = [BigNumber, Address, Address, BigNumber, Event]; // [channelId, participant, totalDeposit, Event] type ChannelNewDepositEvent = [BigNumber, Address, UInt<32>, Event]; // [channelId, participant, totalWithdraw, Event] @@ -367,6 +253,7 @@ type ChannelClosedEvent = [BigNumber, Address, UInt<8>, Hash, Event]; // [channelId, part1_amount, part1_locksroot, part2_amount, part2_locksroot Event] type ChannelSettledEvent = [BigNumber, UInt<32>, Hash, UInt<32>, Hash, Event]; type ChannelEvents = + | ChannelOpenedEvent | ChannelNewDepositEvent | ChannelWithdrawEvent | ChannelClosedEvent @@ -375,6 +262,7 @@ type ChannelEvents = function getChannelEventsTopics(tokenNetworkContract: TokenNetwork) { const events = tokenNetworkContract.interface.events; return { + openTopic: events.ChannelOpened.topic, depositTopic: events.ChannelNewDeposit.topic, withdrawTopic: events.ChannelWithdraw.topic, closedTopic: events.ChannelClosed.topic, @@ -382,141 +270,326 @@ function getChannelEventsTopics(tokenNetworkContract: TokenNetwork) { }; } -function mapChannelEvents(tokenNetworkContract: TokenNetwork, partner: Address) { - const { depositTopic, withdrawTopic, closedTopic, settledTopic } = getChannelEventsTopics( - tokenNetworkContract, - ); - const meta = { tokenNetwork: tokenNetworkContract.address as Address, partner }; - return ([data, channel]: [ChannelEvents, Channel]) => { - const event = data[data.length - 1] as Event; - const topic = event.topics?.[0]; - const id = data[0].toNumber(); - let action; - switch (topic) { - case depositTopic: { - const [, participant, totalDeposit] = data as ChannelNewDepositEvent; - const end = participant === partner ? 'partner' : 'own'; - if (totalDeposit.lte(channel[end].deposit)) break; - action = channelDeposit.success( - { - id, - participant, - totalDeposit, - txHash: event.transactionHash! as Hash, - txBlock: event.blockNumber!, - confirmed: undefined, - }, - meta, - ); - break; - } - case withdrawTopic: { - const [, participant, totalWithdraw] = data as ChannelWithdrawEvent; - const end = participant === partner ? 'partner' : 'own'; - if (totalWithdraw.lte(channel[end].withdraw)) break; - action = channelWithdrawn( - { - id, - participant, - totalWithdraw, - txHash: event.transactionHash! as Hash, - txBlock: event.blockNumber!, - confirmed: undefined, - }, - meta, - ); - break; - } - case closedTopic: { - if ('closeBlock' in channel) break; - const [, participant] = data as ChannelClosedEvent; - action = channelClose.success( - { - id, - participant, - txHash: event.transactionHash! as Hash, - txBlock: event.blockNumber!, - confirmed: undefined, - }, - meta, +function mapChannelEventsToAction( + [token, tokenNetwork]: [Address, Address], + { address, latest$, getTokenNetworkContract }: RaidenEpicDeps, +) { + const tokenNetworkContract = getTokenNetworkContract(tokenNetwork); + const { + openTopic, + depositTopic, + withdrawTopic, + closedTopic, + settledTopic, + } = getChannelEventsTopics(tokenNetworkContract); + return (input$: Observable) => + input$.pipe( + withLatestFrom(latest$), + map(([args, { state, config }]) => { + const id = args[0].toNumber(); + const channel = Object.values(state.channels).find( + (c) => c.tokenNetwork === tokenNetwork && c.id === id, ); - break; - } - case settledTopic: { - action = channelSettle.success( - { - id, - txHash: event.transactionHash! as Hash, - txBlock: event.blockNumber!, - confirmed: undefined, - locks: channel.partner.locks, - }, - meta, - ); - break; - } - } - return action; // action isn't any, it gets its type from assignments above + // partner will be defined whenever id refers to a past fetched, confirmed and persisted + // event/channel or an open event seen on this session and still confirming; + // if it's undefined, this channel is unknown/not with us, and should be filtered out + const partner = + channel?.partner?.address || + state.pendingTxs.filter(channelOpen.success.is).find((a) => a.payload.id === id)?.meta + .partner; + + const event = args[args.length - 1] as Event; + const topic = event.topics?.[0]; + const txHash = event.transactionHash! as Hash; + const txBlock = event.blockNumber!; + const confirmed = + txBlock + config.confirmationBlocks <= state.blockNumber ? true : undefined; + + let action; + switch (topic) { + case openTopic: { + const [, p1, p2, settleTimeout] = args as ChannelOpenedEvent; + // filter out open events not with us + if ((address === p1 || address === p2) && (!channel || id > channel.id)) { + const partner = address == p1 ? p2 : p1; + action = channelOpen.success( + { + id, + token: token as Address, + settleTimeout: settleTimeout.toNumber(), + isFirstParticipant: address === p1, + txHash, + txBlock, + confirmed, + }, + { tokenNetwork, partner }, + ); + } + break; + } + case depositTopic: { + const [, participant, totalDeposit] = args as ChannelNewDepositEvent; + if ( + partner && + (channel?.id !== id || + totalDeposit.gt( + channel[participant === channel.partner.address ? 'partner' : 'own'].deposit, + )) + ) + action = channelDeposit.success( + { id, participant, totalDeposit, txHash, txBlock, confirmed }, + { tokenNetwork, partner }, + ); + break; + } + case withdrawTopic: { + const [, participant, totalWithdraw] = args as ChannelWithdrawEvent; + if ( + partner && + (channel?.id !== id || + totalWithdraw.gt( + channel[participant === channel.partner.address ? 'partner' : 'own'].withdraw, + )) + ) + action = channelWithdrawn( + { id, participant, totalWithdraw, txHash, txBlock, confirmed }, + { tokenNetwork, partner }, + ); + break; + } + case closedTopic: { + if (partner && (channel?.id !== id || !('closeBlock' in channel))) { + const [, participant] = args as ChannelClosedEvent; + action = channelClose.success( + { id, participant, txHash, txBlock, confirmed }, + { tokenNetwork, partner }, + ); + } + break; + } + case settledTopic: { + // settle may only happen more tha confirmation blocks after opening, so be stricter + if (channel?.id === id) + action = channelSettle.success( + { id, txHash, txBlock, confirmed, locks: channel.partner.locks }, + { tokenNetwork, partner: channel.partner.address }, + ); + break; + } + } + return action; // action isn't any, it gets its type from assignments above + }), + filter(isntNil), + ); +} + +function fetchPastChannelEvents$( + [fromBlock, toBlock]: [number, number], + [token, tokenNetwork]: [Address, Address], + deps: RaidenEpicDeps, +) { + const { address, provider, latest$, getTokenNetworkContract } = deps; + const tokenNetworkContract = getTokenNetworkContract(tokenNetwork); + const { openTopic } = getChannelEventsTopics(tokenNetworkContract); + + // start by scanning [fromBlock, toBlock] interval for ChannelOpened events limited to or from us + return retryAsync$( + () => + Promise.all([ + provider.getLogs({ + ...tokenNetworkContract.filters.ChannelOpened(null, address, null, null), + fromBlock, + toBlock, + }), + provider.getLogs({ + ...tokenNetworkContract.filters.ChannelOpened(null, null, address, null), + fromBlock, + toBlock, + }), + ]), + provider.pollingInterval, + ).pipe( + withLatestFrom(latest$), + mergeMap(([[logs1, logs2], { state }]) => { + // map Log to ContractEvent and filter out channels which we know are already gone + const openEvents = logs1 + .concat(logs2) + .map(logToContractEvent(tokenNetworkContract)) + .filter(isntNil) + .filter(([_id, p1, p2]) => { + const partner = address === p1 ? p2 : p1; + const id = _id.toNumber(); + const key = channelKey({ tokenNetwork, partner }); + // filter out settled or old channels, no new event could come from it + return !( + channelUniqueKey({ id, tokenNetwork, partner }) in state.oldChannels || + (key in state.channels && id < state.channels[key].id) + ); + }); + const channelIds = [ + ...openEvents, // use new past openEvents ids + ...Object.values(state.channels) + .filter((c) => c.tokenNetwork === tokenNetwork) + .map((c) => [c.id]), // use previous confirmed channels ids + ].map(([id]) => defaultAbiCoder.encode(['uint256'], [id])); + if (channelIds.length === 0) return EMPTY; + + // get all events of interest in the block range for all channelIds from open events above + return retryAsync$( + () => + provider.getLogs({ + address: tokenNetwork, + topics: [ + // events of interest as topics[0], without open events (already fetched above) + Object.values(getChannelEventsTopics(tokenNetworkContract)).filter( + (t) => t !== openTopic, + ), + channelIds, // ORed channelIds set as topics[1]=channelId + ], + fromBlock, + toBlock, + }), + provider.pollingInterval, + ).pipe( + // synchronously sort/interleave open|(deposit|withdraw|close|settle) events, and unwind + mergeMap((logs) => { + const otherEvents = logs + .map( + logToContractEvent>(tokenNetworkContract), + ) + .filter(isntNil); + const allEvents = [...openEvents, ...otherEvents]; + return from(sortBy(allEvents, (args) => (args[args.length - 1] as Event).blockNumber)); + }), + ); + }), + mapChannelEventsToAction([token, tokenNetwork], deps), + ); +} + +function fetchNewChannelEvents$([token, tokenNetwork]: [Address, Address], deps: RaidenEpicDeps) { + const { provider, getTokenNetworkContract } = deps; + const tokenNetworkContract = getTokenNetworkContract(tokenNetwork); + + // this mapping is needed to handle channel events emitted before open is confirmed/stored + const channelFilter: Filter = { + address: tokenNetwork, + // set only topics[0], to get also open events (new ids); filter client-side + topics: [Object.values(getChannelEventsTopics(tokenNetworkContract))], }; + + return fromEthersEvent(provider, channelFilter).pipe( + map(logToContractEvent(tokenNetworkContract)), + filter(isntNil), + mapChannelEventsToAction([token, tokenNetwork], deps), + ); } /** - * Listen open channels for channel Events - * Monitors each channel in RaidenState.channels, stops when it gets settled + * Listen TokenNetwork contract for channel Events * Currently monitored events: + * - ChannelOpened, fires a channelopen.success action * - ChannelNewDeposit, fires a channelDeposit.success action * - ChannelWithdraw, fires a channelWithdrawn action * - ChannelClosedEvent, fires a channelClose.success action - * - ChannelSettledEvent, fires a channelSettle.success action and completes that channel observable + * - ChannelSettledEvent, fires a channelSettle.success action + * Also emits tokenMonitored to tell we're monitoring a tokenNetwork, with its [fromBlock, toBlock] + * ranges of fetched pastEvents * * @param action$ - Observable of RaidenActions * @param state$ - Observable of RaidenStates * @param deps - RaidenEpicDeps members - * @param deps.getTokenNetworkContract - TokenNetwork contract instance getter - * @param deps.latest$ - Latest observable - * @returns Observable of channelDeposit.success,channelClose.success,channelSettle.success actions + * @returns Observable of channelOpen.success,channelDeposit.success,channelClose.success, + * channelSettle.success actions */ -export const channelMonitoredEpic = ( - {}: Observable, +export const channelEventsEpic = ( + action$: Observable, state$: Observable, - { getTokenNetworkContract, latest$ }: RaidenEpicDeps, + deps: RaidenEpicDeps, ): Observable< - channelDeposit.success | channelWithdrawn | channelClose.success | channelSettle.success + | tokenMonitored + | channelOpen.success + | channelDeposit.success + | channelWithdrawn + | channelClose.success + | channelSettle.success > => + action$.pipe( + filter(newBlock.is), + pluck('payload', 'blockNumber'), + publishReplay(1, undefined, (blockNumber$) => + state$.pipe( + pluck('tokens'), + distinctRecordValues(), + withLatestFrom(state$), + mergeMap(([[token, tokenNetwork], state]) => { + // fromBlock is latest on-chain event seen for this contract, or registry deployment block +1 + const fromBlock = + Object.values(state.channels) + .concat(Object.values(state.oldChannels)) + .filter((channel) => channel.tokenNetwork === tokenNetwork) + .reduce( + (acc, channel) => + Math.max( + acc, + 'settleBlock' in channel + ? channel.settleBlock + : 'closeBlock' in channel + ? channel.closeBlock + : channel.openBlock, + ), + deps.contractsInfo.TokenNetworkRegistry.block_number, + ) + 1; + + // notifies when past events fetching completes + const pastDone$ = new AsyncSubject(); + + // blockNumber$ holds latest blockNumber, or waits for it to be fetched + return blockNumber$.pipe( + first(), + mergeMap((toBlock) => + // this merge + finalize + delayWhen AsyncSubject outputs like concat, but ensures + // both subscriptions are done simultaneously, to avoid losing monitored new events + // or that they'd come before any pastEvent + merge( + of(tokenMonitored({ token: token as Address, tokenNetwork, fromBlock, toBlock })), + fetchPastChannelEvents$( + [fromBlock, toBlock], + [token as Address, tokenNetwork], + deps, + ).pipe(finalize(() => (pastDone$.next(true), pastDone$.complete()))), + fetchNewChannelEvents$([token as Address, tokenNetwork], deps).pipe( + delayWhen(() => pastDone$), // holds new events until pastEvents fetching ends + ), + ), + ), + ); + }), + ), + ), + ); + +/** + * Emit channelMonitored action for channels on state + * + * @param state$ - Observable of RaidenStates + * @returns Observable of channelMonitored actions + */ +export const channelMonitoredEpic = ( + {}: Observable, + state$: Observable, +): Observable => state$.pipe( groupChannel$, mergeMap((grouped$) => grouped$.pipe( - // exhaustMap ignores new emits due to state changes on already monitored channels - exhaustMap((channel) => { - const { tokenNetwork } = channel; - const partner = channel.partner.address; - const key = channelKey(channel); - const tokenNetworkContract = getTokenNetworkContract(tokenNetwork); - const encodedId = defaultAbiCoder.encode(['uint256'], [channel.id]); - const mergedFilter: Filter = { - address: tokenNetwork, - topics: [Object.values(getChannelEventsTopics(tokenNetworkContract)), [encodedId]], - }; - - return getEventsStream( - tokenNetworkContract, - [mergedFilter], - // fetch since openBlock at subscribe time; already processed events will be skipped - // by mapChannelEvents or reducer, or be idempotent - of(channel.openBlock), - ).pipe( - // use up-to-date channel for mapChannelEvents - withLatestFrom(latest$.pipe(pluck('state', 'channels', key))), - map(mapChannelEvents(tokenNetworkContract, partner)), - filter(isntNil), - // in case of complete, repeat until takeUntil below - repeatWhen(identity), - ); - }), - // this takeUntil is applied over and completes inner getEventsStream when grouped$ - // completes, which happens when channel is settled and gone from state on groupChannel$ - takeUntil(grouped$.pipe(takeLast(1))), + first(), + map((channel) => + channelMonitored( + { id: channel.id }, + { tokenNetwork: channel.tokenNetwork, partner: channel.partner.address }, + ), + ), ), ), ); @@ -559,7 +632,7 @@ const makeDeposit$ = ( ), ), assertTx('setTotalDeposit', ErrorCodes.CNL_SETTOTALDEPOSIT_FAILED, { log }), - // ignore success so it's picked by channelMonitoredEpic + // ignore success so it's picked by channelEventsEpic ignoreElements(), catchError((error) => of( @@ -576,7 +649,7 @@ const makeDeposit$ = ( * A channelOpen action requested by user * Needs to be called on a previously monitored tokenNetwork. Calls TokenNetwork.openChannel * with given parameters. If tx goes through successfuly, stop as ChannelOpened success action - * will instead be detected and fired by tokenMonitoredEpic. If anything detectable goes wrong, + * will instead be detected and fired by channelEventsEpic. If anything detectable goes wrong, * fires a ChannnelOpenActionFailed instead * * @param action$ - Observable of channelOpen actions @@ -665,7 +738,7 @@ export const channelOpenEpic = ( ), ), ), - // ignore success so it's picked by tokenMonitoredEpic + // ignore success so it's picked by channelEventsEpic catchError((error) => of(channelOpen.failure(error, action.meta))), ), ), @@ -677,9 +750,8 @@ export const channelOpenEpic = ( * A ChannelDeposit action requested by user * Needs to be called on a previously monitored channel. Calls Token.approve for TokenNetwork * and then set respective setTotalDeposit. If all tx go through successfuly, stop as - * ChannelDeposited success action will instead be detected and reacted by - * channelMonitoredEpic. If anything detectable goes wrong, fires a ChannelDepositActionFailed - * instead + * ChannelDeposited success action will instead be detected and reacted by channelEventsEpic. + * If anything detectable goes wrong, fires a ChannelDepositActionFailed instead * * @param action$ - Observable of channelDeposit.request actions * @param state$ - Observable of RaidenStates @@ -743,9 +815,9 @@ export const channelDepositEpic = ( /** * A ChannelClose action requested by user * Needs to be called on an opened or closing (for retries) channel. - * If tx goes through successfuly, stop as ChannelClosed success action will instead be - * detected and reacted by channelMonitoredEpic. If anything detectable goes wrong, fires a - * ChannelCloseActionFailed instead + * If tx goes through successfuly, stop as ChannelClosed success action will instead be detected + * and reacted by channelEventsEpic. + * If anything detectable goes wrong, fires a ChannelCloseActionFailed instead * * @param action$ - Observable of channelClose actions * @param state$ - Observable of RaidenStates @@ -819,7 +891,7 @@ export const channelCloseEpic = ( ), assertTx('closeChannel', ErrorCodes.CNL_CLOSECHANNEL_FAILED, { log }), // if succeeded, return a empty/completed observable - // actual ChannelClosedAction will be detected and handled by channelMonitoredEpic + // actual ChannelClosedAction will be detected and handled by channelEventsEpic // if any error happened on tx call/pipeline, catchError will then emit the // channelClose.failure action instead ignoreElements(), @@ -923,9 +995,9 @@ export const channelUpdateEpic = ( /** * A ChannelSettle action requested by user * Needs to be called on an settleable or settling (for retries) channel. - * If tx goes through successfuly, stop as ChannelSettled success action will instead be - * detected and reacted by channelMonitoredEpic. If anything detectable goes wrong, fires a - * ChannelSettleActionFailed instead + * If tx goes through successfuly, stop as ChannelSettled success action will instead be detected + * and reacted by channelEventsEpic. + * If anything detectable goes wrong, fires a ChannelSettleActionFailed instead * * @param action$ - Observable of channelSettle actions * @param state$ - Observable of RaidenStates @@ -1026,7 +1098,7 @@ export const channelSettleEpic = ( }), assertTx('settleChannel', ErrorCodes.CNL_SETTLECHANNEL_FAILED, { log }), // if succeeded, return a empty/completed observable - // actual ChannelSettledAction will be detected and handled by channelMonitoredEpic + // actual ChannelSettledAction will be detected and handled by channelEventsEpic // if any error happened on tx call/pipeline, mergeMap below won't be hit, and catchError // will then emit the channelSettle.failure action instead ignoreElements(), diff --git a/raiden-ts/src/raiden.ts b/raiden-ts/src/raiden.ts index 7e2e23c95e..d162273023 100644 --- a/raiden-ts/src/raiden.ts +++ b/raiden-ts/src/raiden.ts @@ -554,11 +554,9 @@ export class Raiden { */ public async monitorToken(token: string): Promise
{ assert(Address.is(token), [ErrorCodes.DTA_INVALID_ADDRESS, { token }], this.log.info); - const alreadyMonitoredTokens = this.state.tokens; - if (token in alreadyMonitoredTokens) return alreadyMonitoredTokens[token]; - const tokenNetwork = (await this.deps.registryContract.token_to_token_networks( - token, - )) as Address; + let tokenNetwork = this.state.tokens[token]; + if (tokenNetwork) return tokenNetwork; + tokenNetwork = (await this.deps.registryContract.token_to_token_networks(token)) as Address; assert( tokenNetwork && tokenNetwork !== AddressZero, ErrorCodes.RDN_UNKNOWN_TOKEN_NETWORK, diff --git a/raiden-ts/src/services/epics.ts b/raiden-ts/src/services/epics.ts index 3b83dd470a..7ee78d87c7 100644 --- a/raiden-ts/src/services/epics.ts +++ b/raiden-ts/src/services/epics.ts @@ -435,7 +435,7 @@ export const pfsCapacityUpdateEpic = ( * PFSFeeUpdate to path_finding global room, so PFSs can pick us for mediation * TODO: Currently, we always send Zero fees; we should send correct fee data from config * - * @param action$ - Observable of channelMonitor actions + * @param action$ - Observable of channelMonitored actions * @param state$ - Observable of RaidenStates * @param deps - Raiden epic dependencies * @param deps.log - Logger instance diff --git a/raiden-ts/src/transport/epics/presence.ts b/raiden-ts/src/transport/epics/presence.ts index cafa0afb2c..1e07ad1b97 100644 --- a/raiden-ts/src/transport/epics/presence.ts +++ b/raiden-ts/src/transport/epics/presence.ts @@ -32,7 +32,7 @@ import { RaidenState } from '../../state'; import { getUserPresence } from '../../utils/matrix'; import { pluckDistinct } from '../../utils/rx'; import { matrixPresence } from '../actions'; -import { channelMonitor } from '../../channels/actions'; +import { channelMonitored } from '../../channels/actions'; import { parseCaps, stringifyCaps } from '../utils'; // unavailable just means the user didn't do anything over a certain amount of time, but they're @@ -256,7 +256,7 @@ export const matrixMonitorChannelPresenceEpic = ( action$: Observable, ): Observable => action$.pipe( - filter(channelMonitor.is), + filter(channelMonitored.is), map((action) => matrixPresence.request(undefined, { address: action.meta.partner })), ); diff --git a/raiden-ts/src/transport/epics/rooms.ts b/raiden-ts/src/transport/epics/rooms.ts index b621944698..fd958984ee 100644 --- a/raiden-ts/src/transport/epics/rooms.ts +++ b/raiden-ts/src/transport/epics/rooms.ts @@ -28,7 +28,7 @@ import { Address, isntNil } from '../../utils/types'; import { isActionOf } from '../../utils/actions'; import { RaidenEpicDeps } from '../../types'; import { RaidenAction } from '../../actions'; -import { channelMonitor } from '../../channels/actions'; +import { channelMonitored } from '../../channels/actions'; import { messageSend, messageReceived } from '../../messages/actions'; import { transferSigned } from '../../transfers/actions'; import { RaidenState } from '../../state'; @@ -104,7 +104,7 @@ function inviteLoop$( * Create room (if needed) for a transfer's target, channel's partner or, as a fallback, for any * recipient of a messageSend.request action * - * @param action$ - Observable of transferSigned|channelMonitor|messageSend.request actions + * @param action$ - Observable of transferSigned|channelMonitored|messageSend.request actions * @param state$ - Observable of RaidenStates * @param deps - RaidenEpicDeps members * @param deps.address - Our address @@ -122,7 +122,7 @@ export const matrixCreateRoomEpic = ( action$.pipe( // ensure there's a room for address of interest for each of these actions // matrixRoomLeave ensures a new room is created if all we had are forgotten/left - filter(isActionOf([transferSigned, channelMonitor, messageSend.request, matrixRoomLeave])), + filter(isActionOf([transferSigned, channelMonitored, messageSend.request, matrixRoomLeave])), map((action) => { let peer; switch (action.type) { @@ -138,7 +138,7 @@ export const matrixCreateRoomEpic = ( ) peer = action.payload.message.initiator; break; - case channelMonitor.type: + case channelMonitored.type: peer = action.meta.partner; break; default: diff --git a/raiden-ts/src/utils/ethers.ts b/raiden-ts/src/utils/ethers.ts index 631b8e7273..0e5e04f8f0 100644 --- a/raiden-ts/src/utils/ethers.ts +++ b/raiden-ts/src/utils/ethers.ts @@ -4,8 +4,7 @@ import { Provider, JsonRpcProvider, Listener, EventType, Filter, Log } from 'eth import { Network } from 'ethers/utils'; import { getNetwork as parseNetwork } from 'ethers/utils/networks'; import { Observable, fromEventPattern, merge, from, of, EMPTY, combineLatest, defer } from 'rxjs'; -import { filter, first, map, switchMap, mergeMap, share } from 'rxjs/operators'; -import flatten from 'lodash/flatten'; +import { filter, first, map, mergeMap, share, toArray } from 'rxjs/operators'; import sortBy from 'lodash/sortBy'; import { isntNil } from './types'; @@ -22,7 +21,7 @@ import { isntNil } from './types'; export function fromEthersEvent( target: Provider, event: EventType, - resultSelector?: (...args: any[]) => T, // eslint-disable-line + resultSelector?: (...args: any[]) => T, ): Observable { return fromEventPattern( (handler: Listener) => target.on(event, handler), @@ -31,27 +30,33 @@ export function fromEthersEvent( ) as Observable; } +export type ContractEvent = + | [Event] + | [any, Event] + | [any, any, Event] + | [any, any, any, Event] + | [any, any, any, any, Event] + | [any, any, any, any, any, Event] + | [any, any, any, any, any, any, Event] + | [any, any, any, any, any, any, any, Event] + | [any, any, any, any, any, any, any, any, Event] + | [any, any, any, any, any, any, any, any, any, Event]; +export function logToContractEvent( + contract: Contract, +): (log: Log) => T | undefined; +export function logToContractEvent( + contract: Contract, + log: Log, +): T | undefined; /** - * getEventsStream returns a stream of T-type tuples (arrays) from Contract's - * events from filters. These events are polled since provider's [re]setEventsBlock to newest - * polled block. If both 'fromBlock$' and 'lastSeenBlock$' are specified, also fetch past events - * since fromBlock up to lastSeenBlock$ === provider.resetEventsBlock - 1 - * T must be a tuple-like type receiving all filters arguments plus the respective Event in the end + * Curried(2) function to map an ethers's Provider log to a contract event tuple * - * @param contract - Contract source instance for filters, connected to a provider - * @param filters - array of OR filters from tokenNetwork - * @param fromBlock$ - Observable of a past blockNumber since when to fetch past events - * If not provided, last resetEventsBlock is automatically used. - * @returns Observable of contract's events + * @param contract - Contract instance + * @param log - Log to map + * @returns Tuple of events args plus Event object */ -export function getEventsStream( - contract: Contract, - filters: Filter[], - fromBlock$?: Observable, -): Observable { - const provider = contract.provider as JsonRpcProvider; - - const logToEvent = (log: Log): T | undefined => { +export function logToContractEvent(contract: Contract, log?: Log) { + const mapper = (log: Log): T | undefined => { // parse log into [...args, event: Event] array, // the same that contract.on events/callbacks const parsed = contract.interface.parseLog(log); @@ -67,13 +72,35 @@ export function getEventsStream( removeListener: () => { /* getLogs don't install filter */ }, - getBlock: () => provider.getBlock(log.blockHash!), - getTransaction: () => provider.getTransaction(log.transactionHash!), - getTransactionReceipt: () => provider.getTransactionReceipt(log.transactionHash!), + getBlock: () => contract.provider.getBlock(log.blockHash!), + getTransaction: () => contract.provider.getTransaction(log.transactionHash!), + getTransactionReceipt: () => contract.provider.getTransactionReceipt(log.transactionHash!), decode: (data: string, topics?: string[]) => parsed.decode(data, topics || log.topics), }; return [...args, event] as T; }; + return log !== undefined ? mapper(log) : mapper; +} + +/** + * getEventsStream returns a stream of T-type tuples (arrays) from Contract's + * events from filters. These events are polled since provider's [re]setEventsBlock to newest + * polled block. If both 'fromBlock$' and 'lastSeenBlock$' are specified, also fetch past events + * since fromBlock up to lastSeenBlock$ === provider.resetEventsBlock - 1 + * T must be a tuple-like type receiving all filters arguments plus the respective Event in the end + * + * @param contract - Contract source instance for filters, connected to a provider + * @param filters - array of OR filters from tokenNetwork + * @param fromBlock$ - Observable of a past blockNumber since when to fetch past events + * If not provided, last resetEventsBlock is automatically used. + * @returns Observable of contract's events + */ +export function getEventsStream( + contract: Contract, + filters: Filter[], + fromBlock$?: Observable, +): Observable { + const provider = contract.provider as JsonRpcProvider; // past events (in the closed-interval=[fromBlock, lastSeenBlock]), // fetch once, sort by blockNumber, emit all, complete @@ -88,18 +115,23 @@ export function getEventsStream( ? of(provider.blockNumber) : fromEthersEvent(provider, 'block').pipe( first(), - map((b) => provider.blockNumber ?? b), + map(() => provider.blockNumber), + share(), ), - ).pipe(share()); + ); pastEvents$ = combineLatest(fromBlock$, nextBlock$).pipe( first(), - switchMap(([fromBlock, toBlock]) => - Promise.all(filters.map((filter) => provider.getLogs({ ...filter, fromBlock, toBlock }))), + mergeMap(([fromBlock, toBlock]) => + from(filters).pipe( + mergeMap((filter) => provider.getLogs({ ...filter, fromBlock, toBlock })), + // flatten array of each getLogs query response and sort them + // emit log array elements as separate logs into stream (unwind) + mergeMap(from), + toArray(), + mergeMap((logs) => from(sortBy(logs, ['blockNumber']))), + ), ), - // flatten array of each getLogs query response and sort them - // emit log array elements as separate logs into stream (unwind) - mergeMap((logs) => from(sortBy(flatten(logs), ['blockNumber']))), - map(logToEvent), + map(logToContractEvent(contract)), filter(isntNil), ); } @@ -108,9 +140,9 @@ export function getEventsStream( // where lastSeenBlock is the currentBlock at call time // doesn't complete, keep emitting events for each new block (if any) until unsubscription const newEvents$: Observable = nextBlock$.pipe( - switchMap(() => from(filters)), + mergeMap(() => from(filters)), mergeMap((filter) => fromEthersEvent(provider, filter)), - map(logToEvent), + map(logToContractEvent(contract)), filter(isntNil), ); diff --git a/raiden-ts/src/utils/rx.ts b/raiden-ts/src/utils/rx.ts index 03a3cffe16..4f66a169ba 100644 --- a/raiden-ts/src/utils/rx.ts +++ b/raiden-ts/src/utils/rx.ts @@ -1,4 +1,4 @@ -import { Observable, OperatorFunction, from } from 'rxjs'; +import { Observable, OperatorFunction, pairs } from 'rxjs'; import { pluck, distinctUntilChanged, mergeMap, scan, filter } from 'rxjs/operators'; import { isntNil } from './types'; @@ -70,7 +70,7 @@ export function distinctRecordValues( return (input: Observable>): Observable<[string, R]> => input.pipe( distinctUntilChanged(), - mergeMap((map) => from(Object.entries(map))), + mergeMap((map) => pairs(map)), /* this scan stores a reference to each [key,value] in 'acc', and emit as 'changed' iff it * changes from last time seen. It relies on value references changing only if needed */ scan<[string, R], { acc: { [k: string]: R }; changed?: [string, R] }>( From a05aa2c0041203011ac35fcc668af2b376eed1c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Sat, 27 Jun 2020 16:35:24 -0300 Subject: [PATCH 2/3] sdk: Adjust tests for events monitoring refactor --- raiden-ts/tests/e2e/raiden.spec.ts | 26 +-- raiden-ts/tests/unit/actions.spec.ts | 13 +- raiden-ts/tests/unit/epics/channels.spec.ts | 121 +++++++------- raiden-ts/tests/unit/epics/path.spec.ts | 6 +- raiden-ts/tests/unit/epics/raiden.spec.ts | 160 +++---------------- raiden-ts/tests/unit/epics/transport.spec.ts | 6 +- raiden-ts/tests/unit/fixtures.ts | 70 ++++---- 7 files changed, 139 insertions(+), 263 deletions(-) diff --git a/raiden-ts/tests/e2e/raiden.spec.ts b/raiden-ts/tests/e2e/raiden.spec.ts index 1180cb25d6..cbc7a6d357 100644 --- a/raiden-ts/tests/e2e/raiden.spec.ts +++ b/raiden-ts/tests/e2e/raiden.spec.ts @@ -482,6 +482,8 @@ describe('Raiden', () => { raiden.state$.subscribe((state) => (raidenState = state)); raiden.start(); + await provider.mine(5); + // ensure after hot boot, state is rehydrated and contains (only) previous token expect(raidenState).toBeDefined(); expect(raidenState!.tokens).toEqual({ [token]: tokenNetwork }); @@ -633,11 +635,13 @@ describe('Raiden', () => { await expect(raiden.monitorToken(token)).resolves.toBe(tokenNetwork); await expect(promise).resolves.toEqual( - tokenMonitored({ - fromBlock: expect.any(Number), - token: token as Address, - tokenNetwork: tokenNetwork as Address, - }), + tokenMonitored( + expect.objectContaining({ + fromBlock: expect.any(Number), + token: token as Address, + tokenNetwork: tokenNetwork as Address, + }), + ), ); // while partner is not yet initialized, open a channel with them @@ -654,11 +658,13 @@ describe('Raiden', () => { // promise1, contrary to promise, should resolve at initialization, upon first scan // detects tokenNetwork as being of interest for having a channel with parner await expect(promise1).resolves.toEqual( - tokenMonitored({ - fromBlock: expect.any(Number), - token: token as Address, - tokenNetwork: tokenNetwork as Address, - }), + tokenMonitored( + expect.objectContaining({ + fromBlock: expect.any(Number), + token: token as Address, + tokenNetwork: tokenNetwork as Address, + }), + ), ); raiden1.stop(); diff --git a/raiden-ts/tests/unit/actions.spec.ts b/raiden-ts/tests/unit/actions.spec.ts index 173c502f5b..aba4ec1861 100644 --- a/raiden-ts/tests/unit/actions.spec.ts +++ b/raiden-ts/tests/unit/actions.spec.ts @@ -2,7 +2,7 @@ import * as t from 'io-ts'; import { from } from 'rxjs'; import { bigNumberify } from 'ethers/utils'; -import { channelDeposit, channelMonitor } from 'raiden-ts/channels/actions'; +import { channelDeposit, channelMonitored } from 'raiden-ts/channels/actions'; import { RaidenError, ErrorCodec, ErrorCodes } from 'raiden-ts/utils/error'; import { Address, UInt, decode } from 'raiden-ts/utils/types'; import { @@ -19,12 +19,11 @@ import { ConfirmableAction } from 'raiden-ts/actions'; describe('action factories not tested in reducers.spec.ts', () => { const tokenNetwork = '0x0000000000000000000000000000000000020001' as Address, partner = '0x0000000000000000000000000000000000000020' as Address; - test('channelMonitor', () => { - const id = 12, - fromBlock = 5123; - expect(channelMonitor({ id, fromBlock }, { tokenNetwork, partner })).toEqual({ - type: channelMonitor.type, - payload: { id, fromBlock }, + test('channelMonitored', () => { + const id = 12; + expect(channelMonitored({ id }, { tokenNetwork, partner })).toEqual({ + type: channelMonitored.type, + payload: { id }, meta: { tokenNetwork, partner }, }); }); diff --git a/raiden-ts/tests/unit/epics/channels.spec.ts b/raiden-ts/tests/unit/epics/channels.spec.ts index 3d18cfe2c0..4f3d4ade37 100644 --- a/raiden-ts/tests/unit/epics/channels.spec.ts +++ b/raiden-ts/tests/unit/epics/channels.spec.ts @@ -1,4 +1,4 @@ -import { makeLog, makeRaidens, makeHash, waitBlock, makeTransaction } from '../mocks'; +import { makeLog, makeRaidens, makeHash, waitBlock, makeTransaction, makeAddress } from '../mocks'; import { token, tokenNetwork, @@ -21,17 +21,17 @@ import { ensureChannelIsSettled, getChannel, amount, + getChannelEventsFilter, } from '../fixtures'; import { bigNumberify, BigNumber } from 'ethers/utils'; import { Zero, HashZero } from 'ethers/constants'; import { defaultAbiCoder } from 'ethers/utils/abi-coder'; -import { Filter } from 'ethers/providers'; import { UInt } from 'raiden-ts/utils/types'; import { LocksrootZero } from 'raiden-ts/constants'; import { - channelMonitor, + channelMonitored, channelOpen, channelDeposit, channelClose, @@ -41,7 +41,6 @@ import { } from 'raiden-ts/channels/actions'; import { channelUniqueKey } from 'raiden-ts/channels/utils'; import { ChannelState } from 'raiden-ts/channels'; -import { TokenNetwork } from 'raiden-ts/contracts/TokenNetwork'; import { createBalanceHash, getBalanceProofFromEnvelopeMessage } from 'raiden-ts/messages'; import { getLocksroot } from 'raiden-ts/transfers/utils'; @@ -123,45 +122,59 @@ describe('channelOpenEpic', () => { }); }); -test('channelOpenedEpic', async () => { +test('channelMonitoredEpic', async () => { expect.assertions(1); const [raiden, partner] = await makeRaidens(2); await ensureChannelIsOpen([raiden, partner]); expect(raiden.output).toContainEqual( - channelMonitor( - { id, fromBlock: expect.any(Number) }, - { tokenNetwork, partner: partner.address }, - ), + channelMonitored({ id }, { tokenNetwork, partner: partner.address }), ); }); -describe('channelMonitoredEpic', () => { - const idEncoded = defaultAbiCoder.encode(['uint256'], [id]); +describe('channelEventsEpic', () => { const depositEncoded = defaultAbiCoder.encode(['uint256'], [deposit]); - function getMonitoredFilter(tokenNetworkContract: TokenNetwork): Filter { - return { - address: tokenNetworkContract.address, - topics: [ - [ - tokenNetworkContract.interface.events.ChannelNewDeposit.topic, - tokenNetworkContract.interface.events.ChannelWithdraw.topic, - tokenNetworkContract.interface.events.ChannelClosed.topic, - tokenNetworkContract.interface.events.ChannelSettled.topic, - ], - [idEncoded], - ], - }; - } - test('initial monitor with past$ own ChannelNewDeposit event', async () => { - expect.assertions(1); + expect.assertions(2); + const settleTimeoutEncoded = defaultAbiCoder.encode(['uint256'], [settleTimeout]); - const [raiden, partner] = await makeRaidens(2); + const [raiden, partner] = await makeRaidens(2, false); const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); + // put a previous channel in state, to trigger logs to be fetched since it + raiden.store.dispatch( + channelOpen.success( + { + id: id - 1, + token, + settleTimeout, + isFirstParticipant: true, + txHash: makeHash(), + txBlock: openBlock - 5, + confirmed: true, + }, + { tokenNetwork, partner: makeAddress() }, + ), + ); + await raiden.start(); + await partner.start(); + + raiden.deps.provider.getLogs.mockResolvedValueOnce([ + makeLog({ + blockNumber: openBlock, + filter: tokenNetworkContract.filters.ChannelOpened( + id, + raiden.address, + partner.address, + null, + ), + data: settleTimeoutEncoded, // non-indexed settleTimeout goes in data + }), + ]); + // getLogs for our address as 2nd participant returns no event + raiden.deps.provider.getLogs.mockResolvedValueOnce([]); raiden.deps.provider.getLogs.mockResolvedValue([ makeLog({ blockNumber: openBlock + 1, @@ -169,10 +182,8 @@ describe('channelMonitoredEpic', () => { data: depositEncoded, // non-indexed total_deposit = 1023 goes in data }), ]); - await ensureChannelIsOpen([raiden, partner]); - raiden.store.dispatch( - channelMonitor({ id, fromBlock: openBlock }, { tokenNetwork, partner: partner.address }), - ); + await ensureTokenIsMonitored(raiden); + await ensureTokenIsMonitored(partner); await waitBlock(); expect(raiden.output).toContainEqual( @@ -188,6 +199,20 @@ describe('channelMonitoredEpic', () => { { tokenNetwork, partner: partner.address }, ), ); + // expect getLogs to have been limited fromBlock since last known event + expect(raiden.deps.provider.getLogs).toHaveBeenCalledWith({ + address: tokenNetwork, + topics: [ + expect.arrayContaining([ + tokenNetworkContract.interface.events.ChannelNewDeposit.topic, + tokenNetworkContract.interface.events.ChannelSettled.topic, + ]), + // ensure already confirmed channel also got into scanned channelIds set + expect.arrayContaining([id - 1, id].map((i) => defaultAbiCoder.encode(['uint256'], [i]))), + ], + fromBlock: openBlock - 4, + toBlock: expect.any(Number), + }); }); test('already monitored with new$ partner ChannelNewDeposit event', async () => { @@ -197,12 +222,9 @@ describe('channelMonitoredEpic', () => { const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); await ensureChannelIsOpen([raiden, partner]); - raiden.store.dispatch( - channelMonitor({ id, fromBlock: openBlock }, { tokenNetwork, partner: partner.address }), - ); await waitBlock(openBlock + 2); raiden.deps.provider.emit( - getMonitoredFilter(tokenNetworkContract), + getChannelEventsFilter(tokenNetworkContract), makeLog({ blockNumber: openBlock + 2, filter: tokenNetworkContract.filters.ChannelNewDeposit(id, partner.address, null), @@ -235,12 +257,9 @@ describe('channelMonitoredEpic', () => { const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); await ensureChannelIsOpen([raiden, partner]); - raiden.store.dispatch( - channelMonitor({ id, fromBlock: openBlock }, { tokenNetwork, partner: partner.address }), - ); await waitBlock(closeBlock - 1); raiden.deps.provider.emit( - getMonitoredFilter(tokenNetworkContract), + getChannelEventsFilter(tokenNetworkContract), makeLog({ blockNumber: closeBlock - 1, transactionHash: txHash, @@ -272,12 +291,9 @@ describe('channelMonitoredEpic', () => { const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); await ensureChannelIsOpen([raiden, partner]); - raiden.store.dispatch( - channelMonitor({ id, fromBlock: openBlock }, { tokenNetwork, partner: partner.address }), - ); await waitBlock(closeBlock); raiden.deps.provider.emit( - getMonitoredFilter(tokenNetworkContract), + getChannelEventsFilter(tokenNetworkContract), makeLog({ blockNumber: closeBlock, transactionHash: txHash, @@ -303,7 +319,7 @@ describe('channelMonitoredEpic', () => { }); test('new$ ChannelSettled event', async () => { - expect.assertions(9); + expect.assertions(7); const settleDataEncoded = defaultAbiCoder.encode( ['uint256', 'bytes32', 'uint256', 'bytes32'], [Zero, HashZero, Zero, HashZero], @@ -313,17 +329,11 @@ describe('channelMonitoredEpic', () => { const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); await ensureChannelIsClosed([raiden, partner]); - raiden.store.dispatch( - channelMonitor({ id, fromBlock: openBlock }, { tokenNetwork, partner: partner.address }), - ); + raiden.store.dispatch(channelMonitored({ id }, { tokenNetwork, partner: partner.address })); await waitBlock(settleBlock); - const filter = getMonitoredFilter(tokenNetworkContract); + const filter = getChannelEventsFilter(tokenNetworkContract); expect(raiden.deps.provider.on).toHaveBeenCalledWith(filter, expect.any(Function)); - expect(raiden.deps.provider.removeListener).not.toHaveBeenCalledWith( - filter, - expect.any(Function), - ); expect(raiden.deps.provider.listenerCount(filter)).toBe(1); const settleHash = makeHash(); @@ -353,9 +363,8 @@ describe('channelMonitoredEpic', () => { ), ); - // ensure ChannelSettledAction completed channel monitoring and unsubscribed from events - expect(raiden.deps.provider.removeListener).toHaveBeenCalledWith(filter, expect.any(Function)); - expect(raiden.deps.provider.listenerCount(filter)).toBe(0); + // ensure TokenNetwork is still being monitored after settle + expect(raiden.deps.provider.removeListener).not.toHaveBeenCalled(); // ensure channel state is moved from 'channels' to 'oldChannels' expect(getChannel(raiden, partner)).toBeUndefined(); diff --git a/raiden-ts/tests/unit/epics/path.spec.ts b/raiden-ts/tests/unit/epics/path.spec.ts index 6d19568b79..455214e522 100644 --- a/raiden-ts/tests/unit/epics/path.spec.ts +++ b/raiden-ts/tests/unit/epics/path.spec.ts @@ -13,7 +13,7 @@ import { channelOpen, channelDeposit, channelClose, - channelMonitor, + channelMonitored, } from 'raiden-ts/channels/actions'; import { raidenConfigUpdate, RaidenAction } from 'raiden-ts/actions'; import { matrixPresence } from 'raiden-ts/transport/actions'; @@ -1282,7 +1282,7 @@ describe('PFS: pfsFeeUpdateEpic', () => { txHash, } = epicFixtures(depsMock)); state$ = depsMock.latest$.pipe(pluck('state')); - action = channelMonitor({ id: channelId }, { tokenNetwork, partner }); + action = channelMonitored({ id: channelId }, { tokenNetwork, partner }); [ raidenConfigUpdate({ @@ -1313,7 +1313,7 @@ describe('PFS: pfsFeeUpdateEpic', () => { depsMock.latest$.complete(); }); - test('success: send PFSFeeUpdate to global pfsRoom on channelMonitor', async () => { + test('success: send PFSFeeUpdate to global pfsRoom on channelMonitored', async () => { expect.assertions(1); const promise = pfsFeeUpdateEpic(action$, state$, depsMock).toPromise(); diff --git a/raiden-ts/tests/unit/epics/raiden.spec.ts b/raiden-ts/tests/unit/epics/raiden.spec.ts index 1d8159bd06..3ff694ea48 100644 --- a/raiden-ts/tests/unit/epics/raiden.spec.ts +++ b/raiden-ts/tests/unit/epics/raiden.spec.ts @@ -14,9 +14,15 @@ import { import { defaultAbiCoder } from 'ethers/utils/abi-coder'; import { raidenShutdown } from 'raiden-ts/actions'; -import { newBlock, tokenMonitored, channelMonitor, channelOpen } from 'raiden-ts/channels/actions'; +import { + newBlock, + tokenMonitored, + channelMonitored, + channelOpen, +} from 'raiden-ts/channels/actions'; import { ShutdownReason } from 'raiden-ts/constants'; import { RaidenError, ErrorCodes } from 'raiden-ts/utils/error'; +import { first, pluck } from 'rxjs/operators'; const partner = makeAddress(); @@ -29,18 +35,7 @@ describe('raiden init epics', () => { expect(raiden.output).toContainEqual(newBlock({ blockNumber: expect.any(Number) })); }); - test('init previous tokenMonitored', async () => { - expect.assertions(2); - const raiden = await makeRaiden(undefined, false); - // change initial state before starting - raiden.store.dispatch(tokenMonitored({ token, tokenNetwork })); - expect(raiden.store.getState().tokens).toEqual({ [token]: tokenNetwork }); - await raiden.start(); - await waitBlock(); - expect(raiden.output).toContainEqual(tokenMonitored({ token, tokenNetwork })); - }); - - test('init tokenMonitored with scanned tokenNetwork', async () => { + test('init tokenMonitored with scanned tokenNetwork, retryAsync$ retry', async () => { expect.assertions(3); const raiden = await makeRaiden(undefined, false); const { registryContract } = raiden.deps; @@ -71,13 +66,23 @@ describe('raiden init epics', () => { raiden.address, null, ), + data: defaultAbiCoder.encode(['uint256'], [settleTimeout]), }), ]; } return []; }); + + // ensure one getLogs error doesn't fail and is retried by retryAsync$ + raiden.deps.provider.getLogs.mockRejectedValueOnce(new Error('network error;')); + const promise = raiden.deps.latest$ + .pipe(pluck('action'), first(tokenMonitored.is)) + .toPromise(); + await raiden.start(); await waitBlock(); + // since we put some delay on retryAsync$, we need to wait + await promise; expect(raiden.output).toContainEqual( tokenMonitored({ token, tokenNetwork, fromBlock: expect.any(Number) }), @@ -113,7 +118,7 @@ describe('raiden init epics', () => { ); await raiden.start(); - expect(raiden.output).toContainEqual(channelMonitor({ id }, meta)); + expect(raiden.output).toContainEqual(channelMonitored({ id }, meta)); }); test('ShutdownReason.ACCOUNT_CHANGED', async () => { @@ -164,133 +169,6 @@ describe('raiden init epics', () => { }); }); -describe('tokenMonitoredEpic', () => { - const settleTimeoutEncoded = defaultAbiCoder.encode(['uint256'], [settleTimeout]); - - test('first tokenMonitored with past$ ChannelOpened event', async () => { - expect.assertions(1); - - const raiden = await makeRaiden(); - const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); - - raiden.deps.provider.getLogs.mockImplementation(async ({ address, topics }) => { - if ( - address === tokenNetwork && - topics?.[0] === tokenNetworkContract.interface.events.ChannelOpened.topic - ) - return [ - makeLog({ - transactionHash: txHash, - blockNumber: openBlock, - filter: tokenNetworkContract.filters.ChannelOpened(id, partner, raiden.address, null), - data: settleTimeoutEncoded, - }), - ]; - return []; - }); - - raiden.store.dispatch( - tokenMonitored({ - token, - tokenNetwork, - fromBlock: 2, - }), - ); - await waitBlock(); - - expect(raiden.output).toContainEqual( - channelOpen.success( - { - id, - token, - settleTimeout, - isFirstParticipant: false, - txHash, - txBlock: openBlock, - confirmed: undefined, - }, - { tokenNetwork, partner }, - ), - ); - }); - - test('already tokenMonitored with new$ ChannelOpened event', async () => { - expect.assertions(1); - - const raiden = await makeRaiden(undefined, false); - raiden.store.dispatch(tokenMonitored({ token, tokenNetwork })); - await raiden.start(); - - const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); - - raiden.deps.provider.emit( - tokenNetworkContract.filters.ChannelOpened(null, null, null, null), - makeLog({ - transactionHash: txHash, - blockNumber: openBlock, - filter: tokenNetworkContract.filters.ChannelOpened(id, raiden.address, partner, null), - data: settleTimeoutEncoded, - }), - ); - await waitBlock(); - - expect(raiden.output).toContainEqual( - channelOpen.success( - { - id, - token, - settleTimeout, - isFirstParticipant: true, - txHash, - txBlock: openBlock, - confirmed: undefined, - }, - { tokenNetwork, partner }, - ), - ); - }); - - test("ensure multiple tokenMonitored don't produce duplicated events", async () => { - expect.assertions(2); - - const raiden = await makeRaiden(); - for (let i = 0; i < 16; i++) raiden.store.dispatch(tokenMonitored({ token, tokenNetwork })); - - const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); - - raiden.deps.provider.emit( - tokenNetworkContract.filters.ChannelOpened(null, null, null, null), - makeLog({ - transactionHash: txHash, - blockNumber: openBlock, - filter: tokenNetworkContract.filters.ChannelOpened(id, raiden.address, partner, null), - data: settleTimeoutEncoded, - }), - ); - await waitBlock(); - - expect(raiden.output).toContainEqual( - channelOpen.success( - { - id, - token, - settleTimeout, - isFirstParticipant: true, - txHash, - txBlock: openBlock, - confirmed: undefined, - }, - { tokenNetwork, partner }, - ), - ); - expect( - raiden.output.filter( - (action) => channelOpen.success.is(action) && action.payload.confirmed === undefined, - ), - ).toHaveLength(1); - }); -}); - describe('confirmationEpic', () => { test('confirmed', async () => { expect.assertions(2); diff --git a/raiden-ts/tests/unit/epics/transport.spec.ts b/raiden-ts/tests/unit/epics/transport.spec.ts index 3f8b25b1d6..023ddb4c94 100644 --- a/raiden-ts/tests/unit/epics/transport.spec.ts +++ b/raiden-ts/tests/unit/epics/transport.spec.ts @@ -11,7 +11,7 @@ import { verifyMessage, BigNumber } from 'ethers/utils'; import { RaidenAction, raidenConfigUpdate } from 'raiden-ts/actions'; import { raidenReducer } from 'raiden-ts/reducer'; -import { channelMonitor } from 'raiden-ts/channels/actions'; +import { channelMonitored } from 'raiden-ts/channels/actions'; import { matrixPresence, matrixRoom, @@ -267,9 +267,9 @@ describe('transport epic', () => { }); describe('matrixMonitorChannelPresenceEpic', () => { - test('channelMonitor triggers matrixPresence.request', async () => { + test('channelMonitored triggers matrixPresence.request', async () => { const action$ = of( - channelMonitor({ id: channelId }, { tokenNetwork, partner }), + channelMonitored({ id: channelId }, { tokenNetwork, partner }), ); const promise = matrixMonitorChannelPresenceEpic(action$).toPromise(); await expect(promise).resolves.toEqual( diff --git a/raiden-ts/tests/unit/fixtures.ts b/raiden-ts/tests/unit/fixtures.ts index 79294ae1ac..707abd94b0 100644 --- a/raiden-ts/tests/unit/fixtures.ts +++ b/raiden-ts/tests/unit/fixtures.ts @@ -38,6 +38,7 @@ import { ChannelState, Channel } from 'raiden-ts/channels'; import { Direction } from 'raiden-ts/transfers/state'; import { transfer, transferUnlock } from 'raiden-ts/transfers/actions'; import { messageReceived } from 'raiden-ts/messages/actions'; +import { TokenNetwork } from 'raiden-ts/contracts/TokenNetwork'; /** * Composes several constants used across epics @@ -178,6 +179,28 @@ export function getChannel( return raiden.store.getState().channels[channelKey({ tokenNetwork: _tokenNetwork, partner })]; } +/** + * Returns the filter used to monitor for channel events + * + * @param tokenNetworkContract - TokenNetwork contract + * @returns Filter used to monitor TokenNetwork for Channel events + */ +export function getChannelEventsFilter(tokenNetworkContract: TokenNetwork): Filter { + const events = tokenNetworkContract.interface.events; + return { + address: tokenNetworkContract.address, + topics: [ + [ + events.ChannelOpened.topic, + events.ChannelNewDeposit.topic, + events.ChannelWithdraw.topic, + events.ChannelClosed.topic, + events.ChannelSettled.topic, + ], + ], + }; +} + /** * Ensure token is monitored on raiden's state * @@ -206,7 +229,7 @@ export async function ensureChannelIsOpen([raiden, partner]: [ const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); providersEmit( - tokenNetworkContract.filters.ChannelOpened(null, null, null, null), + getChannelEventsFilter(tokenNetworkContract), makeLog({ transactionHash: makeHash(), blockNumber: openBlock, @@ -242,21 +265,8 @@ export async function ensureChannelIsDeposited( const participant = raiden.address; const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); - const events = tokenNetworkContract.interface.events; - const monitorFilter: Filter = { - address: tokenNetwork, - topics: [ - [ - events.ChannelNewDeposit.topic, - events.ChannelWithdraw.topic, - events.ChannelClosed.topic, - events.ChannelSettled.topic, - ], - [defaultAbiCoder.encode(['uint256'], [id])], - ], - }; providersEmit( - monitorFilter, + getChannelEventsFilter(tokenNetworkContract), makeLog({ transactionHash: txHash, blockNumber: txBlock, @@ -286,21 +296,8 @@ export async function ensureChannelIsClosed([raiden, partner]: [ await ensureChannelIsOpen([raiden, partner]); if (getChannel(raiden, partner).state === ChannelState.closed) return; const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); - const events = tokenNetworkContract.interface.events; - const monitorFilter: Filter = { - address: tokenNetwork, - topics: [ - [ - events.ChannelNewDeposit.topic, - events.ChannelWithdraw.topic, - events.ChannelClosed.topic, - events.ChannelSettled.topic, - ], - [defaultAbiCoder.encode(['uint256'], [id])], - ], - }; providersEmit( - monitorFilter, + getChannelEventsFilter(tokenNetworkContract), makeLog({ transactionHash: makeHash(), blockNumber: closeBlock, @@ -326,21 +323,8 @@ export async function ensureChannelIsSettled([raiden, partner]: [ await ensureChannelIsClosed([raiden, partner]); if (!getChannel(raiden, partner)) return; const tokenNetworkContract = raiden.deps.getTokenNetworkContract(tokenNetwork); - const events = tokenNetworkContract.interface.events; - const monitorFilter: Filter = { - address: tokenNetwork, - topics: [ - [ - events.ChannelNewDeposit.topic, - events.ChannelWithdraw.topic, - events.ChannelClosed.topic, - events.ChannelSettled.topic, - ], - [defaultAbiCoder.encode(['uint256'], [id])], - ], - }; providersEmit( - monitorFilter, + getChannelEventsFilter(tokenNetworkContract), makeLog({ transactionHash: makeHash(), blockNumber: settleBlock, From 7fba5634fe5c6cd98bd905607c29476b80f93ef6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Sat, 27 Jun 2020 16:39:00 -0300 Subject: [PATCH 3/3] sdk: Add changelog entry for events monitoring refactor --- raiden-ts/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/raiden-ts/CHANGELOG.md b/raiden-ts/CHANGELOG.md index 47efb591b3..0d749aa1a9 100644 --- a/raiden-ts/CHANGELOG.md +++ b/raiden-ts/CHANGELOG.md @@ -9,6 +9,7 @@ - [#1690] Fix LockExpired with empty balanceHash verification - [#1698] Fix estimateGas errors on channelOpen not properly being handled - [#1761] Fix deposit error on openChannel not rejecting promise +- [#1787] Fix TokenNetwork monitoring losing events ### Added - [#1374] Monitors MonitoringService contract and emit event when MS acts @@ -25,6 +26,7 @@ - [#1657] Expose RaidenChannel's id,settleTimeout,openBlock as required properties - [#1708] Expose RaidenTransfer's secret as optional property - [#1705] All transfers become monitored per default to make receiving transfers safe +- [#1822] Refactor and optimize TokenNetwork events monitoring: one filter per Tokennetwork [#837]: https://github.com/raiden-network/light-client/issues/837 [#1374]: https://github.com/raiden-network/light-client/issues/1374 @@ -44,6 +46,8 @@ [#1705]: https://github.com/raiden-network/light-client/issues/1705 [#1711]: https://github.com/raiden-network/light-client/pull/1711 [#1761]: https://github.com/raiden-network/light-client/issues/1761 +[#1787]: https://github.com/raiden-network/light-client/issues/1787 +[#1822]: https://github.com/raiden-network/light-client/pull/1822 ## [0.9.0] - 2020-05-28 ### Added