Skip to content

Commit

Permalink
sdk: EventSubscriber: support events server (#1222)
Browse files Browse the repository at this point in the history
* sdk: EventSubscriber: support events server

* check for eventEmitter before listening for reconnect

* add browser websocket support
  • Loading branch information
wphan authored Sep 17, 2024
1 parent 0143abc commit 8f37e74
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 65 deletions.
179 changes: 125 additions & 54 deletions sdk/src/events/eventSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import {
LogProvider,
EventSubscriberEvents,
WebSocketLogProviderConfig,
PollingLogProviderConfig,
EventsServerLogProviderConfig,
LogProviderType,
StreamingLogProviderConfig,
} from './types';
import { TxEventCache } from './txEventCache';
import { EventList } from './eventList';
Expand All @@ -19,6 +23,7 @@ import { EventEmitter } from 'events';
import StrictEventEmitter from 'strict-event-emitter-types';
import { getSortFn } from './sort';
import { parseLogs } from './parse';
import { EventsServerLogProvider } from './eventsServerLogProvider';

export class EventSubscriber {
private address: PublicKey;
Expand All @@ -27,6 +32,7 @@ export class EventSubscriber {
private awaitTxPromises = new Map<string, Promise<void>>();
private awaitTxResolver = new Map<string, () => void>();
private logProvider: LogProvider;
private currentProviderType: LogProviderType;
public eventEmitter: StrictEventEmitter<EventEmitter, EventSubscriberEvents>;
private lastSeenSlot: number;
private lastSeenBlockTime: number | undefined;
Expand All @@ -43,22 +49,57 @@ export class EventSubscriber {
this.eventListMap = new Map<EventType, EventList<EventType>>();
this.eventEmitter = new EventEmitter();

if (this.options.logProviderConfig.type === 'websocket') {
this.currentProviderType = this.options.logProviderConfig.type;
this.initializeLogProvider();
}

private initializeLogProvider(subscribe = false) {
if (this.currentProviderType === 'websocket') {
const logProviderConfig = this.options
.logProviderConfig as WebSocketLogProviderConfig;
this.logProvider = new WebSocketLogProvider(
// @ts-ignore
this.connection,
this.address,
this.options.commitment,
this.options.logProviderConfig.resubTimeoutMs
logProviderConfig.resubTimeoutMs
);
} else {
} else if (this.currentProviderType === 'polling') {
const logProviderConfig = this.options
.logProviderConfig as PollingLogProviderConfig;
this.logProvider = new PollingLogProvider(
// @ts-ignore
this.connection,
this.address,
options.commitment,
this.options.logProviderConfig.frequency,
this.options.logProviderConfig.batchSize
this.options.commitment,
logProviderConfig.frequency,
logProviderConfig.batchSize
);
} else if (this.currentProviderType === 'events-server') {
const logProviderConfig = this.options
.logProviderConfig as EventsServerLogProviderConfig;
this.logProvider = new EventsServerLogProvider(
logProviderConfig.url,
this.options.eventTypes,
this.options.address ? this.options.address.toString() : undefined
);
} else {
throw new Error(`Invalid log provider type: ${this.currentProviderType}`);
}

if (subscribe) {
this.logProvider.subscribe(
(txSig, slot, logs, mostRecentBlockTime, txSigIndex) => {
this.handleTxLogs(
txSig,
slot,
logs,
mostRecentBlockTime,
this.currentProviderType === 'events-server',
txSigIndex
);
},
true
);
}
}
Expand All @@ -77,6 +118,33 @@ export class EventSubscriber {
}
}

/**
* Implements fallback logic for reconnecting to LogProvider. Currently terminates at polling,
* could be improved to try the original type again after some cooldown.
*/
private updateFallbackProviderType(
reconnectAttempts: number,
maxReconnectAttempts: number
) {
if (reconnectAttempts < maxReconnectAttempts) {
return;
}

let nextProviderType = this.currentProviderType;
if (this.currentProviderType === 'events-server') {
nextProviderType = 'websocket';
} else if (this.currentProviderType === 'websocket') {
nextProviderType = 'polling';
} else if (this.currentProviderType === 'polling') {
nextProviderType = 'polling';
}

console.log(
`EventSubscriber: Failing over providerType ${this.currentProviderType} to ${nextProviderType}`
);
this.currentProviderType = nextProviderType;
}

public async subscribe(): Promise<boolean> {
try {
if (this.logProvider.isSubscribed()) {
Expand All @@ -85,52 +153,46 @@ export class EventSubscriber {

this.populateInitialEventListMap();

if (this.options.logProviderConfig.type === 'websocket') {
if (this.options.logProviderConfig.resubTimeoutMs) {
if (
this.options.logProviderConfig.maxReconnectAttempts &&
this.options.logProviderConfig.maxReconnectAttempts > 0
) {
const logProviderConfig = this.options
.logProviderConfig as WebSocketLogProviderConfig;
this.logProvider.eventEmitter.on(
'reconnect',
(reconnectAttempts) => {
if (
reconnectAttempts > logProviderConfig.maxReconnectAttempts
) {
console.log('Failing over to polling');
this.logProvider.eventEmitter.removeAllListeners('reconnect');
this.unsubscribe().then(() => {
this.logProvider = new PollingLogProvider(
// @ts-ignore
this.connection,
this.address,
this.options.commitment,
logProviderConfig.fallbackFrequency,
logProviderConfig.fallbackBatchSize
);
this.logProvider.subscribe(
(txSig, slot, logs, mostRecentBlockTime) => {
this.handleTxLogs(
txSig,
slot,
logs,
mostRecentBlockTime
);
},
true
);
});
}
if (
this.options.logProviderConfig.type === 'websocket' ||
this.options.logProviderConfig.type === 'events-server'
) {
const logProviderConfig = this.options
.logProviderConfig as StreamingLogProviderConfig;

if (this.logProvider.eventEmitter) {
this.logProvider.eventEmitter.on(
'reconnect',
async (reconnectAttempts) => {
if (reconnectAttempts > logProviderConfig.maxReconnectAttempts) {
console.log(
`EventSubscriber: Reconnect attempts ${reconnectAttempts}/${logProviderConfig.maxReconnectAttempts}, reconnecting...`
);
this.logProvider.eventEmitter.removeAllListeners('reconnect');
await this.unsubscribe();
this.updateFallbackProviderType(
reconnectAttempts,
logProviderConfig.maxReconnectAttempts
);
this.initializeLogProvider(true);
}
);
}
}
);
}
}
this.logProvider.subscribe((txSig, slot, logs, mostRecentBlockTime) => {
this.handleTxLogs(txSig, slot, logs, mostRecentBlockTime);
}, true);
this.logProvider.subscribe(
(txSig, slot, logs, mostRecentBlockTime, txSigIndex) => {
this.handleTxLogs(
txSig,
slot,
logs,
mostRecentBlockTime,
this.currentProviderType === 'events-server',
txSigIndex
);
},
true
);

return true;
} catch (e) {
Expand All @@ -144,13 +206,20 @@ export class EventSubscriber {
txSig: TransactionSignature,
slot: number,
logs: string[],
mostRecentBlockTime: number | undefined
mostRecentBlockTime: number | undefined,
fromEventsServer = false,
txSigIndex: number | undefined = undefined
): void {
if (this.txEventCache.has(txSig)) {
if (!fromEventsServer && this.txEventCache.has(txSig)) {
return;
}

const wrappedEvents = this.parseEventsFromLogs(txSig, slot, logs);
const wrappedEvents = this.parseEventsFromLogs(
txSig,
slot,
logs,
txSigIndex
);

for (const wrappedEvent of wrappedEvents) {
this.eventListMap.get(wrappedEvent.eventType).insert(wrappedEvent);
Expand Down Expand Up @@ -225,7 +294,8 @@ export class EventSubscriber {
private parseEventsFromLogs(
txSig: TransactionSignature,
slot: number,
logs: string[]
logs: string[],
txSigIndex: number | undefined
): WrappedEvents {
const records = [];
// @ts-ignore
Expand All @@ -238,7 +308,8 @@ export class EventSubscriber {
event.data.txSig = txSig;
event.data.slot = slot;
event.data.eventType = event.name;
event.data.txSigIndex = runningEventIndex;
event.data.txSigIndex =
txSigIndex !== undefined ? txSigIndex : runningEventIndex;
records.push(event.data);
}
runningEventIndex++;
Expand Down
Loading

0 comments on commit 8f37e74

Please sign in to comment.