Skip to content

Commit

Permalink
fix(store-sync): batch call block number with logs to account for uns…
Browse files Browse the repository at this point in the history
…ynced RPCs
  • Loading branch information
dhvanipa committed Jun 13, 2024
1 parent e0c9eaa commit 0aa7cc6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
13 changes: 12 additions & 1 deletion packages/block-logs-stream/src/fetchLogs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,18 @@ export async function* fetchLogs<TAbiEvents extends readonly AbiEvent[]>({
try {
const toBlock = fromBlock + blockRange;
debug("getting logs", { fromBlock, toBlock });
const logs = await publicClient.getLogs({ ...getLogsOpts, fromBlock, toBlock, strict: true });

const [latestBlockNumber, logs] = await Promise.all([
publicClient.getBlockNumber({ cacheTime: 0 }),
publicClient.getLogs({ ...getLogsOpts, fromBlock, toBlock, strict: true }),
]);
if (latestBlockNumber < toBlock) {
const blockTimeInSeconds = 2;
const seconds = Number(toBlock - latestBlockNumber) * blockTimeInSeconds;
debug(`latest block number ${latestBlockNumber} is less than toBlock ${toBlock}, retrying in ${seconds}s`);
await wait(1000 * seconds);
continue;
}
yield { fromBlock, toBlock, logs };
fromBlock = toBlock + 1n;
blockRange = bigIntMin(maxBlockRange, getLogsOpts.toBlock - fromBlock);
Expand Down
24 changes: 16 additions & 8 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,35 +187,43 @@ export async function createStoreSync<config extends StoreConfig = StoreConfig>(
tap((startBlock) => debug("starting sync from block", startBlock)),
);

let startBlock: bigint | null = null;
let endBlock: bigint | null = null;
let lastBlockNumberProcessed: bigint | null = null;

const latestBlock$ = createBlockStream({ publicClient, blockTag: followBlockTag }).pipe(shareReplay(1));
const latestBlockNumber$ = latestBlock$.pipe(
map((block) => block.number),
tap((blockNumber) => {
debug("on block number", blockNumber, "for", followBlockTag, "block tag");
}),
filter((blockNumber) => {
return lastBlockNumberProcessed == null || blockNumber > lastBlockNumberProcessed;
}),
shareReplay(1),
);

let startBlock: bigint | null = null;
let endBlock: bigint | null = null;
let lastBlockNumberProcessed: bigint | null = null;

const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
startBlock = range.startBlock;
endBlock = range.endBlock;
}),
concatMap((range) => {
const fromBlock = lastBlockNumberProcessed
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
: range.startBlock;
const toBlock = range.endBlock;
if (toBlock < fromBlock) {
throw new Error(`toBlock ${toBlock} is less than fromBlock ${fromBlock}`);
}
const storedBlocks = fetchAndStoreLogs({
publicClient,
address,
events: storeEventsAbi,
maxBlockRange,
fromBlock: lastBlockNumberProcessed
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
: range.startBlock,
toBlock: range.endBlock,
fromBlock: fromBlock,
toBlock: toBlock,
storageAdapter,
logFilter,
});
Expand Down

0 comments on commit 0aa7cc6

Please sign in to comment.