Skip to content

Commit

Permalink
sdk: fix fromEthersEvent ranges fetching in case of temporary conne…
Browse files Browse the repository at this point in the history
…ctivity loss
  • Loading branch information
andrevmatos committed Jul 20, 2020
1 parent dcf1433 commit 8937a0b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 34 deletions.
2 changes: 2 additions & 0 deletions raiden-ts/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased]
### Fixed
- [#1923] Fix `fromEthersEvent` ranges fetching in case of temporary connectivity loss

### Added
- [#703] Add option to fetch all contracts addresses from UserDeposit address alone
Expand All @@ -17,6 +18,7 @@
[#1910]: https://github.com/raiden-network/light-client/pull/1910
[#1913]: https://github.com/raiden-network/light-client/pull/1913
[#1824]: https://github.com/raiden-network/light-client/issues/1824
[#1923]: https://github.com/raiden-network/light-client/issues/1923

## [0.10.0] - 2020-07-13
### Fixed
Expand Down
63 changes: 44 additions & 19 deletions raiden-ts/src/utils/ethers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,17 @@ import { JsonRpcProvider, Listener, EventType, Filter, Log } from 'ethers/provid
import { Network } from 'ethers/utils';
import { getNetwork as parseNetwork } from 'ethers/utils/networks';
import { Observable, fromEventPattern, merge, from, of, EMPTY, combineLatest, defer } from 'rxjs';
import { filter, first, map, mergeMap, share, toArray, debounceTime, tap } from 'rxjs/operators';
import {
filter,
first,
map,
mergeMap,
share,
toArray,
debounceTime,
catchError,
exhaustMap,
} from 'rxjs/operators';
import sortBy from 'lodash/sortBy';

import { isntNil } from './types';
Expand Down Expand Up @@ -43,29 +53,44 @@ export function fromEthersEvent<T>(
resultSelector,
) as Observable<T>;

let first = 0;
let latestEventBlock = 0;
const blockQueue: number[] = []; // sorted 'fromBlock' queue, at most of [range] size
return defer(() => {
// 'first' starts with subscription-time's private value set with provider.resetEventsBlock
first = (target as any)._lastBlockNumber || target.blockNumber || Number.POSITIVE_INFINITY;
// 'resetEventsBlock' is private, set at [[Raiden]] constructor, so we need 'any'
const resetBlock: number = (target as any)._lastBlockNumber;
const firstBlock = resetBlock && resetBlock > 0 ? resetBlock : target.blockNumber ?? 1;
blockQueue.push(firstBlock); // starts 'blockQueue' with subscription-time's resetEventsBlock

return fromEthersEvent<number>(target, 'block');
}).pipe(
debounceTime(Math.ceil(target.pollingInterval / 10)), // debounce bursts of blocks
mergeMap(async (blockNumber, cnt) =>
target.getLogs({
...event,
// on first [range] calls, getLogs since [first], unless an event was found on a previous
// call, on which case use block after [latestEventBlock];
// after [range] calls, ignore 'first' to get only since [-range] blocks
fromBlock: Math.max(
Math.min(blockNumber - range, cnt < range ? first : Number.POSITIVE_INFINITY),
latestEventBlock + 1,
),
toBlock: 'latest',
}),
// exhaustMap will skip new events if it's still busy with a previous getLogs call,
// but next [fromBlock] in queue always includes range for any skipped block
exhaustMap((blockNumber) =>
defer(() =>
target.getLogs({ ...event, fromBlock: blockQueue[0], toBlock: blockNumber }),
).pipe(
mergeMap((logs) => {
// if queue is full, pop_front 'fromBlock' which was just queried
if (blockQueue.length >= range) blockQueue.shift();
// push_back next block iff getLogs didn't throw, queue is never empty
blockQueue.push(blockNumber + 1);

// if a log came, clear queued smaller blocks than it and push_front block after
const afterLogBlock =
Math.max(0, ...logs.map((log) => log.blockNumber).filter(isntNil)) + 1;
if (blockQueue[0] <= afterLogBlock)
blockQueue.splice(
0, // from queue's front
blockQueue.filter((block) => block <= afterLogBlock).length, // clear older blocks
afterLogBlock, // push_front block after newest log's blockNumber
);

return from(logs); // unwind logs
}),
// `getLogs` errors skip [blockQueue] update, so previous fromBlock is retried later
catchError(() => EMPTY),
),
),
mergeMap((logs) => from(logs)), // unwind
tap((log) => (latestEventBlock = Math.max(latestEventBlock, log.blockNumber!))),
);
}

Expand Down
29 changes: 14 additions & 15 deletions raiden-ts/tests/unit/epics/monitor.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { raidenEpicDeps, makeLog, makeAddress, makeHash, makeRaidens, waitBlock } from '../mocks';
import {
raidenEpicDeps,
makeLog,
makeAddress,
makeHash,
makeRaidens,
waitBlock,
providersEmit,
} from '../mocks';
import { epicFixtures, tokenNetwork, ensureChannelIsOpen, id } from '../fixtures';

import { bigNumberify, BigNumberish, defaultAbiCoder } from 'ethers/utils';
Expand Down Expand Up @@ -336,21 +344,12 @@ test('msMonitorNewBPEpic', async () => {
const monitoringService = makeAddress();
const nonce = Two as UInt<8>;
const txHash = makeHash();
const txBlock = 68;

// emit a NewBalanceProofReceived event
raiden.deps.provider.emit(
monitoringServiceContract.filters.NewBalanceProofReceived(
null,
null,
null,
null,
null,
raiden.address,
),
await providersEmit(
{},
makeLog({
transactionHash: txHash,
blockNumber: txBlock,
filter: monitoringServiceContract.filters.NewBalanceProofReceived(
null,
null,
Expand All @@ -365,8 +364,8 @@ test('msMonitorNewBPEpic', async () => {
),
}),
);

await waitBlock();

expect(raiden.output).toContainEqual(
msBalanceProofSent({
tokenNetwork,
Expand All @@ -376,8 +375,8 @@ test('msMonitorNewBPEpic', async () => {
nonce,
monitoringService,
txHash,
txBlock,
confirmed: true,
txBlock: expect.any(Number),
confirmed: undefined,
}),
);
});

0 comments on commit 8937a0b

Please sign in to comment.