feat: ensure events are inserted into the raw event request table #1925

Merged
8 commits merged on Jun 3, 2024
28 changes: 28 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,31 @@
## [7.10.0](https://github.com/hirosystems/stacks-blockchain-api/compare/v7.9.1...v7.10.0) (2024-04-15)


### Features

* add nakamoto block time to v2 endpoints ([#1921](https://github.com/hirosystems/stacks-blockchain-api/issues/1921)) ([ae6bbe8](https://github.com/hirosystems/stacks-blockchain-api/commit/ae6bbe80b66520b7c7c7bc42b29716fb60146229))
* add signer-keys from pox4 events ([#1857](https://github.com/hirosystems/stacks-blockchain-api/issues/1857)) ([c17ad23](https://github.com/hirosystems/stacks-blockchain-api/commit/c17ad23d3f451d7c072ff94f4cb1ae7a2f78705d))
* ingest signer_bitvec ([#1900](https://github.com/hirosystems/stacks-blockchain-api/issues/1900)) ([aa1750f](https://github.com/hirosystems/stacks-blockchain-api/commit/aa1750f7ebbdfe4c2a84583f98c3ff465236f8aa))
* nakamoto block timestamps ([#1886](https://github.com/hirosystems/stacks-blockchain-api/issues/1886)) ([f547832](https://github.com/hirosystems/stacks-blockchain-api/commit/f5478329d7267a65b5f3c557b197feadff298afb))
* pox 4 revoke events and signer-key support ([#1829](https://github.com/hirosystems/stacks-blockchain-api/issues/1829)) ([5e5650a](https://github.com/hirosystems/stacks-blockchain-api/commit/5e5650a29bcc5950f061ed0a84961075c855a863)), closes [#1849](https://github.com/hirosystems/stacks-blockchain-api/issues/1849)
* pox stacker & signer cycle details ([#1873](https://github.com/hirosystems/stacks-blockchain-api/issues/1873)) ([d2c2805](https://github.com/hirosystems/stacks-blockchain-api/commit/d2c28059cfca99cd9b9a35cb8c96074a60fedd35))
* rosetta pox4 stacking support ([#1928](https://github.com/hirosystems/stacks-blockchain-api/issues/1928)) ([2ba36f9](https://github.com/hirosystems/stacks-blockchain-api/commit/2ba36f9846f3d85de093376ad68ee7660e697846)), closes [#1929](https://github.com/hirosystems/stacks-blockchain-api/issues/1929)


### Bug Fixes

* add nakamoto testnet to openapi docs ([#1910](https://github.com/hirosystems/stacks-blockchain-api/issues/1910)) ([01fb971](https://github.com/hirosystems/stacks-blockchain-api/commit/01fb9713e86b1a289dbca016ad7b5c366aaef74c))
* batch drop mempool transactions ([#1920](https://github.com/hirosystems/stacks-blockchain-api/issues/1920)) ([a7ee96d](https://github.com/hirosystems/stacks-blockchain-api/commit/a7ee96de55c8a61c1e2d6bf9ef7c3b220fd82803))
* cycle signer filter ([#1916](https://github.com/hirosystems/stacks-blockchain-api/issues/1916)) ([dc7d600](https://github.com/hirosystems/stacks-blockchain-api/commit/dc7d6009556b833ff3994b35c96ba4456ca7e81f))
* cycles response for empty cycle info ([#1914](https://github.com/hirosystems/stacks-blockchain-api/issues/1914)) ([a7a4558](https://github.com/hirosystems/stacks-blockchain-api/commit/a7a4558105f669260cc4948b28213196c4c62079))
* delegate-stx burn-op parsing and test fix ([#1939](https://github.com/hirosystems/stacks-blockchain-api/issues/1939)) ([73ec0db](https://github.com/hirosystems/stacks-blockchain-api/commit/73ec0db76e8004370e6c9ccf02fd520449d6e9ba))
* event-replay readiness for nakamoto & fix for [#1879](https://github.com/hirosystems/stacks-blockchain-api/issues/1879) ([#1903](https://github.com/hirosystems/stacks-blockchain-api/issues/1903)) ([1572e73](https://github.com/hirosystems/stacks-blockchain-api/commit/1572e737337680510850b23662e1f36c57ebc198))
* log message when sql migration is performed ([#1942](https://github.com/hirosystems/stacks-blockchain-api/issues/1942)) ([49a4d25](https://github.com/hirosystems/stacks-blockchain-api/commit/49a4d25f0a251d28aef81c588f04d329825579e6))
* other empty result responses ([#1915](https://github.com/hirosystems/stacks-blockchain-api/issues/1915)) ([3cd2c64](https://github.com/hirosystems/stacks-blockchain-api/commit/3cd2c64674e7abe0b4ba3ed7c1890ea63c1b87b2))
* pox4 stack-stx burn-op handling ([#1936](https://github.com/hirosystems/stacks-blockchain-api/issues/1936)) ([9e9a464](https://github.com/hirosystems/stacks-blockchain-api/commit/9e9a464488cb6963c93e88d78e1a7ed67ae65ca2))
* remove signer columns from tenure-change transactions ([#1845](https://github.com/hirosystems/stacks-blockchain-api/issues/1845)) ([8ec726b](https://github.com/hirosystems/stacks-blockchain-api/commit/8ec726b05531abb7787d035d21f7afc276574b9c))
* sql transactional consistency bug with fetching chaintip in various areas ([#1853](https://github.com/hirosystems/stacks-blockchain-api/issues/1853)) ([ada8536](https://github.com/hirosystems/stacks-blockchain-api/commit/ada85364b5465b59e1dba0e82815bd8b8057f23f))

## [7.9.1](https://github.com/hirosystems/stacks-blockchain-api/compare/v7.9.0...v7.9.1) (2024-04-05)


40 changes: 19 additions & 21 deletions src/datastore/pg-write-store.ts
@@ -176,27 +176,25 @@ export class PgWriteStore extends PgStore {
return store;
}

async storeRawEventRequest(eventPath: string, payload: PgJsonb): Promise<void> {
// To avoid depending on the DB more than once and to allow the query transaction to settle,
// we'll take the complete insert result and move that to the output TSV file instead of taking
// only the `id` and performing a `COPY` of that row later.
const insertResult = await this.sql<
{
id: string;
receive_timestamp: string;
event_path: string;
payload: string;
}[]
>`INSERT INTO event_observer_requests(
event_path, payload
) values(${eventPath}, ${payload})
RETURNING id, receive_timestamp::text, event_path, payload::text
`;
if (insertResult.length !== 1) {
throw new Error(
`Unexpected row count ${insertResult.length} when storing event_observer_requests entry`
);
}
async storeRawEventRequest(eventPath: string, payload: any): Promise<void> {
await this.sqlWriteTransaction(async sql => {
const insertResult = await sql<
{
id: string;
receive_timestamp: string;
event_path: string;
}[]
>`INSERT INTO event_observer_requests(
event_path, payload
) values(${eventPath}, ${payload})
RETURNING id, receive_timestamp::text, event_path
`;
if (insertResult.length !== 1) {
throw new Error(
`Unexpected row count ${insertResult.length} when storing event_observer_requests entry`
);
}
});
}

async update(data: DataStoreBlockUpdateData): Promise<void> {
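The rewritten `storeRawEventRequest` above now runs the insert inside `sqlWriteTransaction` and verifies that exactly one row was returned. A minimal standalone sketch of that pattern, assuming a postgres.js-style client like the `sql` tagged template used above (the connection string, the `postgres()` setup, and the `sql.json` helper are illustrative assumptions, not the project's actual wiring):

```typescript
import postgres from 'postgres';

// Placeholder connection string; the real service configures its client elsewhere.
const sql = postgres('postgres://localhost:5432/stacks_blockchain_api');

async function storeRawEventRequest(eventPath: string, payload: any): Promise<void> {
  // Run the insert inside a transaction so it commits (or rolls back) together
  // with any other writes made while handling the same event request.
  await sql.begin(async tx => {
    const rows = await tx<{ id: string }[]>`
      INSERT INTO event_observer_requests (event_path, payload)
      VALUES (${eventPath}, ${tx.json(payload)})
      RETURNING id
    `;
    if (rows.length !== 1) {
      throw new Error(
        `Unexpected row count ${rows.length} when storing event_observer_requests entry`
      );
    }
  });
}

// Example call: await storeRawEventRequest('/new_block', { block_height: 1 });
```

Checking the returned row count, as the real method does, surfaces a failed insert immediately instead of letting the raw event be dropped silently.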
49 changes: 21 additions & 28 deletions src/event-stream/event-server.ts
@@ -926,7 +926,7 @@ export async function startEventServer(opts: {

const app = express();

const handleRawEventRequest = asyncHandler(async req => {
const handleRawEventRequest = async (req: express.Request) => {
await messageHandler.handleRawEventRequest(req.path, req.body, db);

if (logger.level === 'debug') {
@@ -938,10 +938,9 @@ export async function startEventServer(opts: {
}
logger.debug(`${eventPath} ${payload}`, { component: 'stacks-node-event' });
}
});
};

app.use(loggerMiddleware);

app.use(bodyParser.json({ type: 'application/json', limit: '500MB' }));

const ibdHeight = getIbdBlockHeight();
@@ -952,7 +951,7 @@ export async function startEventServer(opts: {
if (chainTip.block_height > ibdHeight) {
next();
} else {
handleRawEventRequest(req, res, next);
await handleRawEventRequest(req);
res.status(200).send(`IBD`);
}
} catch (error) {
@@ -971,101 +970,95 @@ export async function startEventServer(opts: {

app.post(
'/new_block',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const blockMessage: CoreNodeBlockMessage = req.body;
await messageHandler.handleBlockMessage(opts.chainId, blockMessage, db);
if (blockMessage.block_height === 1) {
await handleBnsImport(db);
}
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /new_block');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/new_burn_block',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const msg: CoreNodeBurnBlockMessage = req.body;
await messageHandler.handleBurnBlock(msg, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /new_burn_block');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/new_mempool_tx',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const rawTxs: string[] = req.body;
await messageHandler.handleMempoolTxs(rawTxs, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /new_mempool_tx');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/drop_mempool_tx',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const msg: CoreNodeDropMempoolTxMessage = req.body;
await messageHandler.handleDroppedMempoolTxs(msg, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /drop_mempool_tx');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/attachments/new',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const msg: CoreNodeAttachmentMessage[] = req.body;
await messageHandler.handleNewAttachment(msg, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /attachments/new');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post(
'/new_microblocks',
asyncHandler(async (req, res, next) => {
asyncHandler(async (req, res) => {
try {
const msg: CoreNodeMicroblockMessage = req.body;
await messageHandler.handleMicroblockMessage(opts.chainId, msg, db);
await handleRawEventRequest(req);
res.status(200).json({ result: 'ok' });
next();
} catch (error) {
logger.error(error, 'error processing core-node /new_microblocks');
res.status(500).json({ error: error });
}
}),
handleRawEventRequest
})
);

app.post('*', (req, res, next) => {
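Every route change above follows the same pattern: `handleRawEventRequest` is no longer chained as a trailing Express middleware reached through `next()`; each endpoint now awaits it inline and only then sends its 200 response. A minimal standalone sketch of that flow, with hypothetical stand-ins for the real message handler and raw-event writer (the function bodies, names, and port are illustrative assumptions):

```typescript
import express from 'express';
import asyncHandler from 'express-async-handler';

// Hypothetical stand-ins; only the control flow of the real handlers is illustrated.
async function handleBlockMessage(_body: unknown): Promise<void> { /* process the block */ }
async function storeRawEventRequest(_path: string, _body: unknown): Promise<void> { /* insert raw event */ }

const app = express();
app.use(express.json({ limit: '500MB' }));

app.post(
  '/new_block',
  asyncHandler(async (req, res) => {
    try {
      // Process the event, then persist the raw request in the same handler,
      // so the 200 response is only sent once both operations have succeeded.
      await handleBlockMessage(req.body);
      await storeRawEventRequest(req.path, req.body);
      res.status(200).json({ result: 'ok' });
    } catch (error) {
      res.status(500).json({ error: error });
    }
  })
);

app.listen(3700); // illustrative port
```

Because the raw-event insert is awaited inside the handler, a failure falls through to the same catch block and produces a 500 rather than a silent drop, which matches the intent in the PR title.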
65 changes: 65 additions & 0 deletions src/tests-event-replay/raw-event-request-tests.ts
@@ -81,4 +81,69 @@ describe('Events table', () => {
}
);
});

test('Large event requests are stored correctly', async () => {
const getRawEventCount = async () => {
const [row] = await client<{ count: string }[]>`SELECT count(*) from event_observer_requests`;
return Number(row.count);
};

await useWithCleanup(
async () => {
const eventServer = await startEventServer({
datastore: db,
chainId: ChainID.Mainnet,
serverHost: '127.0.0.1',
serverPort: 0,
});
return [eventServer, eventServer.closeAsync] as const;
},
async eventServer => {
// split the tsv file into lines, split each line by tab, find the first line that has a cell value of `/new_block`
const sampleTsv = fs
.readFileSync('src/tests-event-replay/tsv/mainnet-block0.tsv', 'utf8')
.split('\n')
.map(line => line.split('\t'))
.find(line => line[2] === '/new_block');
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const sampleNewBlock = JSON.parse(sampleTsv![3]);
console.log(sampleTsv);
// Create a huge JSON object: 10 levels deep with 3 nodes per level (~59k leaf objects); a ~100 MB string value is attached below
function generateNestedObject(depth: number, nodesPerLevel: number, currentDepth = 1): any {
if (currentDepth > depth) {
// Return a leaf object instead of trying to link back to the top-level node
return { info: `Leaf at depth ${currentDepth}` };
}
// Create a new object for each call to ensure uniqueness
const currentNode: any = {};
for (let i = 0; i < nodesPerLevel; i++) {
currentNode[`node_${currentDepth}_${i}`] =
currentDepth === depth
? { info: `Simulated large node leaf at ${currentDepth}_${i}` }
: generateNestedObject(depth, nodesPerLevel, currentDepth + 1);
}
return currentNode;
}
let hugeJsonObject = generateNestedObject(10, 3);
hugeJsonObject = Object.assign(hugeJsonObject, sampleNewBlock);
hugeJsonObject['very_large_value'] = 'x'.repeat(100 * 1024 * 1024); // 100 megabytes
const rawEvent = {
event_path: '/new_block',
payload: JSON.stringify(hugeJsonObject),
};
const rawEventRequestCountBefore = await getRawEventCount();
const response = await httpPostRequest({
host: '127.0.0.1',
port: eventServer.serverAddress.port,
path: rawEvent.event_path,
headers: { 'Content-Type': 'application/json' },
body: Buffer.from(rawEvent.payload, 'utf8'),
throwOnNotOK: false,
});
expect(response.statusCode).toBe(200);
const rawEventRequestCountAfter = await getRawEventCount();
expect(rawEventRequestCountAfter).toEqual(rawEventRequestCountBefore + 1);
}
);
});
});