Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update mempool garbage collection logic for 3.0 #2117

Merged
merged 3 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ PG_APPLICATION_NAME=stacks-blockchain-api
# (with both Event Server and API endpoints).
# STACKS_API_MODE=

# Stacks nodes automatically perform garbage-collection by dropping transactions from the mempool if they
# are pending for more than 256 blocks. This variable controls the block age threshold at which the API will do
# the same.
# STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD=256

# To avoid running unnecessary mempool stats during transaction influx, we use a debounce mechanism for the process.
# This variable controls the duration it waits until there are no further mempool updates
# MEMPOOL_STATS_DEBOUNCE_INTERVAL=1000
Expand Down
28 changes: 21 additions & 7 deletions src/datastore/pg-write-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2768,21 +2768,35 @@ export class PgWriteStore extends PgStore {
}

/**
* Deletes mempool txs older than `STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD` blocks (default 256).
* Deletes mempool txs that should be dropped by block age or time age depending on which Stacks
* epoch we're on.
* @param sql - DB client
* @returns List of deleted `tx_id`s
*/
async deleteGarbageCollectedMempoolTxs(sql: PgSqlClient): Promise<{ deletedTxs: string[] }> {
const blockThreshold = parseInt(
process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? '256'
);
// TODO: Use DELETE instead of UPDATE once we implement a non-archival API replay mode.
// Is 3.0 active? Check if the latest block was signed by signers.
const nakamotoActive =
(
await sql<{ index_block_hash: string }[]>`
SELECT b.index_block_hash
FROM blocks AS b
INNER JOIN chain_tip AS c ON c.index_block_hash = b.index_block_hash
WHERE b.signer_bitvec IS NOT NULL
LIMIT 1
`
).count > 0;
// If 3.0 is active, drop transactions older than 2560 minutes.
// If 2.5 or earlier is active, drop transactions older than 256 blocks.
const deletedTxResults = await sql<{ tx_id: string }[]>`
WITH pruned AS (
UPDATE mempool_txs
SET pruned = TRUE, status = ${DbTxStatus.DroppedApiGarbageCollect}
WHERE pruned = FALSE
AND receipt_block_height <= (SELECT block_height - ${blockThreshold} FROM chain_tip)
WHERE pruned = FALSE AND
${
nakamotoActive
? sql`receipt_time <= EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - INTERVAL '2560 minutes'))::int`
: sql`receipt_block_height <= (SELECT block_height - 256 FROM chain_tip)`
}
RETURNING tx_id
),
count_update AS (
Expand Down
99 changes: 66 additions & 33 deletions tests/api/mempool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,41 +42,74 @@ describe('mempool tests', () => {
await migrate('down');
});

test('garbage collection', async () => {
const garbageThresholdOrig = process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD;
process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '2';
try {
// Insert 5 blocks with 1 mempool tx each.
for (let block_height = 1; block_height <= 5; block_height++) {
const block = new TestBlockBuilder({
block_height: block_height,
index_block_hash: `0x0${block_height}`,
parent_index_block_hash: `0x0${block_height - 1}`,
})
.addTx({ tx_id: `0x111${block_height}`, nonce: block_height })
.build();
await db.update(block);
const mempoolTx = testMempoolTx({ tx_id: `0x0${block_height}` });
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
}
test('garbage collection pre 3.0', async () => {
const hexFromHeight = (height: number) => {
const hex = height.toString(16);
return hex.length % 2 == 1 ? `0${hex}` : hex;
};
// Insert more than 256 blocks with 1 mempool tx each.
for (let block_height = 1; block_height <= 259; block_height++) {
const block = new TestBlockBuilder({
block_height: block_height,
index_block_hash: `0x${hexFromHeight(block_height)}`,
parent_index_block_hash: `0x${hexFromHeight(block_height - 1)}`,
})
.addTx({ tx_id: `0x11${hexFromHeight(block_height)}`, nonce: block_height })
.build();
await db.update(block);
const mempoolTx = testMempoolTx({ tx_id: `0x${hexFromHeight(block_height)}` });
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
}
await db.update(
new TestBlockBuilder({
block_height: 260,
index_block_hash: `0xff`,
parent_index_block_hash: `0x0103`,
}).build()
);

// Make sure we only have mempool txs for block_height >= 3
const mempoolTxResult = await db.getMempoolTxList({
limit: 10,
offset: 0,
includeUnanchored: false,
});
const mempoolTxs = mempoolTxResult.results;
expect(mempoolTxs.length).toEqual(3);
const txIds = mempoolTxs.map(e => e.tx_id).sort();
expect(txIds).toEqual(['0x03', '0x04', '0x05']);
} finally {
if (typeof garbageThresholdOrig === 'undefined') {
delete process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD;
} else {
process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = garbageThresholdOrig;
}
// Make sure we only have mempool txs for block_height >= 3
const mempoolTxResult = await db.getMempoolTxList({
limit: 10,
offset: 0,
includeUnanchored: false,
});
expect(mempoolTxResult.total).toEqual(257);
});

test('garbage collection post 3.0', async () => {
// Insert 3 txs spaced out so garbage collection kicks in.
for (let block_height = 1; block_height <= 3; block_height++) {
const block = new TestBlockBuilder({
block_height: block_height,
index_block_hash: `0x0${block_height}`,
parent_index_block_hash: `0x0${block_height - 1}`,
signer_bitvec: '1111',
})
.addTx({ tx_id: `0x111${block_height}`, nonce: block_height })
zone117x marked this conversation as resolved.
Show resolved Hide resolved
.build();
await db.update(block);
const mempoolTx = testMempoolTx({ tx_id: `0x0${block_height}`, receipt_time: 1 });
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
}

const mempoolTx = testMempoolTx({ tx_id: `0x0fff` });
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
await db.update(
new TestBlockBuilder({
block_height: 4,
index_block_hash: `0xff`,
parent_index_block_hash: `0x03`,
}).build()
);

// Make sure we only have the latest mempool tx
const mempoolTxResult = await db.getMempoolTxList({
limit: 10,
offset: 0,
includeUnanchored: false,
});
expect(mempoolTxResult.total).toEqual(1);
});

test('mempool stats', async () => {
Expand Down
18 changes: 1 addition & 17 deletions tests/api/socket-io.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,19 +230,15 @@ describe('socket-io', () => {
});

test('socket-io > mempool txs', async () => {
process.env.STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD = '0';

const address = apiServer.address;
const socket = io(`http://${address}`, {
reconnection: false,
query: { subscriptions: 'mempool' },
});
const txWaiters: Waiter<MempoolTransaction | Transaction>[] = [waiter(), waiter()];
const txWaiters: Waiter<MempoolTransaction>[] = [waiter()];
socket.on('mempool', tx => {
if (tx.tx_status === 'pending') {
txWaiters[0].finish(tx);
} else {
txWaiters[1].finish(tx);
}
});

Expand All @@ -258,21 +254,9 @@ describe('socket-io', () => {
await db.updateMempoolTxs({ mempoolTxs: [mempoolTx] });
const pendingResult = await txWaiters[0];

const block2 = new TestBlockBuilder({
block_height: 2,
index_block_hash: '0x02',
parent_index_block_hash: '0x01',
})
.addTx({ tx_id: '0x0201' })
.build();
await db.update(block2);
const droppedResult = await txWaiters[1];

try {
expect(pendingResult.tx_id).toEqual('0x01');
expect(pendingResult.tx_status).toEqual('pending');
expect(droppedResult.tx_id).toEqual('0x01');
expect(droppedResult.tx_status).toEqual('dropped_stale_garbage_collect');
} finally {
socket.emit('unsubscribe', 'mempool');
socket.close();
Expand Down
3 changes: 2 additions & 1 deletion tests/utils/test-builders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ export interface TestBlockArgs {
parent_microblock_hash?: string;
parent_microblock_sequence?: number;
canonical?: boolean;
signer_bitvec?: string;
}

/**
Expand Down Expand Up @@ -126,7 +127,7 @@ function testBlock(args?: TestBlockArgs): DbBlock {
execution_cost_write_count: 0,
execution_cost_write_length: 0,
tx_count: 1,
signer_bitvec: null,
signer_bitvec: args?.signer_bitvec ?? null,
};
}

Expand Down
Loading