diff --git a/.changeset/four-hotels-serve.md b/.changeset/four-hotels-serve.md
new file mode 100644
index 0000000000..b54a90adc6
--- /dev/null
+++ b/.changeset/four-hotels-serve.md
@@ -0,0 +1,7 @@
+---
+"@latticexyz/store-sync": major
+---
+
+Previously, all `store-sync` strategies were susceptible to a memory leak where the stream that fetches logs from the RPC would get ahead of the stream that stores the logs in the provided storage adapter. We saw this most often when syncing to remote Postgres servers, where inserting records was much slower than retrieving them from the RPC. In these cases, the stream would build up a backlog of items until the machine ran out of memory.
+
+This is now fixed by waiting for logs to be stored before fetching the next batch of logs from the RPC. To make this strategy work, we no longer return `blockLogs$` (the stream of logs fetched from the RPC, before they're stored) and instead return only `storedBlockLogs$` (the stream of logs fetched from the RPC, after they're stored).
diff --git a/packages/store-sync/src/common.ts b/packages/store-sync/src/common.ts
index a7873ccbd9..bd5a20aee9 100644
--- a/packages/store-sync/src/common.ts
+++ b/packages/store-sync/src/common.ts
@@ -109,7 +109,6 @@ export type SyncOptions = {
 export type SyncResult = {
   latestBlock$: Observable<Block>;
   latestBlockNumber$: Observable<bigint>;
-  blockLogs$: Observable<BlockLogs>;
   storedBlockLogs$: Observable<StorageAdapterBlock>;
   waitForTransaction: (tx: Hex) => Promise<void>;
 };
diff --git a/packages/store-sync/src/createStoreSync.ts b/packages/store-sync/src/createStoreSync.ts
index 59d83bdf51..5bcffb6f89 100644
--- a/packages/store-sync/src/createStoreSync.ts
+++ b/packages/store-sync/src/createStoreSync.ts
@@ -1,12 +1,19 @@
 import { StoreConfig, storeEventsAbi } from "@latticexyz/store";
 import { Hex, TransactionReceiptNotFoundError } from "viem";
-import { StorageAdapter, StorageAdapterBlock, SyncFilter, SyncOptions, SyncResult, internalTableIds } from "./common";
-import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
+import {
+  StorageAdapter,
+  StorageAdapterBlock,
+  StoreEventsLog,
+  SyncFilter,
+  SyncOptions,
+  SyncResult,
+  internalTableIds,
+} from "./common";
+import { createBlockStream } from "@latticexyz/block-logs-stream";
 import {
   filter,
   map,
   tap,
-  mergeMap,
   from,
   concat,
   concatMap,
@@ -24,6 +31,7 @@ import { debug as parentDebug } from "./debug";
 import { SyncStep } from "./SyncStep";
 import { bigIntMax, chunk, isDefined } from "@latticexyz/common/utils";
 import { getSnapshot } from "./getSnapshot";
+import { fetchAndStoreLogs } from "./fetchAndStoreLogs";
 
 const debug = parentDebug.extend("createStoreSync");
 
@@ -58,6 +66,16 @@ export async function createStoreSync
     ? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters]
     : [];
 
+  const logFilter = filters.length
+    ? (log: StoreEventsLog): boolean =>
+        filters.some(
+          (filter) =>
+            filter.tableId === log.args.tableId &&
+            (filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
+            (filter.key1 == null || filter.key1 === log.args.keyTuple[1])
+        )
+    : undefined;
+
   const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
     const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
@@ -164,71 +182,60 @@ export async function createStoreSync
   let startBlock: bigint | null = null;
   let endBlock: bigint | null = null;
-  const blockLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
+  let lastBlockNumberProcessed: bigint | null = null;
+
+  const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
     map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
     tap((range) => {
       startBlock = range.startBlock;
       endBlock = range.endBlock;
     }),
-    blockRangeToLogs({
-      publicClient,
-      address,
-      events: storeEventsAbi,
-      // TODO: pass filters in here so we can filter at RPC level
-      maxBlockRange,
+    concatMap((range) => {
+      const storedBlocks = fetchAndStoreLogs({
+        publicClient,
+        address,
+        events: storeEventsAbi,
+        maxBlockRange,
+        fromBlock: lastBlockNumberProcessed
+          ? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
+          : range.startBlock,
+        toBlock: range.endBlock,
+        storageAdapter,
+        logFilter,
+      });
+
+      return from(storedBlocks);
     }),
-    map(({ toBlock, logs }) => {
-      if (!filters.length) return { toBlock, logs };
-      const filteredLogs = logs.filter((log) =>
-        filters.some(
-          (filter) =>
-            filter.tableId === log.args.tableId &&
-            (filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
-            (filter.key1 == null || filter.key1 === log.args.keyTuple[1])
-        )
-      );
-      return { toBlock, logs: filteredLogs };
+    tap(({ blockNumber, logs }) => {
+      debug("stored", logs.length, "logs for block", blockNumber);
+      lastBlockNumberProcessed = blockNumber;
+
+      if (startBlock != null && endBlock != null) {
+        if (blockNumber < endBlock) {
+          const totalBlocks = endBlock - startBlock;
+          const processedBlocks = lastBlockNumberProcessed - startBlock;
+          onProgress?.({
+            step: SyncStep.RPC,
+            percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
+            latestBlockNumber: endBlock,
+            lastBlockNumberProcessed,
+            message: "Hydrating from RPC",
+          });
+        } else {
+          onProgress?.({
+            step: SyncStep.LIVE,
+            percentage: 100,
+            latestBlockNumber: endBlock,
+            lastBlockNumberProcessed,
+            message: "All caught up!",
+          });
+        }
+      }
     }),
-    mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))),
     share()
   );
 
-  let lastBlockNumberProcessed: bigint | null = null;
-  const storedBlockLogs$ = concat(
-    storedInitialBlockLogs$,
-    blockLogs$.pipe(
-      concatMap(async (block) => {
-        await storageAdapter(block);
-        return block;
-      }),
-      tap(({ blockNumber, logs }) => {
-        debug("stored", logs.length, "logs for block", blockNumber);
-        lastBlockNumberProcessed = blockNumber;
-
-        if (startBlock != null && endBlock != null) {
-          if (blockNumber < endBlock) {
-            const totalBlocks = endBlock - startBlock;
-            const processedBlocks = lastBlockNumberProcessed - startBlock;
-            onProgress?.({
-              step: SyncStep.RPC,
-              percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
-              latestBlockNumber: endBlock,
-              lastBlockNumberProcessed,
-              message: "Hydrating from RPC",
-            });
-          } else {
-            onProgress?.({
-              step: SyncStep.LIVE,
-              percentage: 100,
-              latestBlockNumber: endBlock,
-              lastBlockNumberProcessed,
-              message: "All caught up!",
-            });
-          }
-        }
-      })
-    )
-  ).pipe(share());
+  const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe(share());
 
   // keep 10 blocks worth of processed transactions in memory
   const recentBlocksWindow = 10;
@@ -274,7 +281,6 @@ export async function createStoreSync
   return {
     latestBlock$,
     latestBlockNumber$,
-    blockLogs$,
     storedBlockLogs$,
     waitForTransaction,
   };
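The mechanism behind the fix: `fetchAndStoreLogs` (added below) is an async generator that awaits the storage adapter before yielding each block, and RxJS's `from` consumes an async iterable lazily, awaiting one value before requesting the next. Inside `concatMap`, this means the next batch of logs is never fetched until the previous one has been stored. The removed pipeline, by contrast, let `blockRangeToLogs` keep emitting while a downstream `concatMap(storageAdapter)` queued unstored blocks without bound. A minimal sketch of the pattern, with `fetchLogsFor` and `storeLogs` as hypothetical stand-ins (not store-sync or viem APIs):

```ts
import { from } from "rxjs";

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical stand-in: fetching a batch of logs from the RPC is fast.
async function fetchLogsFor(block: number): Promise<string[]> {
  await sleep(10);
  return [`log@${block}`];
}

// Hypothetical stand-in: storing is slow, e.g. inserts into a remote Postgres.
async function storeLogs(logs: string[]): Promise<void> {
  await sleep(100);
  console.log("stored", logs.length, "logs");
}

// The generator doesn't advance (so nothing more is fetched) until the
// previous batch has been stored.
async function* fetchAndStore(): AsyncGenerator<number> {
  for (let block = 0; block < 5; block++) {
    const logs = await fetchLogsFor(block);
    await storeLogs(logs);
    yield block;
  }
}

// `from` pulls from the async iterable one value at a time, awaiting each
// before requesting the next, so at most one fetched-but-unstored batch
// exists in memory regardless of how slow storage is.
from(fetchAndStore()).subscribe((block) => console.log("synced block", block));
```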
diff --git a/packages/store-sync/src/fetchAndStoreLogs.ts b/packages/store-sync/src/fetchAndStoreLogs.ts
new file mode 100644
index 0000000000..76a5f734e5
--- /dev/null
+++ b/packages/store-sync/src/fetchAndStoreLogs.ts
@@ -0,0 +1,22 @@
+import { FetchLogsOptions, fetchLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
+import { StoreEventsAbi } from "@latticexyz/store";
+import { StorageAdapter, StorageAdapterBlock, StoreEventsLog } from "./common";
+
+type FetchAndStoreLogsOptions = FetchLogsOptions<StoreEventsAbi> & {
+  storageAdapter: StorageAdapter;
+  logFilter?: (log: StoreEventsLog) => boolean;
+};
+
+export async function* fetchAndStoreLogs({
+  storageAdapter,
+  logFilter,
+  ...fetchLogsOptions
+}: FetchAndStoreLogsOptions): AsyncGenerator<StorageAdapterBlock> {
+  for await (const { logs, toBlock } of fetchLogs(fetchLogsOptions)) {
+    const blocks = groupLogsByBlockNumber(logFilter ? logs.filter(logFilter) : logs, toBlock);
+    for (const block of blocks) {
+      await storageAdapter(block);
+      yield block;
+    }
+  }
+}
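For consumers of the package, migrating means subscribing to `storedBlockLogs$` wherever `blockLogs$` was used. A minimal sketch, assuming `SyncResult` is re-exported from the package root, with `sync` standing in for the return value of whichever sync strategy you use:

```ts
import type { SyncResult } from "@latticexyz/store-sync";

// Stand-in for the result of a store-sync strategy (the object that
// previously also exposed `blockLogs$`).
declare const sync: SyncResult;

// `storedBlockLogs$` emits a block only after the storage adapter has written
// its logs, so subscribers always observe fully persisted state.
sync.storedBlockLogs$.subscribe(({ blockNumber, logs }) => {
  console.log(`stored ${logs.length} logs for block ${blockNumber}`);
});
```

Code that needed logs before they were stored has no direct replacement; that trade is deliberate, since any consumer running ahead of storage would re-create the unbounded buffer this change removes.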