From 8f37e74f595308cc7f06ac58b93b0929f479a085 Mon Sep 17 00:00:00 2001 From: wphan Date: Tue, 17 Sep 2024 15:43:52 -0700 Subject: [PATCH] sdk: EventSubscriber: support events server (#1222) * sdk: EventSubscriber: support events server * check for eventEmitter before listening for reconnect * add browser websocket support --- sdk/src/events/eventSubscriber.ts | 179 +++++++++++++++------- sdk/src/events/eventsServerLogProvider.ts | 152 ++++++++++++++++++ sdk/src/events/pollingLogProvider.ts | 2 +- sdk/src/events/types.ts | 35 ++++- sdk/src/events/webSocketLogProvider.ts | 8 +- 5 files changed, 311 insertions(+), 65 deletions(-) create mode 100644 sdk/src/events/eventsServerLogProvider.ts diff --git a/sdk/src/events/eventSubscriber.ts b/sdk/src/events/eventSubscriber.ts index a1ab66be3..8a9e6f2e1 100644 --- a/sdk/src/events/eventSubscriber.ts +++ b/sdk/src/events/eventSubscriber.ts @@ -9,6 +9,10 @@ import { LogProvider, EventSubscriberEvents, WebSocketLogProviderConfig, + PollingLogProviderConfig, + EventsServerLogProviderConfig, + LogProviderType, + StreamingLogProviderConfig, } from './types'; import { TxEventCache } from './txEventCache'; import { EventList } from './eventList'; @@ -19,6 +23,7 @@ import { EventEmitter } from 'events'; import StrictEventEmitter from 'strict-event-emitter-types'; import { getSortFn } from './sort'; import { parseLogs } from './parse'; +import { EventsServerLogProvider } from './eventsServerLogProvider'; export class EventSubscriber { private address: PublicKey; @@ -27,6 +32,7 @@ export class EventSubscriber { private awaitTxPromises = new Map>(); private awaitTxResolver = new Map void>(); private logProvider: LogProvider; + private currentProviderType: LogProviderType; public eventEmitter: StrictEventEmitter; private lastSeenSlot: number; private lastSeenBlockTime: number | undefined; @@ -43,22 +49,57 @@ export class EventSubscriber { this.eventListMap = new Map>(); this.eventEmitter = new EventEmitter(); - if (this.options.logProviderConfig.type === 'websocket') { + this.currentProviderType = this.options.logProviderConfig.type; + this.initializeLogProvider(); + } + + private initializeLogProvider(subscribe = false) { + if (this.currentProviderType === 'websocket') { + const logProviderConfig = this.options + .logProviderConfig as WebSocketLogProviderConfig; this.logProvider = new WebSocketLogProvider( // @ts-ignore this.connection, this.address, this.options.commitment, - this.options.logProviderConfig.resubTimeoutMs + logProviderConfig.resubTimeoutMs ); - } else { + } else if (this.currentProviderType === 'polling') { + const logProviderConfig = this.options + .logProviderConfig as PollingLogProviderConfig; this.logProvider = new PollingLogProvider( // @ts-ignore this.connection, this.address, - options.commitment, - this.options.logProviderConfig.frequency, - this.options.logProviderConfig.batchSize + this.options.commitment, + logProviderConfig.frequency, + logProviderConfig.batchSize + ); + } else if (this.currentProviderType === 'events-server') { + const logProviderConfig = this.options + .logProviderConfig as EventsServerLogProviderConfig; + this.logProvider = new EventsServerLogProvider( + logProviderConfig.url, + this.options.eventTypes, + this.options.address ? this.options.address.toString() : undefined + ); + } else { + throw new Error(`Invalid log provider type: ${this.currentProviderType}`); + } + + if (subscribe) { + this.logProvider.subscribe( + (txSig, slot, logs, mostRecentBlockTime, txSigIndex) => { + this.handleTxLogs( + txSig, + slot, + logs, + mostRecentBlockTime, + this.currentProviderType === 'events-server', + txSigIndex + ); + }, + true ); } } @@ -77,6 +118,33 @@ export class EventSubscriber { } } + /** + * Implements fallback logic for reconnecting to LogProvider. Currently terminates at polling, + * could be improved to try the original type again after some cooldown. + */ + private updateFallbackProviderType( + reconnectAttempts: number, + maxReconnectAttempts: number + ) { + if (reconnectAttempts < maxReconnectAttempts) { + return; + } + + let nextProviderType = this.currentProviderType; + if (this.currentProviderType === 'events-server') { + nextProviderType = 'websocket'; + } else if (this.currentProviderType === 'websocket') { + nextProviderType = 'polling'; + } else if (this.currentProviderType === 'polling') { + nextProviderType = 'polling'; + } + + console.log( + `EventSubscriber: Failing over providerType ${this.currentProviderType} to ${nextProviderType}` + ); + this.currentProviderType = nextProviderType; + } + public async subscribe(): Promise { try { if (this.logProvider.isSubscribed()) { @@ -85,52 +153,46 @@ export class EventSubscriber { this.populateInitialEventListMap(); - if (this.options.logProviderConfig.type === 'websocket') { - if (this.options.logProviderConfig.resubTimeoutMs) { - if ( - this.options.logProviderConfig.maxReconnectAttempts && - this.options.logProviderConfig.maxReconnectAttempts > 0 - ) { - const logProviderConfig = this.options - .logProviderConfig as WebSocketLogProviderConfig; - this.logProvider.eventEmitter.on( - 'reconnect', - (reconnectAttempts) => { - if ( - reconnectAttempts > logProviderConfig.maxReconnectAttempts - ) { - console.log('Failing over to polling'); - this.logProvider.eventEmitter.removeAllListeners('reconnect'); - this.unsubscribe().then(() => { - this.logProvider = new PollingLogProvider( - // @ts-ignore - this.connection, - this.address, - this.options.commitment, - logProviderConfig.fallbackFrequency, - logProviderConfig.fallbackBatchSize - ); - this.logProvider.subscribe( - (txSig, slot, logs, mostRecentBlockTime) => { - this.handleTxLogs( - txSig, - slot, - logs, - mostRecentBlockTime - ); - }, - true - ); - }); - } + if ( + this.options.logProviderConfig.type === 'websocket' || + this.options.logProviderConfig.type === 'events-server' + ) { + const logProviderConfig = this.options + .logProviderConfig as StreamingLogProviderConfig; + + if (this.logProvider.eventEmitter) { + this.logProvider.eventEmitter.on( + 'reconnect', + async (reconnectAttempts) => { + if (reconnectAttempts > logProviderConfig.maxReconnectAttempts) { + console.log( + `EventSubscriber: Reconnect attempts ${reconnectAttempts}/${logProviderConfig.maxReconnectAttempts}, reconnecting...` + ); + this.logProvider.eventEmitter.removeAllListeners('reconnect'); + await this.unsubscribe(); + this.updateFallbackProviderType( + reconnectAttempts, + logProviderConfig.maxReconnectAttempts + ); + this.initializeLogProvider(true); } - ); - } + } + ); } } - this.logProvider.subscribe((txSig, slot, logs, mostRecentBlockTime) => { - this.handleTxLogs(txSig, slot, logs, mostRecentBlockTime); - }, true); + this.logProvider.subscribe( + (txSig, slot, logs, mostRecentBlockTime, txSigIndex) => { + this.handleTxLogs( + txSig, + slot, + logs, + mostRecentBlockTime, + this.currentProviderType === 'events-server', + txSigIndex + ); + }, + true + ); return true; } catch (e) { @@ -144,13 +206,20 @@ export class EventSubscriber { txSig: TransactionSignature, slot: number, logs: string[], - mostRecentBlockTime: number | undefined + mostRecentBlockTime: number | undefined, + fromEventsServer = false, + txSigIndex: number | undefined = undefined ): void { - if (this.txEventCache.has(txSig)) { + if (!fromEventsServer && this.txEventCache.has(txSig)) { return; } - const wrappedEvents = this.parseEventsFromLogs(txSig, slot, logs); + const wrappedEvents = this.parseEventsFromLogs( + txSig, + slot, + logs, + txSigIndex + ); for (const wrappedEvent of wrappedEvents) { this.eventListMap.get(wrappedEvent.eventType).insert(wrappedEvent); @@ -225,7 +294,8 @@ export class EventSubscriber { private parseEventsFromLogs( txSig: TransactionSignature, slot: number, - logs: string[] + logs: string[], + txSigIndex: number | undefined ): WrappedEvents { const records = []; // @ts-ignore @@ -238,7 +308,8 @@ export class EventSubscriber { event.data.txSig = txSig; event.data.slot = slot; event.data.eventType = event.name; - event.data.txSigIndex = runningEventIndex; + event.data.txSigIndex = + txSigIndex !== undefined ? txSigIndex : runningEventIndex; records.push(event.data); } runningEventIndex++; diff --git a/sdk/src/events/eventsServerLogProvider.ts b/sdk/src/events/eventsServerLogProvider.ts new file mode 100644 index 000000000..798890a68 --- /dev/null +++ b/sdk/src/events/eventsServerLogProvider.ts @@ -0,0 +1,152 @@ +// import WebSocket from 'ws'; +import { logProviderCallback, EventType, LogProvider } from './types'; +import { EventEmitter } from 'events'; + +// browser support +let WebSocketImpl: typeof WebSocket; +if (typeof window !== 'undefined' && window.WebSocket) { + WebSocketImpl = window.WebSocket; +} else { + WebSocketImpl = require('ws'); +} + +const EVENT_SERVER_HEARTBEAT_INTERVAL_MS = 5000; +const ALLOWED_MISSED_HEARTBEATS = 3; + +export class EventsServerLogProvider implements LogProvider { + private ws?: WebSocket; + private callback?: logProviderCallback; + private isUnsubscribing = false; + private externalUnsubscribe = false; + private lastHeartbeat = 0; + private timeoutId?: NodeJS.Timeout; + private reconnectAttempts = 0; + eventEmitter?: EventEmitter; + + public constructor( + private readonly url: string, + private readonly eventTypes: EventType[], + private readonly userAccount?: string + ) { + this.eventEmitter = new EventEmitter(); + } + + public isSubscribed(): boolean { + return this.ws !== undefined; + } + + public async subscribe(callback: logProviderCallback): Promise { + if (this.ws !== undefined) { + return true; + } + this.ws = new WebSocketImpl(this.url); + + this.callback = callback; + this.ws.addEventListener('open', () => { + for (const channel of this.eventTypes) { + const subscribeMessage = { + type: 'subscribe', + channel: channel, + }; + if (this.userAccount) { + subscribeMessage['user'] = this.userAccount; + } + this.ws.send(JSON.stringify(subscribeMessage)); + } + this.reconnectAttempts = 0; + }); + + this.ws.addEventListener('message', (data) => { + try { + if (!this.isUnsubscribing) { + clearTimeout(this.timeoutId); + this.setTimeout(); + if (this.reconnectAttempts > 0) { + console.log( + 'eventsServerLogProvider: Resetting reconnect attempts to 0' + ); + } + this.reconnectAttempts = 0; + } + + const parsedData = JSON.parse(data.data.toString()); + if (parsedData.channel === 'heartbeat') { + this.lastHeartbeat = Date.now(); + return; + } + if (parsedData.message !== undefined) { + return; + } + const event = JSON.parse(parsedData.data); + this.callback( + event.txSig, + event.slot, + [ + 'Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH invoke [1]', + event.rawLog, + 'Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH success', + ], + undefined, + event.txSigIndex + ); + } catch (error) { + console.error('Error parsing message:', error); + } + }); + + this.ws.addEventListener('close', () => { + console.log('eventsServerLogProvider: WebSocket closed'); + }); + + this.ws.addEventListener('error', (error) => { + console.error('eventsServerLogProvider: WebSocket error:', error); + }); + + this.setTimeout(); + + return true; + } + + public async unsubscribe(external = false): Promise { + this.isUnsubscribing = true; + this.externalUnsubscribe = external; + if (this.timeoutId) { + clearInterval(this.timeoutId); + this.timeoutId = undefined; + } + + if (this.ws !== undefined) { + this.ws.close(); + this.ws = undefined; + return true; + } else { + this.isUnsubscribing = false; + return true; + } + } + + private setTimeout(): void { + this.timeoutId = setTimeout(async () => { + if (this.isUnsubscribing || this.externalUnsubscribe) { + // If we are in the process of unsubscribing, do not attempt to resubscribe + return; + } + + const timeSinceLastHeartbeat = Date.now() - this.lastHeartbeat; + if ( + timeSinceLastHeartbeat > + EVENT_SERVER_HEARTBEAT_INTERVAL_MS * ALLOWED_MISSED_HEARTBEATS + ) { + console.log( + `eventServerLogProvider: No heartbeat in ${timeSinceLastHeartbeat}ms, resubscribing on attempt ${ + this.reconnectAttempts + 1 + }` + ); + await this.unsubscribe(); + this.reconnectAttempts++; + this.eventEmitter.emit('reconnect', this.reconnectAttempts); + this.subscribe(this.callback); + } + }, EVENT_SERVER_HEARTBEAT_INTERVAL_MS * 2); + } +} diff --git a/sdk/src/events/pollingLogProvider.ts b/sdk/src/events/pollingLogProvider.ts index dd53d8c0a..4fba71b28 100644 --- a/sdk/src/events/pollingLogProvider.ts +++ b/sdk/src/events/pollingLogProvider.ts @@ -60,7 +60,7 @@ export class PollingLogProvider implements LogProvider { const { mostRecentTx, transactionLogs } = response; for (const { txSig, slot, logs } of transactionLogs) { - callback(txSig, slot, logs, response.mostRecentBlockTime); + callback(txSig, slot, logs, response.mostRecentBlockTime, undefined); } this.mostRecentSeenTx = mostRecentTx; diff --git a/sdk/src/events/types.ts b/sdk/src/events/types.ts index 529c90e6c..8192ddc0f 100644 --- a/sdk/src/events/types.ts +++ b/sdk/src/events/types.ts @@ -56,7 +56,11 @@ export const DefaultEventSubscriptionOptions: EventSubscriptionOptions = { commitment: 'confirmed', maxTx: 4096, logProviderConfig: { - type: 'websocket', + type: 'events-server', + url: 'wss://events.drift.trade/ws', + maxReconnectAttempts: 5, + fallbackFrequency: 1000, + fallbackBatchSize: 100, }, }; @@ -126,7 +130,8 @@ export type logProviderCallback = ( txSig: TransactionSignature, slot: number, logs: string[], - mostRecentBlockTime: number | undefined + mostRecentBlockTime: number | undefined, + txSigIndex: number | undefined ) => void; export interface LogProvider { @@ -139,20 +144,38 @@ export interface LogProvider { eventEmitter?: EventEmitter; } -export type WebSocketLogProviderConfig = { - type: 'websocket'; - resubTimeoutMs?: number; +export type LogProviderType = 'websocket' | 'polling' | 'events-server'; + +export type StreamingLogProviderConfig = { + /// Max number of times to try reconnecting before failing over to fallback provider maxReconnectAttempts?: number; + /// used for PollingLogProviderConfig on fallback fallbackFrequency?: number; + /// used for PollingLogProviderConfig on fallback fallbackBatchSize?: number; }; +export type WebSocketLogProviderConfig = StreamingLogProviderConfig & { + type: 'websocket'; + /// Max time to wait before resubscribing + resubTimeoutMs?: number; +}; + export type PollingLogProviderConfig = { type: 'polling'; + /// frequency to poll for new events frequency: number; + /// max number of events to fetch per poll batchSize?: number; }; +export type EventsServerLogProviderConfig = StreamingLogProviderConfig & { + type: 'events-server'; + /// url of the events server + url: string; +}; + export type LogProviderConfig = | WebSocketLogProviderConfig - | PollingLogProviderConfig; + | PollingLogProviderConfig + | EventsServerLogProviderConfig; diff --git a/sdk/src/events/webSocketLogProvider.ts b/sdk/src/events/webSocketLogProvider.ts index 9fb896f65..af4831168 100644 --- a/sdk/src/events/webSocketLogProvider.ts +++ b/sdk/src/events/webSocketLogProvider.ts @@ -64,7 +64,7 @@ export class WebSocketLogProvider implements LogProvider { if (logs.err !== null) { return; } - callback(logs.signature, ctx.slot, logs.logs, undefined); + callback(logs.signature, ctx.slot, logs.logs, undefined, undefined); }, this.commitment ); @@ -106,9 +106,9 @@ export class WebSocketLogProvider implements LogProvider { if (this.receivingData) { console.log( - `No log data in ${this.resubTimeoutMs}ms, resubscribing on attempt ${ - this.reconnectAttempts + 1 - }` + `webSocketLogProvider: No log data in ${ + this.resubTimeoutMs + }ms, resubscribing on attempt ${this.reconnectAttempts + 1}` ); await this.unsubscribe(); this.receivingData = false;