diff --git a/.github/workflows/build-publish-eth-rpc.yml b/.github/workflows/build-publish-eth-rpc.yml index a98b3881a1453..621ac3cd44c3c 100644 --- a/.github/workflows/build-publish-eth-rpc.yml +++ b/.github/workflows/build-publish-eth-rpc.yml @@ -13,7 +13,6 @@ concurrency: env: ETH_RPC_IMAGE_NAME: "docker.io/paritypr/eth-rpc" - ETH_INDEXER_IMAGE_NAME: "docker.io/paritypr/eth-indexer" jobs: set-variables: @@ -53,15 +52,6 @@ jobs: tags: | ${{ env.ETH_RPC_IMAGE_NAME }}:${{ env.VERSION }} - - name: Build eth-indexer Docker image - uses: docker/build-push-action@v6 - with: - context: . - file: ./substrate/frame/revive/rpc/dockerfiles/eth-indexer/Dockerfile - push: false - tags: | - ${{ env.ETH_INDEXER_IMAGE_NAME }}:${{ env.VERSION }} - build_push_docker: name: Build and push docker images runs-on: parity-large @@ -88,11 +78,3 @@ jobs: tags: | ${{ env.ETH_RPC_IMAGE_NAME }}:${{ env.VERSION }} - - name: Build eth-indexer Docker image - uses: docker/build-push-action@v6 - with: - context: . - file: ./substrate/frame/revive/rpc/dockerfiles/eth-indexer/Dockerfile - push: true - tags: | - ${{ env.ETH_INDEXER_IMAGE_NAME }}:${{ env.VERSION }} diff --git a/Cargo.lock b/Cargo.lock index 57c70150a2072..38857f568fff3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8674,9 +8674,9 @@ checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" [[package]] name = "httparse" -version = "1.8.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +checksum = "f2d708df4e7140240a16cd6ab0ab65c972d7433ab77819ea693fde9c43811e2a" [[package]] name = "httpdate" @@ -8716,9 +8716,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.3.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" dependencies = [ "bytes", "futures-channel", @@ -8759,7 +8759,7 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.3.1", + "hyper 1.6.0", "hyper-util", "log", "rustls 0.23.18", @@ -8798,20 +8798,19 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.5" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.3.1", + "hyper 1.6.0", "pin-project-lite", "socket2 0.5.7", "tokio", - "tower", "tower-service", "tracing", ] @@ -9415,7 +9414,7 @@ dependencies = [ "async-trait", "base64 0.22.1", "http-body 1.0.0", - "hyper 1.3.1", + "hyper 1.6.0", "hyper-rustls 0.27.3", "hyper-util", "jsonrpsee-core", @@ -9454,7 +9453,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.6.0", "hyper-util", "jsonrpsee-core", "jsonrpsee-types", @@ -17903,7 +17902,7 @@ dependencies = [ "futures", "futures-timer", "http-body-util", - "hyper 1.3.1", + "hyper 1.6.0", "hyper-util", "log", "parity-scale-codec", @@ -21343,7 +21342,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.6.0", "hyper-rustls 0.27.3", "hyper-util", "ipnet", @@ -23352,7 +23351,7 @@ dependencies = [ "futures", "futures-timer", "http-body-util", - "hyper 1.3.1", + "hyper 1.6.0", "hyper-rustls 0.27.3", "hyper-util", "log", @@ -23462,7 +23461,7 @@ dependencies = [ "governor", "http 1.1.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.6.0", "ip_network", "jsonrpsee", "log", @@ -28686,7 +28685,7 @@ name = "substrate-prometheus-endpoint" version = "0.17.0" dependencies = [ "http-body-util", - "hyper 1.3.1", + "hyper 1.6.0", "hyper-util", "log", "prometheus", diff --git a/prdoc/pr_7493.prdoc b/prdoc/pr_7493.prdoc new file mode 100644 index 0000000000000..e613cb049551f --- /dev/null +++ b/prdoc/pr_7493.prdoc @@ -0,0 +1,9 @@ +title: '[pallet-revive] fix eth-rpc indexing' +doc: +- audience: Runtime Dev + description: |- + - Fix a deadlock on the RWLock cache + - Remove eth-indexer, we won't need it anymore, the indexing will be started from within eth-rpc directly +crates: +- name: pallet-revive-eth-rpc + bump: minor diff --git a/substrate/frame/revive/rpc/.sqlx/query-027a434a38822c2ba4439e8f9f9c1135227c1150f2c5083d1c7c6086b717ada0.json b/substrate/frame/revive/rpc/.sqlx/query-027a434a38822c2ba4439e8f9f9c1135227c1150f2c5083d1c7c6086b717ada0.json deleted file mode 100644 index 016276144901a..0000000000000 --- a/substrate/frame/revive/rpc/.sqlx/query-027a434a38822c2ba4439e8f9f9c1135227c1150f2c5083d1c7c6086b717ada0.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n\t\t\t\tINSERT INTO transaction_hashes (transaction_hash, block_hash, transaction_index)\n\t\t\t\tVALUES ($1, $2, $3)\n\n\t\t\t\tON CONFLICT(transaction_hash) DO UPDATE SET\n\t\t\t\tblock_hash = EXCLUDED.block_hash,\n\t\t\t\ttransaction_index = EXCLUDED.transaction_index\n\t\t\t\t", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - }, - "hash": "027a434a38822c2ba4439e8f9f9c1135227c1150f2c5083d1c7c6086b717ada0" -} diff --git a/substrate/frame/revive/rpc/.sqlx/query-29af64347f700919dc2ee12463f332be50096d4e37be04ed8b6f46ac5c242043.json b/substrate/frame/revive/rpc/.sqlx/query-29af64347f700919dc2ee12463f332be50096d4e37be04ed8b6f46ac5c242043.json index 2443035c433d7..b125d2401d831 100644 --- a/substrate/frame/revive/rpc/.sqlx/query-29af64347f700919dc2ee12463f332be50096d4e37be04ed8b6f46ac5c242043.json +++ b/substrate/frame/revive/rpc/.sqlx/query-29af64347f700919dc2ee12463f332be50096d4e37be04ed8b6f46ac5c242043.json @@ -6,7 +6,7 @@ { "name": "block_hash", "ordinal": 0, - "type_info": "Text" + "type_info": "Blob" }, { "name": "transaction_index", diff --git a/substrate/frame/revive/rpc/.sqlx/query-de639fe0ac0c3cfdfbbfa07e81a61dae3a284ad4f7979dacb0ccff32cdc39051.json b/substrate/frame/revive/rpc/.sqlx/query-de639fe0ac0c3cfdfbbfa07e81a61dae3a284ad4f7979dacb0ccff32cdc39051.json new file mode 100644 index 0000000000000..6df5453213956 --- /dev/null +++ b/substrate/frame/revive/rpc/.sqlx/query-de639fe0ac0c3cfdfbbfa07e81a61dae3a284ad4f7979dacb0ccff32cdc39051.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n\t\t\t\tINSERT OR REPLACE INTO transaction_hashes (transaction_hash, block_hash, transaction_index)\n\t\t\t\tVALUES ($1, $2, $3)\n\t\t\t\t", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "de639fe0ac0c3cfdfbbfa07e81a61dae3a284ad4f7979dacb0ccff32cdc39051" +} diff --git a/substrate/frame/revive/rpc/Cargo.toml b/substrate/frame/revive/rpc/Cargo.toml index 014231f7f3e55..c333c9816e579 100644 --- a/substrate/frame/revive/rpc/Cargo.toml +++ b/substrate/frame/revive/rpc/Cargo.toml @@ -13,10 +13,6 @@ default-run = "eth-rpc" name = "eth-rpc" path = "src/main.rs" -[[bin]] -name = "eth-indexer" -path = "src/eth-indexer.rs" - [[bin]] name = "eth-rpc-tester" path = "src/eth-rpc-tester.rs" diff --git a/substrate/frame/revive/rpc/dockerfiles/eth-indexer/Dockerfile b/substrate/frame/revive/rpc/dockerfiles/eth-indexer/Dockerfile deleted file mode 100644 index 77fa846a145ce..0000000000000 --- a/substrate/frame/revive/rpc/dockerfiles/eth-indexer/Dockerfile +++ /dev/null @@ -1,28 +0,0 @@ -FROM rust AS builder - -RUN apt-get update && \ - DEBIAN_FRONTEND=noninteractive apt-get install -y \ - protobuf-compiler \ - clang libclang-dev - -WORKDIR /polkadot -COPY . /polkadot -RUN rustup component add rust-src -RUN cargo build --locked --profile production -p pallet-revive-eth-rpc --bin eth-indexer - -FROM docker.io/parity/base-bin:latest -COPY --from=builder /polkadot/target/production/eth-indexer /usr/local/bin - -USER root -RUN useradd -m -u 1001 -U -s /bin/sh -d /polkadot polkadot && \ -# unclutter and minimize the attack surface - rm -rf /usr/bin /usr/sbin && \ -# check if executable works in this container - /usr/local/bin/eth-indexer --help - -USER polkadot - -ENTRYPOINT ["/usr/local/bin/eth-indexer"] - -# We call the help by default -CMD ["--help"] diff --git a/substrate/frame/revive/rpc/examples/js/.prettierrc.json b/substrate/frame/revive/rpc/examples/js/.prettierrc.json index e74ed9ff35785..123daf08ec5d7 100644 --- a/substrate/frame/revive/rpc/examples/js/.prettierrc.json +++ b/substrate/frame/revive/rpc/examples/js/.prettierrc.json @@ -1,6 +1,6 @@ { - "trailingComma": "es5", - "tabWidth": 4, - "semi": false, - "singleQuote": true + "trailingComma": "es5", + "tabWidth": 4, + "semi": false, + "singleQuote": true } diff --git a/substrate/frame/revive/rpc/examples/js/contracts/.solhint.json b/substrate/frame/revive/rpc/examples/js/contracts/.solhint.json index ce2220e0b7560..2614a969da39b 100644 --- a/substrate/frame/revive/rpc/examples/js/contracts/.solhint.json +++ b/substrate/frame/revive/rpc/examples/js/contracts/.solhint.json @@ -1,3 +1,3 @@ { - "extends": "solhint:recommended" + "extends": "solhint:recommended" } diff --git a/substrate/frame/revive/rpc/examples/js/package.json b/substrate/frame/revive/rpc/examples/js/package.json index f2c4b8d780932..e181461cf8615 100644 --- a/substrate/frame/revive/rpc/examples/js/package.json +++ b/substrate/frame/revive/rpc/examples/js/package.json @@ -6,7 +6,8 @@ "scripts": { "dev": "vite", "build": "tsc && vite build", - "preview": "vite preview" + "preview": "vite preview", + "prettier": "prettier --write ." }, "dependencies": { "@parity/revive": "^0.0.9", diff --git a/substrate/frame/revive/rpc/examples/js/src/geth-diff.test.ts b/substrate/frame/revive/rpc/examples/js/src/geth-diff.test.ts index 2a4ff2edcdf52..51bbad3c17966 100644 --- a/substrate/frame/revive/rpc/examples/js/src/geth-diff.test.ts +++ b/substrate/frame/revive/rpc/examples/js/src/geth-diff.test.ts @@ -6,16 +6,16 @@ import { waitForHealth, polkadotSdkPath, } from './util.ts' -import { afterAll, afterEach, beforeAll, describe, expect, test } from 'bun:test' -import { encodeFunctionData, Hex, parseEther } from 'viem' +import { afterAll, afterEach, describe, expect, test } from 'bun:test' +import { encodeFunctionData, Hex, parseEther, decodeEventLog, keccak256, toHex } from 'viem' import { ErrorsAbi } from '../abi/Errors' -import { FlipperCallerAbi } from '../abi/FlipperCaller' -import { FlipperAbi } from '../abi/Flipper' +import { EventExampleAbi } from '../abi/EventExample' import { Subprocess, spawn } from 'bun' import { fail } from 'node:assert' const procs: Subprocess[] = [] -if (!process.env.USE_LIVE_SERVERS) { +if (process.env.START_GETH) { + process.env.USE_ETH_RPC = 'true' procs.push( // Run geth on port 8546 await (async () => { @@ -30,7 +30,12 @@ if (!process.env.USE_LIVE_SERVERS) { await waitForHealth('http://localhost:8546').catch() return proc - })(), + })() + ) +} + +if (process.env.START_SUBSTRATE_NODE) { + procs.push( //Run the substate node (() => { killProcessOnPort(9944) @@ -42,13 +47,19 @@ if (!process.env.USE_LIVE_SERVERS) { '-l=error,evm=debug,sc_rpc_server=info,runtime::revive=debug', ], { - stdout: Bun.file('/tmp/kitchensink.out.log'), - stderr: Bun.file('/tmp/kitchensink.err.log'), + stdout: Bun.file('/tmp/substrate-node.out.log'), + stderr: Bun.file('/tmp/substrate-node.err.log'), cwd: polkadotSdkPath, } ) - })(), - // Run eth-rpc on 8545 + })() + ) +} + +if (process.env.START_ETH_RPC) { + process.env.USE_ETH_RPC = 'true' + // Run eth-rpc on 8545 + procs.push( await (async () => { killProcessOnPort(8545) console.log('Starting eth-rpc') @@ -79,53 +90,49 @@ afterAll(async () => { procs.forEach((proc) => proc.kill()) }) -const envs = await Promise.all([createEnv('geth'), createEnv('kitchensink')]) +const envs = await Promise.all([ + ...(process.env.USE_GETH ? [createEnv('geth')] : []), + ...(process.env.USE_ETH_RPC ? [createEnv('eth-rpc')] : []), +]) for (const env of envs) { describe(env.serverWallet.chain.name, () => { - let errorsAddr: Hex = '0x' - let flipperAddr: Hex = '0x' - let flipperCallerAddr: Hex = '0x' - beforeAll(async () => { - { + const getErrorTesterAddr = (() => { + let contractAddress: Hex = '0x' + return async () => { + if (contractAddress !== '0x') { + return contractAddress + } const hash = await env.serverWallet.deployContract({ abi: ErrorsAbi, bytecode: getByteCode('Errors', env.evm), }) const deployReceipt = await env.serverWallet.waitForTransactionReceipt({ hash }) - if (!deployReceipt.contractAddress) - throw new Error('Contract address should be set') - errorsAddr = deployReceipt.contractAddress - } - - { - const hash = await env.serverWallet.deployContract({ - abi: FlipperAbi, - bytecode: getByteCode('Flipper', env.evm), - }) - const deployReceipt = await env.serverWallet.waitForTransactionReceipt({ hash }) - if (!deployReceipt.contractAddress) - throw new Error('Contract address should be set') - flipperAddr = deployReceipt.contractAddress + contractAddress = deployReceipt.contractAddress! + return contractAddress } + })() - { + const getEventExampleAddr = (() => { + let contractAddress: Hex = '0x' + return async () => { + if (contractAddress !== '0x') { + return contractAddress + } const hash = await env.serverWallet.deployContract({ - abi: FlipperCallerAbi, - args: [flipperAddr], - bytecode: getByteCode('FlipperCaller', env.evm), + abi: EventExampleAbi, + bytecode: getByteCode('EventExample', env.evm), }) const deployReceipt = await env.serverWallet.waitForTransactionReceipt({ hash }) - if (!deployReceipt.contractAddress) - throw new Error('Contract address should be set') - flipperCallerAddr = deployReceipt.contractAddress + contractAddress = deployReceipt.contractAddress! + return contractAddress } - }) + })() test('triggerAssertError', async () => { try { await env.accountWallet.readContract({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'triggerAssertError', }) @@ -143,7 +150,7 @@ for (const env of envs) { test('triggerRevertError', async () => { try { await env.accountWallet.readContract({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'triggerRevertError', }) @@ -161,7 +168,7 @@ for (const env of envs) { test('triggerDivisionByZero', async () => { try { await env.accountWallet.readContract({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'triggerDivisionByZero', }) @@ -181,7 +188,7 @@ for (const env of envs) { test('triggerOutOfBoundsError', async () => { try { await env.accountWallet.readContract({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'triggerOutOfBoundsError', }) @@ -201,7 +208,7 @@ for (const env of envs) { test('triggerCustomError', async () => { try { await env.accountWallet.readContract({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'triggerCustomError', }) @@ -219,7 +226,7 @@ for (const env of envs) { test('eth_call (not enough funds)', async () => { try { await env.emptyWallet.simulateContract({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'valueMatch', value: parseEther('10'), @@ -255,7 +262,7 @@ for (const env of envs) { test('eth_estimate (not enough funds)', async () => { try { await env.emptyWallet.estimateContractGas({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'valueMatch', value: parseEther('10'), @@ -273,7 +280,7 @@ for (const env of envs) { test('eth_estimate call caller (not enough funds)', async () => { try { await env.emptyWallet.estimateContractGas({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'valueMatch', value: parseEther('10'), @@ -291,7 +298,7 @@ for (const env of envs) { test('eth_estimate (revert)', async () => { try { await env.serverWallet.estimateContractGas({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'valueMatch', value: parseEther('11'), @@ -322,7 +329,7 @@ for (const env of envs) { expect(balance).toBe(0n) try { await env.emptyWallet.estimateContractGas({ - address: errorsAddr, + address: await getErrorTesterAddr(), abi: ErrorsAbi, functionName: 'setState', args: [true], @@ -352,7 +359,7 @@ for (const env of envs) { { data, from: env.emptyWallet.account.address, - to: errorsAddr, + to: await getErrorTesterAddr(), }, ], }) diff --git a/substrate/frame/revive/rpc/examples/js/src/spammer.ts b/substrate/frame/revive/rpc/examples/js/src/spammer.ts index c038afa71f0aa..29bdf20d935c7 100644 --- a/substrate/frame/revive/rpc/examples/js/src/spammer.ts +++ b/substrate/frame/revive/rpc/examples/js/src/spammer.ts @@ -11,7 +11,7 @@ import { import { FlipperAbi } from '../abi/Flipper' //Run the substate node -console.log('🚀 Start kitchensink...') +console.log('🚀 Start substrate-node...') killProcessOnPort(9944) spawn( [ @@ -20,8 +20,8 @@ spawn( '-l=error,evm=debug,sc_rpc_server=info,runtime::revive=debug', ], { - stdout: Bun.file('/tmp/kitchensink.out.log'), - stderr: Bun.file('/tmp/kitchensink.err.log'), + stdout: Bun.file('/tmp/substrate-node.out.log'), + stderr: Bun.file('/tmp/substrate-node.err.log'), cwd: polkadotSdkPath, } ) @@ -60,7 +60,7 @@ spawn( ) await waitForHealth('http://localhost:8545').catch() -const env = await createEnv('kitchensink') +const env = await createEnv('eth-rpc') const wallet = env.accountWallet console.log('🚀 Deploy flipper...') diff --git a/substrate/frame/revive/rpc/examples/js/src/util.ts b/substrate/frame/revive/rpc/examples/js/src/util.ts index 2991bdfe6367b..3a488da67d801 100644 --- a/substrate/frame/revive/rpc/examples/js/src/util.ts +++ b/substrate/frame/revive/rpc/examples/js/src/util.ts @@ -35,10 +35,10 @@ export function killProcessOnPort(port: number) { } export let jsonRpcErrors: JsonRpcError[] = [] -export async function createEnv(name: 'geth' | 'kitchensink') { +export async function createEnv(name: 'geth' | 'eth-rpc') { const gethPort = process.env.GETH_PORT || '8546' - const kitchensinkPort = process.env.KITCHENSINK_PORT || '8545' - const url = `http://localhost:${name == 'geth' ? gethPort : kitchensinkPort}` + const ethRpcPort = process.env.ETH_RPC_PORT || '8545' + const url = `http://localhost:${name == 'geth' ? gethPort : ethRpcPort}` const chain = defineChain({ id: name == 'geth' ? 1337 : 420420420, name, diff --git a/substrate/frame/revive/rpc/migrations/0001_create_transaction_hashes.sql b/substrate/frame/revive/rpc/migrations/0001_create_transaction_hashes.sql new file mode 100644 index 0000000000000..8fd6b353faa82 --- /dev/null +++ b/substrate/frame/revive/rpc/migrations/0001_create_transaction_hashes.sql @@ -0,0 +1,22 @@ +-- Useful commands: +-- +-- Set DATABASE_URL environment variable. +-- export DATABASE_URL=sqlite:///$HOME/eth_rpc.db +-- +-- Create DB: +-- cargo sqlx database create +-- +-- Run migration manually: +-- cargo sqlx migrate run +-- +-- Update compile time artifacts: +-- cargo sqlx prepare +CREATE TABLE IF NOT EXISTS transaction_hashes ( + transaction_hash BLOB NOT NULL PRIMARY KEY, + transaction_index INTEGER NOT NULL, + block_hash BLOB NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_block_hash ON transaction_hashes ( + block_hash +); diff --git a/substrate/frame/revive/rpc/migrations/20241205165418_create_transaction_hashes.sql b/substrate/frame/revive/rpc/migrations/20241205165418_create_transaction_hashes.sql deleted file mode 100644 index 43405bea9d046..0000000000000 --- a/substrate/frame/revive/rpc/migrations/20241205165418_create_transaction_hashes.sql +++ /dev/null @@ -1,15 +0,0 @@ --- Create DB: --- DATABASE_URL="..." cargo sqlx database create --- --- Run migration: --- DATABASE_URL="..." cargo sqlx migrate run --- --- Update compile time artifacts: --- DATABASE_URL="..." cargo sqlx prepare -CREATE TABLE transaction_hashes ( - transaction_hash CHAR(64) NOT NULL PRIMARY KEY, - transaction_index INTEGER NOT NULL, - block_hash CHAR(64) NOT NULL -); - -CREATE INDEX idx_block_hash ON transaction_hashes (block_hash); diff --git a/substrate/frame/revive/rpc/src/block_info_provider.rs b/substrate/frame/revive/rpc/src/block_info_provider.rs index 0e91869cddaa2..675a83ed6558b 100644 --- a/substrate/frame/revive/rpc/src/block_info_provider.rs +++ b/substrate/frame/revive/rpc/src/block_info_provider.rs @@ -69,30 +69,23 @@ impl BlockInfoProviderImpl { ) -> Self { Self { api, rpc, cache: Arc::new(RwLock::new(BlockCache::new(cache_size))) } } - - async fn cache(&self) -> tokio::sync::RwLockReadGuard<'_, BlockCache> { - self.cache.read().await - } } #[async_trait] impl BlockInfoProvider for BlockInfoProviderImpl { async fn cache_block(&self, block: SubstrateBlock) -> Option { - let mut cache = self.cache.write().await; - cache.insert(block) + self.cache.write().await.insert(block) } async fn latest_block(&self) -> Option> { - let cache = self.cache().await; - cache.buffer.back().cloned() + self.cache.read().await.buffer.back().cloned() } async fn block_by_number( &self, block_number: SubstrateBlockNumber, ) -> Result>, ClientError> { - let cache = self.cache().await; - if let Some(block) = cache.blocks_by_number.get(&block_number).cloned() { + if let Some(block) = self.cache.read().await.blocks_by_number.get(&block_number).cloned() { return Ok(Some(block)); } @@ -104,8 +97,7 @@ impl BlockInfoProvider for BlockInfoProviderImpl { } async fn block_by_hash(&self, hash: &H256) -> Result>, ClientError> { - let cache = self.cache().await; - if let Some(block) = cache.blocks_by_hash.get(hash).cloned() { + if let Some(block) = self.cache.read().await.blocks_by_hash.get(hash).cloned() { return Ok(Some(block)); } diff --git a/substrate/frame/revive/rpc/src/cli.rs b/substrate/frame/revive/rpc/src/cli.rs index 7ebf53e7fbfe0..e40f3b1d053ce 100644 --- a/substrate/frame/revive/rpc/src/cli.rs +++ b/substrate/frame/revive/rpc/src/cli.rs @@ -16,7 +16,7 @@ // limitations under the License. //! The Ethereum JSON-RPC server. use crate::{ - client::{connect, native_to_eth_ratio, Client}, + client::{connect, native_to_eth_ratio, Client, SubscriptionType, SubstrateBlockNumber}, BlockInfoProvider, BlockInfoProviderImpl, CacheReceiptProvider, DBReceiptProvider, EthRpcServer, EthRpcServerImpl, ReceiptExtractor, ReceiptProvider, SystemHealthRpcServer, SystemHealthRpcServerImpl, LOG_TARGET, @@ -55,10 +55,9 @@ pub struct CliCommand { #[clap(long, env = "DATABASE_URL")] pub database_url: Option, - /// If true, we will only read from the database and not write to it. - /// Only useful if `--database-url` is specified. - #[clap(long, default_value = "true")] - pub database_read_only: bool, + /// If not provided, only new blocks will be indexed + #[clap(long)] + pub index_until_block: Option, #[allow(missing_docs)] #[clap(flatten)] @@ -94,6 +93,51 @@ fn init_logger(params: &SharedParams) -> anyhow::Result<()> { Ok(()) } +fn build_client( + tokio_handle: &tokio::runtime::Handle, + cache_size: usize, + node_rpc_url: &str, + database_url: Option<&str>, + abort_signal: Signals, +) -> anyhow::Result { + let fut = async { + let (api, rpc_client, rpc) = connect(node_rpc_url).await?; + let block_provider: Arc = + Arc::new(BlockInfoProviderImpl::new(cache_size, api.clone(), rpc.clone())); + + let receipt_extractor = ReceiptExtractor::new(native_to_eth_ratio(&api).await?); + let receipt_provider: Arc = if let Some(database_url) = database_url { + log::info!(target: LOG_TARGET, "🔗 Connecting to provided database"); + Arc::new(( + CacheReceiptProvider::default(), + DBReceiptProvider::new( + database_url, + block_provider.clone(), + receipt_extractor.clone(), + ) + .await?, + )) + } else { + log::info!(target: LOG_TARGET, "🔌 No database provided, using in-memory cache"); + Arc::new(CacheReceiptProvider::default()) + }; + + let client = + Client::new(api, rpc_client, rpc, block_provider, receipt_provider, receipt_extractor) + .await?; + + Ok(client) + } + .fuse(); + pin_mut!(fut); + + match tokio_handle.block_on(abort_signal.try_until_signal(fut)) { + Ok(Ok(client)) => Ok(client), + Ok(Err(err)) => Err(err), + Err(_) => anyhow::bail!("Process interrupted"), + } +} + /// Start the JSON-RPC server using the given command line arguments. pub fn run(cmd: CliCommand) -> anyhow::Result<()> { let CliCommand { @@ -102,7 +146,7 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { node_rpc_url, cache_size, database_url, - database_read_only, + index_until_block, shared_params, .. } = cmd; @@ -138,58 +182,14 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { let tokio_runtime = sc_cli::build_runtime()?; let tokio_handle = tokio_runtime.handle(); let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?; - let essential_spawn_handle = task_manager.spawn_essential_handle(); - - let gen_rpc_module = || { - let signals = tokio_runtime.block_on(async { Signals::capture() })?; - let fut = async { - let (api, rpc_client, rpc) = connect(&node_rpc_url).await?; - let block_provider: Arc = - Arc::new(BlockInfoProviderImpl::new(cache_size, api.clone(), rpc.clone())); - - let receipt_extractor = ReceiptExtractor::new(native_to_eth_ratio(&api).await?); - let receipt_provider: Arc = - if let Some(database_url) = database_url.as_ref() { - log::info!(target: LOG_TARGET, "🔗 Connecting to provided database"); - Arc::new(( - CacheReceiptProvider::default(), - DBReceiptProvider::new( - database_url, - database_read_only, - block_provider.clone(), - receipt_extractor.clone(), - ) - .await?, - )) - } else { - log::info!(target: LOG_TARGET, "🔌 No database provided, using in-memory cache"); - Arc::new(CacheReceiptProvider::default()) - }; - - let client = Client::new( - api, - rpc_client, - rpc, - block_provider, - receipt_provider, - receipt_extractor, - ) - .await?; - client.subscribe_and_cache_blocks(&essential_spawn_handle); - Ok::<_, crate::ClientError>(client) - } - .fuse(); - pin_mut!(fut); - - match tokio_handle.block_on(signals.try_until_signal(fut)) { - Ok(Ok(client)) => rpc_module(is_dev, client), - Ok(Err(err)) => { - log::error!("Error initializing: {err:?}"); - Err(sc_service::Error::Application(err.into())) - }, - Err(_) => Err(sc_service::Error::Application("Client connection interrupted".into())), - } - }; + + let client = build_client( + tokio_handle, + cache_size, + &node_rpc_url, + database_url.as_deref(), + tokio_runtime.block_on(async { Signals::capture() })?, + )?; // Prometheus metrics. if let Some(PrometheusConfig { port, registry }) = prometheus_config.clone() { @@ -200,8 +200,25 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { ); } - let rpc_server_handle = - start_rpc_servers(&rpc_config, prometheus_registry, tokio_handle, gen_rpc_module, None)?; + let rpc_server_handle = start_rpc_servers( + &rpc_config, + prometheus_registry, + tokio_handle, + || rpc_module(is_dev, client.clone()), + None, + )?; + + task_manager + .spawn_essential_handle() + .spawn("block-subscription", None, async move { + let fut1 = client.subscribe_and_cache_new_blocks(SubscriptionType::BestBlocks); + if let Some(index_until_block) = index_until_block { + let fut2 = client.cache_old_blocks(index_until_block); + tokio::join!(fut1, fut2); + } else { + fut1.await; + } + }); task_manager.keep_alive(rpc_server_handle); let signals = tokio_runtime.block_on(async { Signals::capture() })?; diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index e312114e90e30..e538259ca8b74 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -47,7 +47,7 @@ use subxt::{ Config, OnlineClient, }; use thiserror::Error; -use tokio::{sync::RwLock, try_join}; +use tokio::sync::RwLock; use crate::subxt_client::{self, SrcChainConfig}; @@ -212,12 +212,12 @@ pub async fn connect( (OnlineClient, ReconnectingRpcClient, LegacyRpcMethods), ClientError, > { - log::info!(target: LOG_TARGET, "Connecting to node at: {node_rpc_url} ..."); + log::info!(target: LOG_TARGET, "🌐 Connecting to node at: {node_rpc_url} ..."); let rpc_client = ReconnectingRpcClient::builder() .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) .build(node_rpc_url.to_string()) .await?; - log::info!(target: LOG_TARGET, "Connected to node at: {node_rpc_url}"); + log::info!(target: LOG_TARGET, "🌟 Connected to node at: {node_rpc_url}"); let api = OnlineClient::::from_rpc_client(rpc_client.clone()).await?; let rpc = LegacyRpcMethods::::new(RpcClient::new(rpc_client.clone())); @@ -259,18 +259,18 @@ impl Client { F: Fn(SubstrateBlock) -> Fut + Send + Sync, Fut: std::future::Future, ClientError>> + Send, { - log::info!(target: LOG_TARGET, "Subscribing to past blocks"); + log::info!(target: LOG_TARGET, "🔍 Subscribing to past blocks"); let mut block = self.api.blocks().at_latest().await.inspect_err(|err| { log::error!(target: LOG_TARGET, "Failed to fetch latest block: {err:?}"); })?; loop { let block_number = block.number(); - log::debug!(target: LOG_TARGET, "Processing block {block_number}"); + log::trace!(target: LOG_TARGET, "Processing past block #{block_number}"); let parent_hash = block.header().parent_hash; let control_flow = callback(block).await.inspect_err(|err| { - log::error!(target: LOG_TARGET, "Failed to process block {block_number}: {err:?}"); + log::error!(target: LOG_TARGET, "Failed to process past block #{block_number}: {err:?}"); })?; match control_flow { @@ -302,7 +302,7 @@ impl Client { F: Fn(SubstrateBlock) -> Fut + Send + Sync, Fut: std::future::Future> + Send, { - log::info!(target: LOG_TARGET, "Subscribing to new blocks"); + log::info!(target: LOG_TARGET, "🔍 Subscribing to new blocks"); let mut block_stream = match subscription_type { SubscriptionType::BestBlocks => self.api.blocks().subscribe_best().await, SubscriptionType::FinalizedBlocks => self.api.blocks().subscribe_finalized().await, @@ -339,56 +339,42 @@ impl Client { } /// Start the block subscription, and populate the block cache. - pub fn subscribe_and_cache_blocks(&self, spawn_handle: &sc_service::SpawnEssentialTaskHandle) { - let client = self.clone(); - spawn_handle.spawn("subscribe-blocks", None, async move { - let res = client - .subscribe_new_blocks(SubscriptionType::BestBlocks, |block| async { - let receipts = client.receipt_extractor.extract_from_block(&block).await?; - - client.receipt_provider.insert(&block.hash(), &receipts).await; - if let Some(pruned) = client.block_provider.cache_block(block).await { - client.receipt_provider.remove(&pruned).await; - } - - Ok(()) - }) - .await; - - if let Err(err) = res { - log::error!(target: LOG_TARGET, "Block subscription error: {err:?}"); - } - }); - } + pub async fn subscribe_and_cache_new_blocks(&self, subscription_type: SubscriptionType) { + let res = self + .subscribe_new_blocks(subscription_type, |block| async { + let receipts = self.receipt_extractor.extract_from_block(&block).await?; - /// Start the block subscription, and populate the block cache. - pub async fn subscribe_and_cache_receipts( - &self, - oldest_block: Option, - ) -> Result<(), ClientError> { - let new_blocks_fut = - self.subscribe_new_blocks(SubscriptionType::FinalizedBlocks, |block| async move { - let receipts = - self.receipt_extractor.extract_from_block(&block).await.inspect_err(|err| { - log::error!(target: LOG_TARGET, "Failed to extract receipts from block: {err:?}"); - })?; self.receipt_provider.insert(&block.hash(), &receipts).await; + if let Some(pruned) = self.block_provider.cache_block(block).await { + self.receipt_provider.remove(&pruned).await; + } + Ok(()) - }); + }) + .await; - let Some(oldest_block) = oldest_block else { return new_blocks_fut.await }; + if let Err(err) = res { + log::error!(target: LOG_TARGET, "Block subscription error: {err:?}"); + } + } - let old_blocks_fut = self.subscribe_past_blocks(|block| async move { - let receipts = self.receipt_extractor.extract_from_block(&block).await?; - self.receipt_provider.insert(&block.hash(), &receipts).await; - if block.number() == oldest_block { - Ok(ControlFlow::Break(())) - } else { - Ok(ControlFlow::Continue(())) - } - }); + /// Cache old blocks up to the given block number. + pub async fn cache_old_blocks(&self, oldest_block: SubstrateBlockNumber) { + let res = self + .subscribe_past_blocks(|block| async move { + let receipts = self.receipt_extractor.extract_from_block(&block).await?; + self.receipt_provider.archive(&block.hash(), &receipts).await; + if block.number() <= oldest_block { + Ok(ControlFlow::Break(())) + } else { + Ok(ControlFlow::Continue(())) + } + }) + .await; - try_join!(new_blocks_fut, old_blocks_fut).map(|_| ()) + if let Err(err) = res { + log::error!(target: LOG_TARGET, "Past Block subscription error: {err:?}"); + } } /// Expose the storage API. diff --git a/substrate/frame/revive/rpc/src/eth-indexer.rs b/substrate/frame/revive/rpc/src/eth-indexer.rs deleted file mode 100644 index 6e3dc0b4bc5cf..0000000000000 --- a/substrate/frame/revive/rpc/src/eth-indexer.rs +++ /dev/null @@ -1,98 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: Apache-2.0 - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//! The Ethereum JSON-RPC server. -use clap::Parser; -use pallet_revive_eth_rpc::{ - client::{connect, native_to_eth_ratio, Client, SubstrateBlockNumber}, - BlockInfoProvider, BlockInfoProviderImpl, DBReceiptProvider, ReceiptExtractor, ReceiptProvider, -}; -use sc_cli::SharedParams; -use std::sync::Arc; - -// Parsed command instructions from the command line -#[derive(Parser, Debug)] -#[clap(author, about, version)] -pub struct CliCommand { - /// The node url to connect to - #[clap(long, default_value = "ws://127.0.0.1:9944")] - pub node_rpc_url: String, - - /// Specifies the block number to start indexing from, going backwards from the current block. - /// If not provided, only new blocks will be indexed - #[clap(long)] - pub oldest_block: Option, - - /// The database used to store Ethereum transaction hashes. - #[clap(long, env = "DATABASE_URL")] - pub database_url: String, - - #[allow(missing_docs)] - #[clap(flatten)] - pub shared_params: SharedParams, -} - -/// Initialize the logger -#[cfg(not(test))] -fn init_logger(params: &SharedParams) -> anyhow::Result<()> { - let mut logger = sc_cli::LoggerBuilder::new(params.log_filters().join(",")); - logger - .with_log_reloading(params.enable_log_reloading) - .with_detailed_output(params.detailed_log_output); - - if let Some(tracing_targets) = ¶ms.tracing_targets { - let tracing_receiver = params.tracing_receiver.into(); - logger.with_profiling(tracing_receiver, tracing_targets); - } - - if params.disable_log_color { - logger.with_colors(false); - } - - logger.init()?; - Ok(()) -} - -#[tokio::main] -pub async fn main() -> anyhow::Result<()> { - let CliCommand { - node_rpc_url, database_url, shared_params: _shared_params, oldest_block, .. - } = CliCommand::parse(); - - #[cfg(not(test))] - init_logger(&_shared_params)?; - - let (api, rpc_client, rpc) = connect(&node_rpc_url).await?; - let block_provider: Arc = - Arc::new(BlockInfoProviderImpl::new(0, api.clone(), rpc.clone())); - let receipt_extractor = ReceiptExtractor::new(native_to_eth_ratio(&api).await?); - let receipt_provider: Arc = Arc::new( - DBReceiptProvider::new( - &database_url, - false, - block_provider.clone(), - receipt_extractor.clone(), - ) - .await?, - ); - - let client = - Client::new(api, rpc_client, rpc, block_provider, receipt_provider, receipt_extractor) - .await?; - client.subscribe_and_cache_receipts(oldest_block).await?; - - Ok(()) -} diff --git a/substrate/frame/revive/rpc/src/eth-rpc-tester.rs b/substrate/frame/revive/rpc/src/eth-rpc-tester.rs index 7cea1a303e383..460812602fe20 100644 --- a/substrate/frame/revive/rpc/src/eth-rpc-tester.rs +++ b/substrate/frame/revive/rpc/src/eth-rpc-tester.rs @@ -30,6 +30,10 @@ const DOCKER_CONTAINER_NAME: &str = "eth-rpc-test"; #[derive(Parser, Debug)] #[clap(author, about, version)] pub struct CliCommand { + /// The eth-rpc url to connect to + #[clap(long, default_value = "http://127.0.0.1:8545")] + pub rpc_url: String, + /// The parity docker image e.g eth-rpc:master-fb2e414f #[clap(long, default_value = "eth-rpc:master-fb2e414f")] docker_image: String, @@ -42,7 +46,11 @@ pub struct CliCommand { #[tokio::main] async fn main() -> anyhow::Result<()> { - let CliCommand { docker_bin, docker_image, .. } = CliCommand::parse(); + let CliCommand { docker_bin, rpc_url, docker_image, .. } = CliCommand::parse(); + + if std::env::var("SKIP_DOCKER").is_ok() { + return test_eth_rpc(&rpc_url).await; + } let mut docker_process = start_docker(&docker_bin, &docker_image)?; let stderr = docker_process.stderr.take().unwrap(); @@ -54,7 +62,7 @@ async fn main() -> anyhow::Result<()> { _ = interrupt() => { kill_docker().await?; } - _ = test_eth_rpc(stderr) => { + _ = wait_and_test_eth_rpc(stderr, &rpc_url) => { kill_docker().await?; } } @@ -101,7 +109,7 @@ async fn kill_docker() -> anyhow::Result<()> { Ok(()) } -async fn test_eth_rpc(stderr: ChildStderr) -> anyhow::Result<()> { +async fn wait_and_test_eth_rpc(stderr: ChildStderr, rpc_url: &str) -> anyhow::Result<()> { let mut reader = BufReader::new(stderr).lines(); while let Some(line) = reader.next_line().await? { println!("{line}"); @@ -110,6 +118,10 @@ async fn test_eth_rpc(stderr: ChildStderr) -> anyhow::Result<()> { } } + test_eth_rpc(rpc_url).await +} + +async fn test_eth_rpc(rpc_url: &str) -> anyhow::Result<()> { let account = Account::default(); let data = vec![]; let (bytes, _) = pallet_revive_fixtures::compile_module("dummy")?; @@ -117,7 +129,7 @@ async fn test_eth_rpc(stderr: ChildStderr) -> anyhow::Result<()> { println!("Account:"); println!("- address: {:?}", account.address()); - let client = Arc::new(HttpClientBuilder::default().build("http://localhost:8545")?); + let client = Arc::new(HttpClientBuilder::default().build(rpc_url)?); let nonce = client.get_transaction_count(account.address(), BlockTag::Latest.into()).await?; let balance = client.get_balance(account.address(), BlockTag::Latest.into()).await?; @@ -136,8 +148,8 @@ async fn test_eth_rpc(stderr: ChildStderr) -> anyhow::Result<()> { println!("\nReceipt:"); println!("Block explorer: https://westend-asset-hub-eth-explorer.parity.io/{:?}", tx.hash()); println!("- Block number: {block_number}"); - println!("- Gas used: {gas_used}"); - println!("- Address: {contract_address:?}"); + println!("- Gas used: {gas_used}"); + println!("- Address: {contract_address:?}"); println!("\n\n=== Calling dummy contract ===\n\n"); let tx = TransactionBuilder::new(&client).to(contract_address).send().await?; @@ -149,7 +161,7 @@ async fn test_eth_rpc(stderr: ChildStderr) -> anyhow::Result<()> { println!("\nReceipt:"); println!("Block explorer: https://westend-asset-hub-eth-explorer.parity.io/{:?}", tx.hash()); println!("- Block number: {block_number}"); - println!("- Gas used: {gas_used}"); - println!("- To: {to:?}"); + println!("- Gas used: {gas_used}"); + println!("- To: {to:?}"); Ok(()) } diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index 3599083dcd431..536678a97ac63 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -251,10 +251,10 @@ impl EthRpcServer for EthRpcServerImpl { async fn get_block_by_number( &self, - block: BlockNumberOrTag, + block_number: BlockNumberOrTag, hydrated_transactions: bool, ) -> RpcResult> { - let Some(block) = self.client.block_by_number_or_tag(&block).await? else { + let Some(block) = self.client.block_by_number_or_tag(&block_number).await? else { return Ok(None); }; let block = self.client.evm_block(block, hydrated_transactions).await; diff --git a/substrate/frame/revive/rpc/src/receipt_extractor.rs b/substrate/frame/revive/rpc/src/receipt_extractor.rs index e53f98639671c..6338f42ee0cc7 100644 --- a/substrate/frame/revive/rpc/src/receipt_extractor.rs +++ b/substrate/frame/revive/rpc/src/receipt_extractor.rs @@ -93,11 +93,11 @@ impl ReceiptExtractor { address: event.contract, topics: event.topics, data: Some(event.data.into()), - block_number: Some(block_number), + block_number, transaction_hash, - transaction_index: Some(transaction_index.into()), - block_hash: Some(block_hash), - log_index: Some(event_details.index().into()), + transaction_index: transaction_index.into(), + block_hash, + log_index: event_details.index().into(), ..Default::default() }) }) diff --git a/substrate/frame/revive/rpc/src/receipt_provider.rs b/substrate/frame/revive/rpc/src/receipt_provider.rs index 7c5e33cf01e39..bbed54a94b7dc 100644 --- a/substrate/frame/revive/rpc/src/receipt_provider.rs +++ b/substrate/frame/revive/rpc/src/receipt_provider.rs @@ -31,7 +31,10 @@ pub trait ReceiptProvider: Send + Sync { /// Insert receipts into the provider. async fn insert(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]); - /// Remove receipts with the given block hash. + /// Similar to `insert`, but intended for archiving receipts from historical blocks. + async fn archive(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]); + + /// Deletes receipts associated with the specified block hash. async fn remove(&self, block_hash: &H256); /// Get the receipt for the given block hash and transaction index. @@ -52,13 +55,17 @@ pub trait ReceiptProvider: Send + Sync { } #[async_trait] -impl ReceiptProvider for (Main, Fallback) { +impl ReceiptProvider for (Cache, Archive) { async fn insert(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { join!(self.0.insert(block_hash, receipts), self.1.insert(block_hash, receipts)); } + async fn archive(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { + self.1.insert(block_hash, receipts).await; + } + async fn remove(&self, block_hash: &H256) { - join!(self.0.remove(block_hash), self.1.remove(block_hash)); + self.0.remove(block_hash).await; } async fn receipt_by_block_hash_and_index( diff --git a/substrate/frame/revive/rpc/src/receipt_provider/cache.rs b/substrate/frame/revive/rpc/src/receipt_provider/cache.rs index a4741d18a3b34..765c12f890106 100644 --- a/substrate/frame/revive/rpc/src/receipt_provider/cache.rs +++ b/substrate/frame/revive/rpc/src/receipt_provider/cache.rs @@ -35,6 +35,8 @@ impl CacheReceiptProvider { #[async_trait] impl ReceiptProvider for CacheReceiptProvider { + async fn archive(&self, _block_hash: &H256, _receipts: &[(TransactionSigned, ReceiptInfo)]) {} + async fn insert(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { let mut cache = self.cache.write().await; cache.insert(block_hash, receipts); diff --git a/substrate/frame/revive/rpc/src/receipt_provider/db.rs b/substrate/frame/revive/rpc/src/receipt_provider/db.rs index 15f4119b4aefd..42ffe93a9f8b6 100644 --- a/substrate/frame/revive/rpc/src/receipt_provider/db.rs +++ b/substrate/frame/revive/rpc/src/receipt_provider/db.rs @@ -32,24 +32,22 @@ pub struct DBReceiptProvider { block_provider: Arc, /// A means to extract receipts from extrinsics. receipt_extractor: ReceiptExtractor, - /// weather or not we should write to the DB. - read_only: bool, } impl DBReceiptProvider { /// Create a new `DBReceiptProvider` with the given database URL and block provider. pub async fn new( database_url: &str, - read_only: bool, block_provider: Arc, receipt_extractor: ReceiptExtractor, ) -> Result { let pool = SqlitePool::connect(database_url).await?; - Ok(Self { pool, block_provider, read_only, receipt_extractor }) + sqlx::migrate!().run(&pool).await?; + Ok(Self { pool, block_provider, receipt_extractor }) } async fn fetch_row(&self, transaction_hash: &H256) -> Option<(H256, usize)> { - let transaction_hash = hex::encode(transaction_hash); + let transaction_hash = transaction_hash.as_ref(); let result = query!( r#" SELECT block_hash, transaction_index @@ -62,7 +60,7 @@ impl DBReceiptProvider { .await .ok()??; - let block_hash = result.block_hash.parse::().ok()?; + let block_hash = H256::from_slice(&result.block_hash[..]); let transaction_index = result.transaction_index.try_into().ok()?; Some((block_hash, transaction_index)) } @@ -70,45 +68,38 @@ impl DBReceiptProvider { #[async_trait] impl ReceiptProvider for DBReceiptProvider { - async fn insert(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { - if self.read_only { - return - } + async fn remove(&self, _block_hash: &H256) {} + + async fn archive(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { + self.insert(block_hash, receipts).await; + } - let block_hash_str = hex::encode(block_hash); + async fn insert(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { + let block_hash = block_hash.as_ref(); for (_, receipt) in receipts { - let transaction_hash = hex::encode(receipt.transaction_hash); + let transaction_hash: &[u8] = receipt.transaction_hash.as_ref(); let transaction_index = receipt.transaction_index.as_u32() as i32; let result = query!( r#" - INSERT INTO transaction_hashes (transaction_hash, block_hash, transaction_index) + INSERT OR REPLACE INTO transaction_hashes (transaction_hash, block_hash, transaction_index) VALUES ($1, $2, $3) - - ON CONFLICT(transaction_hash) DO UPDATE SET - block_hash = EXCLUDED.block_hash, - transaction_index = EXCLUDED.transaction_index "#, transaction_hash, - block_hash_str, + block_hash, transaction_index ) .execute(&self.pool) .await; if let Err(err) = result { - log::error!( - "Error inserting transaction for block hash {block_hash:?}: {:?}", - err - ); + log::error!("Error inserting transaction for block hash {block_hash:?}: {err:?}"); } } } - async fn remove(&self, _block_hash: &H256) {} - async fn receipts_count_per_block(&self, block_hash: &H256) -> Option { - let block_hash = hex::encode(block_hash); + let block_hash = block_hash.as_ref(); let row = query!( r#" SELECT COUNT(*) as count @@ -152,7 +143,7 @@ impl ReceiptProvider for DBReceiptProvider { } async fn signed_tx_by_hash(&self, transaction_hash: &H256) -> Option { - let transaction_hash = hex::encode(transaction_hash); + let transaction_hash = transaction_hash.as_ref(); let result = query!( r#" SELECT block_hash, transaction_index @@ -165,7 +156,7 @@ impl ReceiptProvider for DBReceiptProvider { .await .ok()??; - let block_hash = result.block_hash.parse::().ok()?; + let block_hash = H256::from_slice(&result.block_hash[..]); let transaction_index = result.transaction_index.try_into().ok()?; let block = self.block_provider.block_by_hash(&block_hash).await.ok()??; @@ -191,7 +182,6 @@ mod tests { pool, block_provider: Arc::new(MockBlockInfoProvider {}), receipt_extractor: ReceiptExtractor::new(1_000_000), - read_only: false, } } diff --git a/substrate/frame/revive/src/evm/api/rpc_types_gen.rs b/substrate/frame/revive/src/evm/api/rpc_types_gen.rs index 5d31613ca314b..e7003ee7c1891 100644 --- a/substrate/frame/revive/src/evm/api/rpc_types_gen.rs +++ b/substrate/frame/revive/src/evm/api/rpc_types_gen.rs @@ -377,17 +377,17 @@ pub struct Log { /// address pub address: Address, /// block hash - #[serde(rename = "blockHash", skip_serializing_if = "Option::is_none")] - pub block_hash: Option, + #[serde(rename = "blockHash")] + pub block_hash: H256, /// block number - #[serde(rename = "blockNumber", skip_serializing_if = "Option::is_none")] - pub block_number: Option, + #[serde(rename = "blockNumber")] + pub block_number: U256, /// data #[serde(skip_serializing_if = "Option::is_none")] pub data: Option, /// log index - #[serde(rename = "logIndex", skip_serializing_if = "Option::is_none")] - pub log_index: Option, + #[serde(rename = "logIndex")] + pub log_index: U256, /// removed #[serde(skip_serializing_if = "Option::is_none")] pub removed: Option, @@ -398,8 +398,8 @@ pub struct Log { #[serde(rename = "transactionHash")] pub transaction_hash: H256, /// transaction index - #[serde(rename = "transactionIndex", skip_serializing_if = "Option::is_none")] - pub transaction_index: Option, + #[serde(rename = "transactionIndex")] + pub transaction_index: U256, } /// Syncing progress