Skip to content

Commit

Permalink
feat(store-sync): fetch and store logs (#2003)
Browse files Browse the repository at this point in the history
  • Loading branch information
holic authored Dec 4, 2023
1 parent 854de07 commit a4aff73
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 60 deletions.
7 changes: 7 additions & 0 deletions .changeset/four-hotels-serve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@latticexyz/store-sync": major
---

Previously, all `store-sync` strategies were susceptible to a potential memory leak where the stream that fetches logs from the RPC would get ahead of the stream that stores the logs in the provided storage adapter. We saw this most often when syncing to remote Postgres servers, where inserting records was much slower than retrieving them from the RPC. In these cases, the stream would build up a backlog of items until the machine ran out of memory.

This is now fixed by waiting for logs to be stored before fetching the next batch of logs from the RPC. To make this strategy work, we no longer return `blockLogs$` (stream of logs fetched from RPC but before they're stored) and instead just return `storedBlockLogs$` (stream of logs fetched from RPC after they're stored).
1 change: 0 additions & 1 deletion packages/store-sync/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ export type SyncOptions<TConfig extends StoreConfig = StoreConfig> = {
export type SyncResult = {
latestBlock$: Observable<Block>;
latestBlockNumber$: Observable<bigint>;
blockLogs$: Observable<BlockLogs>;
storedBlockLogs$: Observable<StorageAdapterBlock>;
waitForTransaction: (tx: Hex) => Promise<void>;
};
Expand Down
124 changes: 65 additions & 59 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import { StoreConfig, storeEventsAbi } from "@latticexyz/store";
import { Hex, TransactionReceiptNotFoundError } from "viem";
import { StorageAdapter, StorageAdapterBlock, SyncFilter, SyncOptions, SyncResult, internalTableIds } from "./common";
import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import {
StorageAdapter,
StorageAdapterBlock,
StoreEventsLog,
SyncFilter,
SyncOptions,
SyncResult,
internalTableIds,
} from "./common";
import { createBlockStream } from "@latticexyz/block-logs-stream";
import {
filter,
map,
tap,
mergeMap,
from,
concat,
concatMap,
Expand All @@ -24,6 +31,7 @@ import { debug as parentDebug } from "./debug";
import { SyncStep } from "./SyncStep";
import { bigIntMax, chunk, isDefined } from "@latticexyz/common/utils";
import { getSnapshot } from "./getSnapshot";
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";

const debug = parentDebug.extend("createStoreSync");

Expand Down Expand Up @@ -58,6 +66,16 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters]
: [];

const logFilter = filters.length
? (log: StoreEventsLog): boolean =>
filters.some(
(filter) =>
filter.tableId === log.args.tableId &&
(filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
(filter.key1 == null || filter.key1 === log.args.keyTuple[1])
)
: undefined;

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());

Expand Down Expand Up @@ -164,71 +182,60 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>

let startBlock: bigint | null = null;
let endBlock: bigint | null = null;
const blockLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
let lastBlockNumberProcessed: bigint | null = null;

const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
startBlock = range.startBlock;
endBlock = range.endBlock;
}),
blockRangeToLogs({
publicClient,
address,
events: storeEventsAbi,
// TODO: pass filters in here so we can filter at RPC level
maxBlockRange,
concatMap((range) => {
const storedBlocks = fetchAndStoreLogs({
publicClient,
address,
events: storeEventsAbi,
maxBlockRange,
fromBlock: lastBlockNumberProcessed
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
: range.startBlock,
toBlock: range.endBlock,
storageAdapter,
logFilter,
});

return from(storedBlocks);
}),
map(({ toBlock, logs }) => {
if (!filters.length) return { toBlock, logs };
const filteredLogs = logs.filter((log) =>
filters.some(
(filter) =>
filter.tableId === log.args.tableId &&
(filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
(filter.key1 == null || filter.key1 === log.args.keyTuple[1])
)
);
return { toBlock, logs: filteredLogs };
tap(({ blockNumber, logs }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

if (startBlock != null && endBlock != null) {
if (blockNumber < endBlock) {
const totalBlocks = endBlock - startBlock;
const processedBlocks = lastBlockNumberProcessed - startBlock;
onProgress?.({
step: SyncStep.RPC,
percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "Hydrating from RPC",
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "All caught up!",
});
}
}
}),
mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))),
share()
);

let lastBlockNumberProcessed: bigint | null = null;
const storedBlockLogs$ = concat(
storedInitialBlockLogs$,
blockLogs$.pipe(
concatMap(async (block) => {
await storageAdapter(block);
return block;
}),
tap(({ blockNumber, logs }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

if (startBlock != null && endBlock != null) {
if (blockNumber < endBlock) {
const totalBlocks = endBlock - startBlock;
const processedBlocks = lastBlockNumberProcessed - startBlock;
onProgress?.({
step: SyncStep.RPC,
percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "Hydrating from RPC",
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "All caught up!",
});
}
}
})
)
).pipe(share());
const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe(share());

// keep 10 blocks worth processed transactions in memory
const recentBlocksWindow = 10;
Expand Down Expand Up @@ -274,7 +281,6 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
return {
latestBlock$,
latestBlockNumber$,
blockLogs$,
storedBlockLogs$,
waitForTransaction,
};
Expand Down
22 changes: 22 additions & 0 deletions packages/store-sync/src/fetchAndStoreLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { FetchLogsOptions, fetchLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import { StoreEventsAbi } from "@latticexyz/store";
import { StorageAdapter, StorageAdapterBlock, StoreEventsLog } from "./common";

/**
 * Options for {@link fetchAndStoreLogs}: everything `fetchLogs` accepts for the
 * store events ABI, plus the storage adapter to persist each block's logs into
 * and an optional per-log predicate applied before grouping/storing.
 */
type FetchAndStoreLogsOptions = FetchLogsOptions<StoreEventsAbi> & {
  // Called once per grouped block; logs are only yielded after this resolves.
  storageAdapter: StorageAdapter;
  // When provided, logs failing this predicate are dropped before storage.
  logFilter?: (log: StoreEventsLog) => boolean;
};

/**
 * Fetches store event logs in block-range batches, persists each block's logs
 * via the provided storage adapter, and yields each block only after it has
 * been stored. Because this is an async generator, the next RPC batch is not
 * fetched until the consumer (and the storage adapter) has caught up —
 * providing natural backpressure.
 *
 * @param options fetch options forwarded to `fetchLogs`, plus `storageAdapter`
 *   and an optional `logFilter` predicate applied before grouping.
 * @returns an async stream of stored blocks, in ascending block order.
 */
export async function* fetchAndStoreLogs({
  storageAdapter,
  logFilter,
  ...fetchLogsOptions
}: FetchAndStoreLogsOptions): AsyncGenerator<StorageAdapterBlock> {
  for await (const batch of fetchLogs(fetchLogsOptions)) {
    // Drop logs the caller isn't interested in before grouping by block.
    const relevantLogs = logFilter ? batch.logs.filter(logFilter) : batch.logs;
    for (const block of groupLogsByBlockNumber(relevantLogs, batch.toBlock)) {
      // Persist first, then yield, so downstream only ever sees stored blocks.
      await storageAdapter(block);
      yield block;
    }
  }
}

0 comments on commit a4aff73

Please sign in to comment.