Skip to content

Commit

Permalink
Merge pull request #300 from drift-labs/wphan/sdk_account_loader_starved
Browse files Browse the repository at this point in the history
sdk: fix BulkAccountLoader starvation
  • Loading branch information
wphan authored and crispheaney committed Dec 29, 2022
1 parent cd1d282 commit 41670fc
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 75 deletions.
1 change: 0 additions & 1 deletion sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
"@solana/spl-token": "^0.1.6",
"@solana/web3.js": "1.66.2",
"@switchboard-xyz/switchboard-v2": "^0.0.67",
"async-mutex": "^0.4.0",
"strict-event-emitter-types": "^2.0.0",
"uuid": "^8.3.2"
},
Expand Down
78 changes: 36 additions & 42 deletions sdk/src/accounts/bulkAccountLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Commitment, Connection, PublicKey } from '@solana/web3.js';
import { v4 as uuidv4 } from 'uuid';
import { BufferAndSlot } from './types';
import { promiseTimeout } from '../util/promiseTimeout';
import { Mutex } from 'async-mutex';

type AccountToLoad = {
publicKey: PublicKey;
Expand All @@ -23,7 +22,6 @@ export class BulkAccountLoader {
intervalId?: NodeJS.Timer;
// to handle clients spamming load
loadPromise?: Promise<void>;
loadPromiseLock: Mutex = new Mutex();
loadPromiseResolver: () => void;
lastTimeLoadingPromiseCleared = Date.now();
mostRecentSlot = 0;
Expand All @@ -38,10 +36,10 @@ export class BulkAccountLoader {
this.pollingFrequency = pollingFrequency;
}

public addAccount(
public async addAccount(
publicKey: PublicKey,
callback: (buffer: Buffer, slot: number) => void
): string {
): Promise<string> {
const existingSize = this.accountsToLoad.size;

const callbackId = uuidv4();
Expand All @@ -65,10 +63,8 @@ export class BulkAccountLoader {
this.startPolling();
}

// if a new account needs to be polled, remove the cached loadPromise in case client calls load immediately after
this.loadPromiseLock.runExclusive(() => {
this.loadPromise = undefined;
});
// resolve the current loadPromise in case client wants to call load
await this.loadPromise;

return callbackId;
}
Expand Down Expand Up @@ -105,43 +101,41 @@ export class BulkAccountLoader {
}

public async load(): Promise<void> {
await this.loadPromiseLock.runExclusive(async () => {
if (this.loadPromise) {
const now = Date.now();
if (now - this.lastTimeLoadingPromiseCleared > oneMinute) {
this.loadPromise = undefined;
} else {
return this.loadPromise;
}
}

this.loadPromise = new Promise((resolver) => {
this.loadPromiseResolver = resolver;
});
this.lastTimeLoadingPromiseCleared = Date.now();

try {
const chunks = this.chunks(
Array.from(this.accountsToLoad.values()),
GET_MULTIPLE_ACCOUNTS_CHUNK_SIZE
);

await Promise.all(
chunks.map((chunk) => {
return this.loadChunk(chunk);
})
);
} catch (e) {
console.error(`Error in bulkAccountLoader.load()`);
console.error(e);
for (const [_, callback] of this.errorCallbacks) {
callback(e);
}
} finally {
this.loadPromiseResolver();
if (this.loadPromise) {
const now = Date.now();
if (now - this.lastTimeLoadingPromiseCleared > oneMinute) {
this.loadPromise = undefined;
} else {
return this.loadPromise;
}
}

this.loadPromise = new Promise((resolver) => {
this.loadPromiseResolver = resolver;
});
this.lastTimeLoadingPromiseCleared = Date.now();

try {
const chunks = this.chunks(
Array.from(this.accountsToLoad.values()),
GET_MULTIPLE_ACCOUNTS_CHUNK_SIZE
);

await Promise.all(
chunks.map((chunk) => {
return this.loadChunk(chunk);
})
);
} catch (e) {
console.error(`Error in bulkAccountLoader.load()`);
console.error(e);
for (const [_, callback] of this.errorCallbacks) {
callback(e);
}
} finally {
this.loadPromiseResolver();
this.loadPromise = undefined;
}
}

async loadChunk(accountsToLoad: AccountToLoad[]): Promise<void> {
Expand Down
19 changes: 9 additions & 10 deletions sdk/src/accounts/pollingDriftClientAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,20 @@ export class PollingDriftClientAccountSubscriber

async addToAccountLoader(): Promise<void> {
for (const [_, accountToPoll] of this.accountsToPoll) {
this.addAccountToAccountLoader(accountToPoll);
await this.addAccountToAccountLoader(accountToPoll);
}

for (const [_, oracleToPoll] of this.oraclesToPoll) {
this.addOracleToAccountLoader(oracleToPoll);
await this.addOracleToAccountLoader(oracleToPoll);
}

this.errorCallbackId = this.accountLoader.addErrorCallbacks((error) => {
this.eventEmitter.emit('error', error);
});
}

addAccountToAccountLoader(accountToPoll: AccountToPoll): void {
accountToPoll.callbackId = this.accountLoader.addAccount(
async addAccountToAccountLoader(accountToPoll: AccountToPoll): Promise<void> {
accountToPoll.callbackId = await this.accountLoader.addAccount(
accountToPoll.publicKey,
(buffer: Buffer, slot: number) => {
if (!buffer) return;
Expand Down Expand Up @@ -236,13 +236,13 @@ export class PollingDriftClientAccountSubscriber
);
}

addOracleToAccountLoader(oracleToPoll: OraclesToPoll): void {
async addOracleToAccountLoader(oracleToPoll: OraclesToPoll): Promise<void> {
const oracleClient = this.oracleClientCache.get(
oracleToPoll.source,
this.program.provider.connection
);

oracleToPoll.callbackId = this.accountLoader.addAccount(
oracleToPoll.callbackId = await this.accountLoader.addAccount(
oracleToPoll.publicKey,
(buffer: Buffer, slot: number) => {
if (!buffer) return;
Expand Down Expand Up @@ -356,8 +356,7 @@ export class PollingDriftClientAccountSubscriber
await this.addSpotMarketAccountToPoll(marketIndex);

const accountToPoll = this.accountsToPoll.get(marketPublicKey.toString());
this.addAccountToAccountLoader(accountToPoll);

await this.addAccountToAccountLoader(accountToPoll);
return true;
}

Expand All @@ -374,7 +373,7 @@ export class PollingDriftClientAccountSubscriber
await this.addPerpMarketAccountToPoll(marketIndex);

const accountToPoll = this.accountsToPoll.get(marketPublicKey.toString());
this.addAccountToAccountLoader(accountToPoll);
await this.addAccountToAccountLoader(accountToPoll);

return true;
}
Expand All @@ -391,7 +390,7 @@ export class PollingDriftClientAccountSubscriber
const oracleToPoll = this.oraclesToPoll.get(
oracleInfo.publicKey.toString()
);
this.addOracleToAccountLoader(oracleToPoll);
await this.addOracleToAccountLoader(oracleToPoll);
return true;
}

Expand Down
6 changes: 3 additions & 3 deletions sdk/src/accounts/pollingOracleAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class PollingOracleAccountSubscriber implements OracleAccountSubscriber {
return true;
}

this.addToAccountLoader();
await this.addToAccountLoader();

let subscriptionSucceeded = false;
let retries = 0;
Expand All @@ -59,12 +59,12 @@ export class PollingOracleAccountSubscriber implements OracleAccountSubscriber {
return subscriptionSucceeded;
}

addToAccountLoader(): void {
async addToAccountLoader(): Promise<void> {
if (this.callbackId) {
return;
}

this.callbackId = this.accountLoader.addAccount(
this.callbackId = await this.accountLoader.addAccount(
this.publicKey,
async (buffer, slot) => {
const oraclePriceData =
Expand Down
6 changes: 3 additions & 3 deletions sdk/src/accounts/pollingTokenAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class PollingTokenAccountSubscriber implements TokenAccountSubscriber {
return true;
}

this.addToAccountLoader();
await this.addToAccountLoader();
let subscriptionSucceeded = false;
let retries = 0;
while (!subscriptionSucceeded && retries < 5) {
Expand All @@ -53,12 +53,12 @@ export class PollingTokenAccountSubscriber implements TokenAccountSubscriber {
return subscriptionSucceeded;
}

addToAccountLoader(): void {
async addToAccountLoader(): Promise<void> {
if (this.callbackId) {
return;
}

this.callbackId = this.accountLoader.addAccount(
this.callbackId = await this.accountLoader.addAccount(
this.publicKey,
(buffer, slot: number) => {
const tokenAccount = parseTokenAccount(buffer);
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/accounts/pollingUserAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class PollingUserAccountSubscriber implements UserAccountSubscriber {
});

for (const [_, accountToPoll] of this.accountsToPoll) {
accountToPoll.callbackId = this.accountLoader.addAccount(
accountToPoll.callbackId = await this.accountLoader.addAccount(
accountToPoll.publicKey,
(buffer, slot) => {
if (!buffer) {
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/accounts/pollingUserStatsAccountSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class PollingUserStatsAccountSubscriber
});

for (const [_, accountToPoll] of this.accountsToPoll) {
accountToPoll.callbackId = this.accountLoader.addAccount(
accountToPoll.callbackId = await this.accountLoader.addAccount(
accountToPoll.publicKey,
(buffer, slot) => {
if (!buffer) {
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/serum/serumSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class SerumSubscriber {
this.asksAddress = this.market.asksAddress;
this.asks = await this.market.loadAsks(this.connection);

this.asksCallbackId = this.accountLoader.addAccount(
this.asksCallbackId = await this.accountLoader.addAccount(
this.asksAddress,
(buffer, slot) => {
this.lastAsksSlot = slot;
Expand All @@ -57,7 +57,7 @@ export class SerumSubscriber {
this.bidsAddress = this.market.bidsAddress;
this.bids = await this.market.loadBids(this.connection);

this.bidsCallbackId = this.accountLoader.addAccount(
this.bidsCallbackId = await this.accountLoader.addAccount(
this.bidsAddress,
(buffer, slot) => {
this.lastBidsSlot = slot;
Expand Down
12 changes: 0 additions & 12 deletions sdk/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -771,13 +771,6 @@ astral-regex@^2.0.0:
resolved "https://registry.yarnpkg.com/astral-regex/-/astral-regex-2.0.0.tgz#483143c567aeed4785759c0865786dc77d7d2e31"
integrity sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ==

async-mutex@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.4.0.tgz#ae8048cd4d04ace94347507504b3cf15e631c25f"
integrity sha512-eJFZ1YhRR8UN8eBLoNzcDPcy/jqjsg6I1AP+KvWQX80BqOSW1oJPJXDylPUEeMr2ZQvHgnQ//Lp6f3RQ1zI7HA==
dependencies:
tslib "^2.4.0"

asynckit@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"
Expand Down Expand Up @@ -2844,11 +2837,6 @@ tslib@^2.0.3:
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.3.1.tgz#e8a335add5ceae51aa261d32a490158ef042ef01"
integrity sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw==

tslib@^2.4.0:
version "2.4.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.4.0.tgz#7cecaa7f073ce680a05847aa77be941098f36dc3"
integrity sha512-d6xOpEDfsi2CZVlPQzGeux8XMwLT9hssAsaPYExaQMuYskwb+x1x7J371tWlbBdWHroy99KnVB6qIkUbs5X3UQ==

tsutils@^3.21.0:
version "3.21.0"
resolved "https://registry.yarnpkg.com/tsutils/-/tsutils-3.21.0.tgz#b48717d394cea6c1e096983eed58e9d61715b623"
Expand Down

0 comments on commit 41670fc

Please sign in to comment.