Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store-sync): fetch and store logs #2003

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/four-hotels-serve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@latticexyz/store-sync": major
---

Previously, all `store-sync` strategies were susceptible to a potential memory leak where the stream that fetches logs from the RPC would get ahead of the stream that stores the logs in the provided storage adapter. We saw this most often when syncing to remote Postgres servers, where inserting records was much slower than retrieving them from the RPC. In these cases, the stream would build up a backlog of items until the machine ran out of memory.

This is now fixed by waiting for logs to be stored before fetching the next batch of logs from the RPC. To make this strategy work, we no longer return `blockLogs$` (stream of logs fetched from RPC but before they're stored) and instead just return `storedBlockLogs$` (stream of logs fetched from RPC after they're stored).
1 change: 0 additions & 1 deletion packages/store-sync/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ export type SyncOptions<TConfig extends StoreConfig = StoreConfig> = {
export type SyncResult = {
latestBlock$: Observable<Block>;
latestBlockNumber$: Observable<bigint>;
blockLogs$: Observable<BlockLogs>;
storedBlockLogs$: Observable<StorageAdapterBlock>;
waitForTransaction: (tx: Hex) => Promise<void>;
};
Expand Down
124 changes: 65 additions & 59 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import { StoreConfig, storeEventsAbi } from "@latticexyz/store";
import { Hex, TransactionReceiptNotFoundError } from "viem";
import { StorageAdapter, StorageAdapterBlock, SyncFilter, SyncOptions, SyncResult, internalTableIds } from "./common";
import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import {
StorageAdapter,
StorageAdapterBlock,
StoreEventsLog,
SyncFilter,
SyncOptions,
SyncResult,
internalTableIds,
} from "./common";
import { createBlockStream } from "@latticexyz/block-logs-stream";
import {
filter,
map,
tap,
mergeMap,
from,
concat,
concatMap,
Expand All @@ -24,6 +31,7 @@ import { debug as parentDebug } from "./debug";
import { SyncStep } from "./SyncStep";
import { bigIntMax, chunk, isDefined } from "@latticexyz/common/utils";
import { getSnapshot } from "./getSnapshot";
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";

const debug = parentDebug.extend("createStoreSync");

Expand Down Expand Up @@ -58,6 +66,16 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
? [...initialFilters, ...tableIds.map((tableId) => ({ tableId })), ...defaultFilters]
: [];

const logFilter = filters.length
? (log: StoreEventsLog): boolean =>
filters.some(
(filter) =>
filter.tableId === log.args.tableId &&
(filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
(filter.key1 == null || filter.key1 === log.args.keyTuple[1])
)
: undefined;

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());

Expand Down Expand Up @@ -164,71 +182,60 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>

let startBlock: bigint | null = null;
let endBlock: bigint | null = null;
const blockLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
let lastBlockNumberProcessed: bigint | null = null;

const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
startBlock = range.startBlock;
endBlock = range.endBlock;
}),
blockRangeToLogs({
publicClient,
address,
events: storeEventsAbi,
// TODO: pass filters in here so we can filter at RPC level
maxBlockRange,
concatMap((range) => {
const storedBlocks = fetchAndStoreLogs({
publicClient,
address,
events: storeEventsAbi,
maxBlockRange,
fromBlock: lastBlockNumberProcessed
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
: range.startBlock,
toBlock: range.endBlock,
storageAdapter,
logFilter,
});

return from(storedBlocks);
}),
map(({ toBlock, logs }) => {
if (!filters.length) return { toBlock, logs };
const filteredLogs = logs.filter((log) =>
filters.some(
(filter) =>
filter.tableId === log.args.tableId &&
(filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
(filter.key1 == null || filter.key1 === log.args.keyTuple[1])
)
);
return { toBlock, logs: filteredLogs };
tap(({ blockNumber, logs }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

if (startBlock != null && endBlock != null) {
if (blockNumber < endBlock) {
const totalBlocks = endBlock - startBlock;
const processedBlocks = lastBlockNumberProcessed - startBlock;
onProgress?.({
step: SyncStep.RPC,
percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "Hydrating from RPC",
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "All caught up!",
});
}
}
}),
mergeMap(({ toBlock, logs }) => from(groupLogsByBlockNumber(logs, toBlock))),
share()
);

let lastBlockNumberProcessed: bigint | null = null;
const storedBlockLogs$ = concat(
storedInitialBlockLogs$,
blockLogs$.pipe(
concatMap(async (block) => {
await storageAdapter(block);
return block;
}),
tap(({ blockNumber, logs }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

if (startBlock != null && endBlock != null) {
if (blockNumber < endBlock) {
const totalBlocks = endBlock - startBlock;
const processedBlocks = lastBlockNumberProcessed - startBlock;
onProgress?.({
step: SyncStep.RPC,
percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "Hydrating from RPC",
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "All caught up!",
});
}
}
})
)
).pipe(share());
const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe(share());

// keep 10 blocks worth processed transactions in memory
const recentBlocksWindow = 10;
Expand Down Expand Up @@ -274,7 +281,6 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
return {
latestBlock$,
latestBlockNumber$,
blockLogs$,
storedBlockLogs$,
waitForTransaction,
};
Expand Down
22 changes: 22 additions & 0 deletions packages/store-sync/src/fetchAndStoreLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { FetchLogsOptions, fetchLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import { StoreEventsAbi } from "@latticexyz/store";
import { StorageAdapter, StorageAdapterBlock, StoreEventsLog } from "./common";

/**
 * Options for {@link fetchAndStoreLogs}: the standard `fetchLogs` options plus
 * a storage adapter that persists each grouped block of logs, and an optional
 * per-log predicate applied before logs are grouped and stored.
 */
type FetchAndStoreLogsOptions = FetchLogsOptions<StoreEventsAbi> & {
  storageAdapter: StorageAdapter;
  logFilter?: (log: StoreEventsLog) => boolean;
};

/**
 * Fetches store event logs in batches, persists them via the provided storage
 * adapter, and yields each stored block.
 *
 * Because each block is `await`ed into storage before being yielded, the next
 * RPC batch is not requested until the consumer has drained the current one —
 * this is the backpressure that prevents fetched-but-unstored logs from piling
 * up in memory.
 *
 * @param storageAdapter - Persists one grouped block of logs at a time.
 * @param logFilter - Optional predicate; logs failing it are dropped before
 *   grouping and storage.
 * @param fetchLogsOptions - Remaining options forwarded to `fetchLogs`
 *   (client, address, events, block range, etc.).
 * @returns Async generator of blocks, each yielded only after it was stored.
 */
export async function* fetchAndStoreLogs({
  storageAdapter,
  logFilter,
  ...fetchLogsOptions
}: FetchAndStoreLogsOptions): AsyncGenerator<StorageAdapterBlock> {
  for await (const batch of fetchLogs(fetchLogsOptions)) {
    // Apply the optional filter first so discarded logs never reach storage.
    const relevantLogs = logFilter ? batch.logs.filter(logFilter) : batch.logs;
    const blocks = groupLogsByBlockNumber(relevantLogs, batch.toBlock);
    for (const block of blocks) {
      // Store before yielding so downstream consumers only ever observe
      // blocks that are already persisted.
      await storageAdapter(block);
      yield block;
    }
  }
}
Loading