Skip to content

Commit

Permalink
feat(#543): Add transaction webhook capability to account-postgres-si…
Browse files Browse the repository at this point in the history
…nk (#544)
  • Loading branch information
ChewingGlass authored Jan 16, 2024
1 parent 9eb2470 commit a0d7bd9
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 8 deletions.
9 changes: 8 additions & 1 deletion packages/account-fetch-cache/src/accountFetchCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class MapAccountCache implements AccountCache {
cache = new Map<string, ParsedAccountBase<unknown> | null>() as AccountCache;

delete(key: string): void {
this.cache.delete(key)
this.cache.delete(key);
}
has(key: string): boolean {
return this.cache.has(key);
Expand All @@ -84,6 +84,7 @@ export class MapAccountCache implements AccountCache {
}

export class AccountFetchCache {
enableLogging: boolean;
connection: Connection;
chunkSize: number;
delay: number;
Expand Down Expand Up @@ -124,6 +125,7 @@ export class AccountFetchCache {
missingRefetchDelay = 10000,
extendConnection = false,
cache,
enableLogging = false,
}: {
connection: Connection;
chunkSize?: number;
Expand All @@ -133,7 +135,9 @@ export class AccountFetchCache {
/** Add functionatility to getAccountInfo that uses the cache */
extendConnection?: boolean;
cache?: AccountCache;
enableLogging?: boolean;
}) {
this.enableLogging = enableLogging;
this.genericCache = cache || new MapAccountCache();

this.id = ++id;
Expand Down Expand Up @@ -262,6 +266,9 @@ export class AccountFetchCache {
async fetchBatch() {
const currentBatch = this.currentBatch;
this.currentBatch = new Set(); // Erase current batch from state, so we can fetch multiple at a time
if (this.enableLogging) {
console.log(`Fetching batch of ${currentBatch.size} accounts`);
}
try {
const keys = Array.from(currentBatch);
const { array } = await getMultipleAccounts(
Expand Down
1 change: 1 addition & 0 deletions packages/account-postgres-sink-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"aws-sdk": "^2.1344.0",
"axios": "^1.3.6",
"axios-retry": "^3.8.0",
"bloom-filters": "^3.0.1",
"bn.js": "^5.2.0",
"bs58": "^4.0.1",
"deep-equal": "^2.2.2",
Expand Down
2 changes: 2 additions & 0 deletions packages/account-postgres-sink-service/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ export const PROGRAM_ACCOUNT_CONFIGS =
export const HELIUS_AUTH_SECRET = process.env.HELIUS_AUTH_SECRET;

export const RUN_JOBS_AT_STARTUP = process.env.RUN_JOBS_AT_STARTUP === 'true';

export const FETCH_DELAY_SECONDS = Number(process.env.FETCH_DELAY_SECONDS || "10")
123 changes: 118 additions & 5 deletions packages/account-postgres-sink-service/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@ import fastifyCron from "fastify-cron";
import cors from "@fastify/cors";
import fs from "fs";
import { StatusCodes, ReasonPhrases } from "http-status-codes";
import { PublicKey } from "@solana/web3.js";
import {
Connection,
PublicKey,
SystemProgram,
TransactionMessage,
TransactionResponse,
} from "@solana/web3.js";
import {
FETCH_DELAY_SECONDS,
HELIUS_AUTH_SECRET,
PROGRAM_ACCOUNT_CONFIGS,
RUN_JOBS_AT_STARTUP,
SOLANA_URL,
} from "./env";
import database from "./utils/database";
import { defineAllIdlModels } from "./utils/defineIdlModels";
Expand All @@ -19,6 +27,8 @@ import { metrics } from "./plugins/metrics";
import { IConfig, IInitedPlugin } from "./types";
import { EventEmitter } from "events";
import { initPlugins } from "./plugins";
import { AccountFetchCache } from "@helium/account-fetch-cache";
const { BloomFilter } = require("bloom-filters");

if (!HELIUS_AUTH_SECRET) {
throw new Error("Helius auth secret not available");
Expand All @@ -28,13 +38,14 @@ if (!HELIUS_AUTH_SECRET) {
const { configs, indexConfigs } = (() => {
const dbConfigs: null | {
configs: IConfig[];
indexConfigs?: string[]
indexConfigs?: string[];
} = JSON.parse(fs.readFileSync(PROGRAM_ACCOUNT_CONFIGS, "utf8"));

return {
configs: dbConfigs && dbConfigs.configs ? dbConfigs.configs : [],
indexConfigs: dbConfigs && dbConfigs.indexConfigs ? dbConfigs.indexConfigs : [],
}
indexConfigs:
dbConfigs && dbConfigs.indexConfigs ? dbConfigs.indexConfigs : [],
};
})();

const customJobs = configs
Expand Down Expand Up @@ -160,6 +171,108 @@ if (!HELIUS_AUTH_SECRET) {
return acc;
}, {} as Record<string, Record<string, IInitedPlugin[]>>);

// Assume 10 million accounts we might not want to watch (token accounts, etc)
const nonWatchedAccountsFilter = BloomFilter.create(10000000, 0.05);
const cache = new AccountFetchCache({
connection: new Connection(SOLANA_URL),
commitment: "confirmed",
extendConnection: false,
enableLogging: true,
// One fetch every x second limit
delay: FETCH_DELAY_SECONDS * 1000,
});
server.post<{ Body: any[] }>("/transaction-webhook", async (req, res) => {
if (req.headers.authorization != HELIUS_AUTH_SECRET) {
res.code(StatusCodes.FORBIDDEN).send({
message: "Invalid authorization",
});
return;
}
if (refreshing) {
res.code(StatusCodes.SERVICE_UNAVAILABLE).send({
message: "Refresh is happening, cannot create transactions",
});
return;
}

try {
const transactions = req.body as TransactionResponse[];
const writableAccountKeys = transactions
.flatMap((tx) =>
tx.transaction.message.accountKeys.slice(
0,
tx.transaction.message.accountKeys.length -
(tx.transaction.message.header.numReadonlySignedAccounts +
tx.transaction.message.header.numReadonlyUnsignedAccounts)
)
)
.map((k) => new PublicKey(k));
const accounts = await Promise.all(
writableAccountKeys.map((key) => cache.search(key))
);

if (configs) {
let index = 0;
for (const account of accounts) {
const pubkey = writableAccountKeys[index];
index++;

// Account not found, delete it from any and all tables
if (!account) {
const tables = configs.flatMap((config) =>
config.accounts.map((acc) => acc.table)
);
const transaction = await database.transaction();
for (const table of tables) {
database.query(
`
DELETE FROM ${table} WHERE address = ${database.escape(
pubkey.toBase58()
)}
`,
{
transaction,
}
);
}
continue;
}

// If the owner isn't of a program we're watching, break
const owner = account.account.owner.toBase58();
if (nonWatchedAccountsFilter.has(owner)) {
continue;
}

const config = configs.find((x) => x.programId == owner);
if (!config) {
if (owner) nonWatchedAccountsFilter.add(owner);
continue;
}

try {
await handleAccountWebhook({
fastify: server,
programId: new PublicKey(config.programId),
accounts: config.accounts,
account: {
pubkey: account.pubkey.toBase58(),
data: [account.account.data, undefined],
},
pluginsByAccountType: pluginsByAccountTypeByProgram[owner] || {},
});
} catch (err) {
throw err;
}
}
}
res.code(StatusCodes.OK).send(ReasonPhrases.OK);
} catch (err) {
res.code(StatusCodes.INTERNAL_SERVER_ERROR).send(err);
console.error(err);
}
});

server.post("/account-webhook", async (req, res) => {
if (req.headers.authorization != HELIUS_AUTH_SECRET) {
res.code(StatusCodes.FORBIDDEN).send({
Expand All @@ -171,7 +284,7 @@ if (!HELIUS_AUTH_SECRET) {
res.code(StatusCodes.SERVICE_UNAVAILABLE).send({
message: "Refresh is happening, cannot create transactions",
});
return
return;
}

try {
Expand Down
85 changes: 84 additions & 1 deletion packages/account-postgres-sink-service/yarn.deploy.lock
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ __metadata:
aws-sdk: ^2.1344.0
axios: ^1.3.6
axios-retry: ^3.8.0
bloom-filters: ^3.0.1
bn.js: ^5.2.0
bs58: ^4.0.1
deep-equal: ^2.2.2
Expand Down Expand Up @@ -801,6 +802,13 @@ __metadata:
languageName: node
linkType: hard

"base64-arraybuffer@npm:^1.0.2":
version: 1.0.2
resolution: "base64-arraybuffer@npm:1.0.2"
checksum: 15e6400d2d028bf18be4ed97702b11418f8f8779fb8c743251c863b726638d52f69571d4cc1843224da7838abef0949c670bde46936663c45ad078e89fee5c62
languageName: node
linkType: hard

"base64-js@npm:^1.0.2, base64-js@npm:^1.3.1, base64-js@npm:^1.5.1":
version: 1.5.1
resolution: "base64-js@npm:1.5.1"
Expand Down Expand Up @@ -848,6 +856,23 @@ __metadata:
languageName: node
linkType: hard

"bloom-filters@npm:^3.0.1":
version: 3.0.1
resolution: "bloom-filters@npm:3.0.1"
dependencies:
base64-arraybuffer: ^1.0.2
is-buffer: ^2.0.5
lodash: ^4.17.15
lodash.eq: ^4.0.0
lodash.indexof: ^4.0.5
long: ^5.2.0
reflect-metadata: ^0.1.13
seedrandom: ^3.0.5
xxhashjs: ^0.2.2
checksum: 920d72607780dcbee4fb5e90a26f725b8626c637a841439a4e06ec9ce724248d2facfd5cb8610a38245e3f7c54002c89fc457bb4b9cc3713abc659bfe585d266
languageName: node
linkType: hard

"bn.js@npm:^5.1.2, bn.js@npm:^5.2.0, bn.js@npm:^5.2.1":
version: 5.2.1
resolution: "bn.js@npm:5.2.1"
Expand Down Expand Up @@ -1161,6 +1186,13 @@ __metadata:
languageName: node
linkType: hard

"cuint@npm:^0.2.2":
version: 0.2.2
resolution: "cuint@npm:0.2.2"
checksum: b8127a93a7f16ce120ffcb22108014327c9808b258ee20e7dbb4c6740d7cb0f0c12d18a054eb716b0f2470090666abaae8a082d3cd5ef0e94fa447dd155842c4
languageName: node
linkType: hard

"debug@npm:4, debug@npm:^4.0.0, debug@npm:^4.3.3, debug@npm:^4.3.4":
version: 4.3.4
resolution: "debug@npm:4.3.4"
Expand Down Expand Up @@ -2028,6 +2060,13 @@ __metadata:
languageName: node
linkType: hard

"is-buffer@npm:^2.0.5":
version: 2.0.5
resolution: "is-buffer@npm:2.0.5"
checksum: 764c9ad8b523a9f5a32af29bdf772b08eb48c04d2ad0a7240916ac2688c983bf5f8504bf25b35e66240edeb9d9085461f9b5dae1f3d2861c6b06a65fe983de42
languageName: node
linkType: hard

"is-callable@npm:^1.1.3":
version: 1.2.7
resolution: "is-callable@npm:1.2.7"
Expand Down Expand Up @@ -2303,13 +2342,34 @@ __metadata:
languageName: node
linkType: hard

"lodash@npm:^4.17.21":
"lodash.eq@npm:^4.0.0":
version: 4.0.0
resolution: "lodash.eq@npm:4.0.0"
checksum: c46d45b8da669151cdf4fcb996056c8847c1c32723fa3c60ef87433ee41b01b8e1ee615c4c433b6f27391fa9c1793ad6d892dd30bac4c9c3508be3fa5f3d5def
languageName: node
linkType: hard

"lodash.indexof@npm:^4.0.5":
version: 4.0.5
resolution: "lodash.indexof@npm:4.0.5"
checksum: 4f3ead786c2941f18f1ef250459bcc6182ac00f04e76a2b94d3933349c879cd4b79444d59b0de835ad93d7b01626c1bc8ee55d7776879f099c0617d896a29f28
languageName: node
linkType: hard

"lodash@npm:^4.17.15, lodash@npm:^4.17.21":
version: 4.17.21
resolution: "lodash@npm:4.17.21"
checksum: eb835a2e51d381e561e508ce932ea50a8e5a68f4ebdd771ea240d3048244a8d13658acbd502cd4829768c56f2e16bdd4340b9ea141297d472517b83868e677f7
languageName: node
linkType: hard

"long@npm:^5.2.0":
version: 5.2.3
resolution: "long@npm:5.2.3"
checksum: 885ede7c3de4facccbd2cacc6168bae3a02c3e836159ea4252c87b6e34d40af819824b2d4edce330bfb5c4d6e8ce3ec5864bdcf9473fa1f53a4f8225860e5897
languageName: node
linkType: hard

"lower-case@npm:^2.0.2":
version: 2.0.2
resolution: "lower-case@npm:2.0.2"
Expand Down Expand Up @@ -3119,6 +3179,13 @@ __metadata:
languageName: node
linkType: hard

"reflect-metadata@npm:^0.1.13":
version: 0.1.14
resolution: "reflect-metadata@npm:0.1.14"
checksum: 155ad339319cec3c2d9d84719f730f8b6a6cd2a074733ec29dbae6c89d48a2914c7d07a2350212594f3aae160fa4da4f903e6512f27ceaf968443a7c692bcad0
languageName: node
linkType: hard

"regenerator-runtime@npm:^0.14.0":
version: 0.14.0
resolution: "regenerator-runtime@npm:0.14.0"
Expand Down Expand Up @@ -3304,6 +3371,13 @@ __metadata:
languageName: node
linkType: hard

"seedrandom@npm:^3.0.5":
version: 3.0.5
resolution: "seedrandom@npm:3.0.5"
checksum: 728b56bc3bc1b9ddeabd381e449b51cb31bdc0aa86e27fcd0190cea8c44613d5bcb2f6bb63ed79f78180cbe791c20b8ec31a9627f7b7fc7f476fd2bdb7e2da9f
languageName: node
linkType: hard

"semver@npm:^7.3.4, semver@npm:^7.3.5, semver@npm:^7.5.0, semver@npm:^7.5.1":
version: 7.5.4
resolution: "semver@npm:7.5.4"
Expand Down Expand Up @@ -4086,6 +4160,15 @@ __metadata:
languageName: node
linkType: hard

"xxhashjs@npm:^0.2.2":
version: 0.2.2
resolution: "xxhashjs@npm:0.2.2"
dependencies:
cuint: ^0.2.2
checksum: cf6baf05bafe5651dbf108008bafdb1ebe972f65228633f00b56c49d7a1e614a821fe3345c4eb27462994c7c954d982eae05871be6a48146f30803dd87f3c3b6
languageName: node
linkType: hard

"y18n@npm:^5.0.5":
version: 5.0.8
resolution: "y18n@npm:5.0.8"
Expand Down
Loading

0 comments on commit a0d7bd9

Please sign in to comment.