From ad078ba4efc425d2f126e1a3345f7a9aa319a811 Mon Sep 17 00:00:00 2001 From: Chris Heaney Date: Mon, 17 Jul 2023 16:33:37 -0400 Subject: [PATCH 1/2] sdk: create rpc method to get transactions to avoid some web3.js overhead --- sdk/src/events/fetchLogs.ts | 56 +++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/sdk/src/events/fetchLogs.ts b/sdk/src/events/fetchLogs.ts index 3cf172b46..4bf11173c 100644 --- a/sdk/src/events/fetchLogs.ts +++ b/sdk/src/events/fetchLogs.ts @@ -8,6 +8,7 @@ import { VersionedTransactionResponse, } from '@solana/web3.js'; import { WrappedEvents } from './types'; +import { promiseTimeout } from '../util/promiseTimeout'; type Log = { txSig: TransactionSignature; slot: number; logs: string[] }; type FetchLogsResponse = { @@ -61,23 +62,14 @@ export async function fetchLogs( const chunkedSignatures = chunk(filteredSignatures, 100); - const config = { commitment: finality, maxSupportedTransactionVersion: 0 }; - const transactionLogs = ( await Promise.all( chunkedSignatures.map(async (chunk) => { - const transactions = await connection.getTransactions( + return await fetchTransactionLogs( + connection, chunk.map((confirmedSignature) => confirmedSignature.signature), - //@ts-ignore - config + finality ); - - return transactions.reduce((logs, transaction) => { - if (transaction) { - logs.push(mapTransactionResponseToLog(transaction)); - } - return logs; - }, new Array()); }) ) ).flat(); @@ -95,6 +87,46 @@ export async function fetchLogs( }; } +export async function fetchTransactionLogs( + connection: Connection, + signatures: TransactionSignature[], + finality: Finality +): Promise { + const requests = new Array<{ methodName: string; args: any }>(); + for (const signature of signatures) { + const args = [ + signature, + { commitment: finality, maxSupportedTransactionVersion: 0 }, + ]; + + requests.push({ + methodName: 'getTransaction', + args, + }); + } + console.log(requests); + + const rpcResponses: any | null = await promiseTimeout( + // @ts-ignore + connection._rpcBatchRequest(requests), + 10 * 1000 // 10 second timeout + ); + + if (rpcResponses === null) { + return Promise.reject('RPC request timed out fetching transactions'); + } + + const logs = new Array(); + for (const i in rpcResponses) { + const rpcResponse = rpcResponses[i]; + if (rpcResponse.result) { + logs.push(mapTransactionResponseToLog(rpcResponse.result)); + } + } + + return logs; +} + function chunk(array: readonly T[], size: number): T[][] { return new Array(Math.ceil(array.length / size)) .fill(null) From 5440269ba1f74573a3b63b82528bcc2fa819f680 Mon Sep 17 00:00:00 2001 From: Chris Heaney Date: Mon, 17 Jul 2023 17:19:14 -0400 Subject: [PATCH 2/2] sdk: custom parse logs logic --- sdk/src/events/eventSubscriber.ts | 9 ++- sdk/src/events/parse.ts | 108 ++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 5 deletions(-) create mode 100644 sdk/src/events/parse.ts diff --git a/sdk/src/events/eventSubscriber.ts b/sdk/src/events/eventSubscriber.ts index 5cad13c7e..288655766 100644 --- a/sdk/src/events/eventSubscriber.ts +++ b/sdk/src/events/eventSubscriber.ts @@ -17,6 +17,7 @@ import { WebSocketLogProvider } from './webSocketLogProvider'; import { EventEmitter } from 'events'; import StrictEventEmitter from 'strict-event-emitter-types'; import { getSortFn } from './sort'; +import { parseLogs } from './parse'; export class EventSubscriber { private address: PublicKey; @@ -166,11 +167,9 @@ export class EventSubscriber { ): WrappedEvents { const records = []; // @ts-ignore - const eventGenerator = this.program._events._eventParser.parseLogs( - logs, - false - ); - for (const event of eventGenerator) { + const events = parseLogs(this.program, slot, logs); + for (const event of events) { + // @ts-ignore const expectRecordType = this.eventListMap.has(event.name); if (expectRecordType) { event.data.txSig = txSig; diff --git a/sdk/src/events/parse.ts b/sdk/src/events/parse.ts new file mode 100644 index 000000000..73b947d63 --- /dev/null +++ b/sdk/src/events/parse.ts @@ -0,0 +1,108 @@ +import { Program, Event } from '@coral-xyz/anchor'; + +const driftProgramId = 'dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH'; +const driftProgramStart = `Program ${driftProgramId} invoke`; +const PROGRAM_LOG = 'Program log: '; +const PROGRAM_DATA = 'Program data: '; +const PROGRAM_LOG_START_INDEX = PROGRAM_LOG.length; +const PROGRAM_DATA_START_INDEX = PROGRAM_DATA.length; + +export function parseLogs( + program: Program, + slot: number, + logs: string[] +): Event[] { + const events = []; + const execution = new ExecutionContext(); + for (const log of logs) { + const [event, newProgram, didPop] = handleLog(execution, log, program); + if (event) { + events.push(event); + } + if (newProgram) { + execution.push(newProgram); + } + if (didPop) { + execution.pop(); + } + } + return events; +} + +function handleLog( + execution: ExecutionContext, + log: string, + program: Program +): [Event | null, string | null, boolean] { + // Executing program is drift program. + if (execution.stack.length > 0 && execution.program() === driftProgramId) { + return handleProgramLog(log, program); + } + // Executing program is not drift program. + else { + return [null, ...handleSystemLog(log)]; + } +} + +// Handles logs from *drift* program. +function handleProgramLog( + log: string, + program: Program +): [Event | null, string | null, boolean] { + // This is a `msg!` log or a `sol_log_data` log. + if (log.startsWith(PROGRAM_LOG)) { + const logStr = log.slice(PROGRAM_LOG_START_INDEX); + const event = program.coder.events.decode(logStr); + return [event, null, false]; + } else if (log.startsWith(PROGRAM_DATA)) { + const logStr = log.slice(PROGRAM_DATA_START_INDEX); + const event = program.coder.events.decode(logStr); + return [event, null, false]; + } else { + return [null, ...handleSystemLog(log)]; + } +} + +// Handles logs when the current program being executing is *not* drift. +function handleSystemLog(log: string): [string | null, boolean] { + // System component. + const logStart = log.split(':')[0]; + + // Did the program finish executing? + if (logStart.match(/^Program (.*) success/g) !== null) { + return [null, true]; + // Recursive call. + } else if (logStart.startsWith(driftProgramStart)) { + return [driftProgramId, false]; + } + // CPI call. + else if (logStart.includes('invoke')) { + return ['cpi', false]; // Any string will do. + } else { + return [null, false]; + } +} + +// Stack frame execution context, allowing one to track what program is +// executing for a given log. +class ExecutionContext { + stack: string[] = []; + + program(): string { + if (!this.stack.length) { + throw new Error('Expected the stack to have elements'); + } + return this.stack[this.stack.length - 1]; + } + + push(newProgram: string) { + this.stack.push(newProgram); + } + + pop() { + if (!this.stack.length) { + throw new Error('Expected the stack to have elements'); + } + this.stack.pop(); + } +}