Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix per channel event monitoring and past events fetching #1475

Merged
merged 4 commits into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions raiden-ts/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

### Fixed
- [#1456] Retry without stored setup if auth fails
- [#1434] Ensure past channel events are correctly fetched

[#1456]: https://github.com/raiden-network/light-client/issues/1456
[#1434]: https://github.com/raiden-network/light-client/issues/1434

### Changed
- [#1462] Refactor state schema and types to be simpler and safer
Expand Down
342 changes: 168 additions & 174 deletions raiden-ts/src/channels/epics.ts

Large diffs are not rendered by default.

29 changes: 16 additions & 13 deletions raiden-ts/src/channels/reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
channelWithdrawn,
} from './actions';
import { Channel, ChannelState, ChannelEnd } from './state';
import { channelKey } from './utils';
import { channelKey, channelUniqueKey } from './utils';

// state.blockNumber specific reducer, handles only newBlock action
const blockNumber = createReducer(initialState.blockNumber).handle(
Expand Down Expand Up @@ -70,7 +70,12 @@ const emptyChannelEnd: ChannelEnd = {
function channelOpenSuccessReducer(state: RaidenState, action: channelOpen.success): RaidenState {
const key = channelKey(action.meta);
// ignore if older than currently set channel, or unconfirmed or removed
if ((state.channels[key]?.openBlock ?? 0) >= action.payload.txBlock || !action.payload.confirmed)
const prevChannel = state.channels[key];
if (
(prevChannel?.openBlock ?? 0) >= action.payload.txBlock ||
(prevChannel?.id ?? 0) >= action.payload.id ||
!action.payload.confirmed
)
return state;
const channel: Channel = {
state: ChannelState.open,
Expand Down Expand Up @@ -102,22 +107,20 @@ function channelUpdateOnchainBalanceStateReducer(
let channel = state.channels[key];
if (channel?.state !== ChannelState.open || channel.id !== action.payload.id) return state;

const prop = channelWithdrawn.is(action) ? 'withdraw' : 'deposit';
const total = channelWithdrawn.is(action)
? action.payload.totalWithdraw
: action.payload.totalDeposit;
const [prop, total] = channelWithdrawn.is(action)
? ['withdraw' as const, action.payload.totalWithdraw]
: ['deposit' as const, action.payload.totalDeposit];
const end = action.payload.participant === channel.partner.address ? 'partner' : 'own';

const isPartner = action.payload.participant === action.meta.partner;
const channelSide = isPartner ? 'partner' : 'own';
if (total.lte(channel[end][prop])) return state; // ignore if past event

channel = {
...channel,
[channelSide]: {
...channel[channelSide],
[end]: {
...channel[end],
[prop]: total,
},
};

return { ...state, channels: { ...state.channels, [key]: channel } };
}

Expand All @@ -131,7 +134,7 @@ function channelCloseSuccessReducer(
// even on non-confirmed action, already set channel state as closing, so it can't be used for new transfers
if (action.payload.confirmed === undefined && channel.state === ChannelState.open)
channel = { ...channel, state: ChannelState.closing };
else if (action.payload.confirmed)
else if (!('closeBlock' in channel) && action.payload.confirmed)
channel = {
...channel,
state: ChannelState.closed,
Expand Down Expand Up @@ -182,7 +185,7 @@ function channelSettleSuccessReducer(
oldChannels: {
// persist popped channel on oldChannels with augmented channelKey
...state.oldChannels,
[`${channel.id}#${key}`]: {
[channelUniqueKey(channel)]: {
...channel,
state: ChannelState.settled,
settleBlock: action.payload.txBlock,
Expand Down
4 changes: 3 additions & 1 deletion raiden-ts/src/channels/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import * as t from 'io-ts';

import { Address, Hash, UInt } from '../utils/types';

// should this become a brand?
// should these become brands?
export const ChannelKey = t.string;
export type ChannelKey = string;
export const ChannelUniqueKey = t.string;
export type ChannelUniqueKey = string;

// Represents a HashTime-Locked amount in a channel
export const Lock = t.type(
Expand Down
82 changes: 77 additions & 5 deletions raiden-ts/src/channels/utils.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
import { OperatorFunction, from } from 'rxjs';
import { tap, mergeMap, map } from 'rxjs/operators';
import { OperatorFunction, from, Observable } from 'rxjs';
import {
tap,
mergeMap,
map,
concatMap,
scan,
pluck,
filter,
groupBy,
takeUntil,
} from 'rxjs/operators';
import { Zero } from 'ethers/constants';
import { ContractTransaction } from 'ethers/contract';

import { RaidenState } from '../state';
import { RaidenEpicDeps } from '../types';
import { UInt, Hash, Address } from '../utils/types';
import { UInt, Hash, Address, isntNil } from '../utils/types';
import { ErrorCodes, RaidenError } from '../utils/error';
import { pluckDistinct } from '../utils/rx';
import { Channel, ChannelState } from './state';
import { ChannelKey } from './types';
import { ChannelKey, ChannelUniqueKey } from './types';

/**
* Returns a unique key (string) for a channel
* Returns a key (string) for a channel unique per tokenNetwork+partner
*
* @param channel - Either a Channel or a { tokenNetwork, partner } pair of addresses
* @returns A string, for now
Expand All @@ -23,6 +35,21 @@ export function channelKey<
}@${tokenNetwork}`;
}

/**
* Returns a unique key (string) for a channel per tokenNetwork+partner+id
*
* @param channel - Either a Channel or a { tokenNetwork, partner } pair of addresses
* @returns A string, for now
*/
export function channelUniqueKey<
C extends { id: number; tokenNetwork: Address } & (
| { partner: { address: Address } }
| { partner: Address }
)
>(channel: C): ChannelUniqueKey {
return `${channel.id}#${channelKey(channel)}`;
}

/**
* Calculates and returns partial and total amounts of given channel state
*
Expand Down Expand Up @@ -111,3 +138,48 @@ export function assertTx(
),
);
}

/**
* Reactively on state, emits grouped observables per channel which emits respective channel
* states and completes when channel is settled.
* Can be used either passing input directly or as an operator
*
* @param state$ - RaidenState observable, use Latest['state'] for emit at subscription
* @returns Tuple containing grouped Observable and { key, id }: { ChannelKey, number } values
*/
export function groupChannel$(state$: Observable<RaidenState>) {
return state$.pipe(
pluckDistinct('channels'),
concatMap((channels) => from(Object.entries(channels))),
/* this scan stores a reference to each [key,value] in 'acc', and emit as 'changed' iff it
* changes from last time seen. It relies on value references changing only if needed */
scan(
({ acc }, [key, channel]) =>
// if ref didn't change, emit previous accumulator, without 'changed' value
acc[key] === channel
? { acc }
: // else, update ref in 'acc' and emit value in 'changed' prop
{
acc: { ...acc, [key]: channel },
changed: channel,
},
{ acc: {} } as { acc: RaidenState['channels']; changed?: Channel },
),
pluck('changed'),
filter(isntNil), // filter out if reference didn't change from last emit
groupBy(channelUniqueKey),
map((grouped$) => {
const [_id, key] = grouped$.key.split('#');
const id = +_id;
return grouped$.pipe(
takeUntil(
state$.pipe(
// takeUntil first time state's channelId differs from this observable's
// e.g. when channel is settled and gone (channel.id will be undefined)
filter(({ channels }) => channels[key]?.id !== id),
),
),
);
}),
);
}
12 changes: 9 additions & 3 deletions raiden-ts/src/raiden.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import { RaidenConfig, PartialRaidenConfig } from './config';
import { RaidenChannels, ChannelState } from './channels/state';
import { RaidenTransfer, Direction } from './transfers/state';
import { raidenReducer } from './reducer';
import { raidenRootEpic } from './epics';
import { raidenRootEpic, getLatest$ } from './epics';
import {
RaidenAction,
RaidenEvents,
Expand Down Expand Up @@ -72,7 +72,7 @@ import { RaidenError, ErrorCodes } from './utils/error';
export class Raiden {
private readonly store: Store<RaidenState, RaidenAction>;
private readonly deps: RaidenEpicDeps;
public config: RaidenConfig;
public config!: RaidenConfig;

/**
* action$ exposes the internal events pipeline. It's intended for debugging, and its interface
Expand Down Expand Up @@ -234,7 +234,6 @@ export class Raiden {
},
});

this.config = { ...defaultConfig, ...state.config };
this.deps.config$.subscribe((config) => (this.config = config));

this.deps.config$
Expand All @@ -255,6 +254,13 @@ export class Raiden {
state as any, // eslint-disable-line @typescript-eslint/no-explicit-any
applyMiddleware(loggerMiddleware, this.epicMiddleware),
);

// populate deps.latest$, to ensure config & logger subscriptions are setup before start
getLatest$(
of(raidenConfigUpdate({})),
of(this.store.getState()),
this.deps,
).subscribe((latest) => this.deps.latest$.next(latest));
}

/**
Expand Down
58 changes: 18 additions & 40 deletions raiden-ts/src/services/epics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ import { RaidenConfig } from '../config';
import { messageGlobalSend } from '../messages/actions';
import { MessageType, PFSCapacityUpdate, PFSFeeUpdate, MonitorRequest } from '../messages/types';
import { MessageTypeId, signMessage, createBalanceHash } from '../messages/utils';
import { channelMonitor } from '../channels/actions';
import { ChannelState, Channel } from '../channels/state';
import { channelAmounts, channelKey } from '../channels/utils';
import { Address, decode, Int, Signature, Signed, UInt, isntNil, assert } from '../utils/types';
import { channelAmounts, groupChannel$ } from '../channels/utils';
import { Address, decode, Int, Signature, Signed, UInt, assert } from '../utils/types';
import { isActionOf } from '../utils/actions';
import { encode, losslessParse, losslessStringify } from '../utils/data';
import { getEventsStream } from '../utils/ethers';
Expand Down Expand Up @@ -389,29 +388,6 @@ export const pathFindServiceEpic = (
);
};

function distinctChannel$(latest$: RaidenEpicDeps['latest$']) {
return latest$.pipe(
pluckDistinct('state', 'channels'),
concatMap((channels) => from(Object.entries(channels))),
/* this scan stores a reference to each [key,value] in 'acc', and emit as 'changed' iff it
* changes from last time seen. It relies on value references changing only if needed */
scan(
({ acc }, [key, channel]) =>
// if ref didn't change, emit previous accumulator, without 'changed' value
acc[key] === channel
? { acc }
: // else, update ref in 'acc' and emit value in 'changed' prop
{
acc: { ...acc, [key]: channel },
changed: channel,
},
{ acc: {} } as { acc: RaidenState['channels']; changed?: Channel },
),
pluck('changed'),
filter(isntNil), // filter out if reference didn't change from last emit
);
}

/**
* Sends a [[PFSCapacityUpdate]] to PFS global room on new deposit on our side of channels
*
Expand All @@ -424,8 +400,9 @@ export const pfsCapacityUpdateEpic = (
{}: Observable<RaidenState>,
{ log, address, network, signer, latest$, config$ }: RaidenEpicDeps,
): Observable<messageGlobalSend> =>
distinctChannel$(latest$).pipe(
groupBy(channelKey),
latest$.pipe(
pluck('state'),
groupChannel$,
withLatestFrom(config$),
mergeMap(([grouped$, { httpTimeout }]) =>
grouped$.pipe(
Expand Down Expand Up @@ -477,24 +454,25 @@ export const pfsCapacityUpdateEpic = (
* @returns Observable of messageGlobalSend actions
*/
export const pfsFeeUpdateEpic = (
action$: Observable<RaidenAction>,
{}: Observable<RaidenAction>,
state$: Observable<RaidenState>,
{ log, address, network, signer, config$ }: RaidenEpicDeps,
): Observable<messageGlobalSend> =>
action$.pipe(
filter(channelMonitor.is),
withLatestFrom(state$, config$),
state$.pipe(
groupChannel$,
// get only first state per channel
mergeMap((grouped$) => grouped$.pipe(first())),
withLatestFrom(config$),
// ignore actions while/if mediating not enabled
filter(([, , { pfsRoom, caps }]) => !!pfsRoom && !caps?.[Capabilities.NO_MEDIATE]),
mergeMap(([action, state, { pfsRoom }]) => {
const channel = state.channels[channelKey(action.meta)];
if (channel?.state !== ChannelState.open) return EMPTY;
filter(([, { pfsRoom, caps }]) => !!pfsRoom && !caps?.[Capabilities.NO_MEDIATE]),
mergeMap(([channel, { pfsRoom }]) => {
if (channel.state !== ChannelState.open) return EMPTY;

const message: PFSFeeUpdate = {
type: MessageType.PFS_FEE_UPDATE,
canonical_identifier: {
chain_identifier: bigNumberify(network.chainId) as UInt<32>,
token_network_address: action.meta.tokenNetwork,
token_network_address: channel.tokenNetwork,
channel_identifier: bigNumberify(channel.id) as UInt<32>,
},
updating_participant: address,
Expand Down Expand Up @@ -689,9 +667,9 @@ export const monitorRequestEpic = (
{}: Observable<RaidenState>,
deps: RaidenEpicDeps,
): Observable<messageGlobalSend> =>
distinctChannel$(deps.latest$).pipe(
// per channel
groupBy(channelKey),
deps.latest$.pipe(
pluck('state'),
groupChannel$,
withLatestFrom(deps.config$),
mergeMap(([grouped$, { httpTimeout }]) =>
grouped$.pipe(
Expand Down
11 changes: 4 additions & 7 deletions raiden-ts/src/transfers/epics/locked.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
tap,
withLatestFrom,
} from 'rxjs/operators';
import findKey from 'lodash/findKey';
import isMatchWith from 'lodash/isMatchWith';
import pick from 'lodash/pick';

Expand Down Expand Up @@ -44,7 +43,7 @@ import { RaidenEpicDeps } from '../../types';
import { isActionOf } from '../../utils/actions';
import { LruCache } from '../../utils/lru';
import { pluckDistinct } from '../../utils/rx';
import { Address, assert, BigNumberC, Hash, Signed, UInt, Int } from '../../utils/types';
import { assert, BigNumberC, Hash, Signed, UInt, Int } from '../../utils/types';
import { RaidenError, ErrorCodes } from '../../utils/error';
import { Capabilities } from '../../constants';
import {
Expand Down Expand Up @@ -124,13 +123,12 @@ function makeAndSignTransfer$(
secrethash: action.meta.secrethash,
};
const locksroot = getLocksroot([...channel.own.locks, lock]);
const token = findKey(state.tokens, (tn) => tn === action.payload.tokenNetwork)! as Address;

log.info(
'Signing transfer of value',
action.payload.value.toString(),
'of token',
token,
channel.token,
', to',
action.payload.target,
', through routes',
Expand All @@ -151,7 +149,7 @@ function makeAndSignTransfer$(
locked_amount: channel.own.balanceProof.lockedAmount.add(lock.amount) as UInt<32>,
locksroot,
payment_identifier: action.payload.paymentId,
token,
token: channel.token,
recipient: partner,
lock,
target: action.payload.target,
Expand Down Expand Up @@ -545,13 +543,12 @@ function receiveTransferSigned(
);
const locksroot = getLocksroot([...channel.partner.locks, transfer.lock]);
assert(transfer.locksroot === locksroot, 'locksroot mismatch');
const token = findKey(state.tokens, (tn) => tn === tokenNetwork)! as Address;

log.info(
'Receiving transfer of value',
transfer.lock.amount.toString(),
'of token',
token,
channel.token,
', from',
transfer.initiator,
', through partner',
Expand Down
Loading