VGFP/get-pending-transactions-evm-python

How to get pending transactions using python

Pending transactions are transactions that have been broadcast to the network but have not yet been included in a block. They are signed, so the EVM will try to execute them, but they may still fail for a number of reasons: the sender cancels the transaction by front-running it with a cancel transaction, the smart contract logic fails, or the execution cost exceeds the gas limit.

Install required libraries

pip install --no-cache-dir -r requirements.txt

Create websocket client

To listen to pending transactions, we need to create a websocket connection. You will need the websocket endpoint address of a node; the code below reads it from the WSS_URL variable in a .env file.

import json
from pprint import pprint
from web3 import Web3
from dotenv import load_dotenv
import os

# Loading the .env file
load_dotenv()

wss_url = os.getenv('WSS_URL')

web3_client = Web3(Web3.WebsocketProvider(wss_url))

pprint(web3_client.isConnected())
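A missing or mistyped WSS_URL is an easy mistake to make, so it can help to check the value before creating the provider. The following is a minimal sketch, not part of the repository: the helper name is_websocket_url and the example endpoints are hypothetical, and the check only looks at the URL shape, not at whether the node is reachable.

```python
from urllib.parse import urlparse


def is_websocket_url(url):
    """Return True if url looks like a ws:// or wss:// endpoint with a host."""
    if not url:
        # os.getenv returns None when the variable is not set
        return False
    parsed = urlparse(url)
    return parsed.scheme in ("ws", "wss") and bool(parsed.netloc)


# Hypothetical endpoints, used only for illustration.
print(is_websocket_url("wss://node.example.com/ws"))
print(is_websocket_url("https://node.example.com"))
print(is_websocket_url(None))
```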

After that, we get the hashes of the pending transactions and listen for new ones. For each hash we can fetch the full transaction and then filter by value or by the function being called.

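# Create a filter that yields the hashes of new pending transactions
pending_transactions = web3_client.eth.filter('pending').get_new_entries()

for tx in pending_transactions:
    transaction = web3_client.eth.getTransaction(tx)
    # value is denominated in wei
    if transaction and transaction.value > 2:
        pprint(dict(transaction))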
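To keep listening rather than reading pending transactions once, the lookup and the value filter can be wrapped in a polling loop. The sketch below is one possible shape, under stated assumptions: the function name poll_pending and all of its parameters are hypothetical, and the two callables are passed in so the loop does not depend on a particular web3.py version.

```python
import time


def poll_pending(get_new_entries, get_transaction, min_value_wei,
                 poll_interval=2.0, max_polls=None, sleep=time.sleep):
    """Yield pending transactions whose value exceeds min_value_wei.

    get_new_entries: callable returning new pending transaction hashes
    get_transaction: callable mapping a hash to a transaction mapping
    max_polls: stop after this many polls (None means run forever)
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        for tx_hash in get_new_entries():
            try:
                transaction = get_transaction(tx_hash)
            except Exception:
                # The transaction may already be mined, dropped or replaced.
                continue
            if transaction and transaction["value"] > min_value_wei:
                yield dict(transaction)
        polls += 1
        sleep(poll_interval)


# Possible usage with the client above (web3.py v5 names, needs a live node):
# pending_filter = web3_client.eth.filter('pending')
# for tx in poll_pending(pending_filter.get_new_entries,
#                        web3_client.eth.getTransaction, 10**18):
#     pprint(tx)
```

Here 10**18 wei corresponds to one unit of the chain's native currency, so the usage example would only print transfers larger than that.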

Filter transactions by contract address and get input data

To decode the input data of a transaction, we need to create a contract object from the contract's address and ABI.

web3_client = Web3(Web3.WebsocketProvider(wss_url))

uniswap_router = web3_client.eth.contract(
    address='0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45',
    abi=json.loads(open('uniswap_router_2.json').read())
)

quickswap_router = web3_client.eth.contract(
    address='0xa5E0829CaCEd8fFDD4De3c43696c57F7D7A678ff',
    abi=json.loads(open('quickswap_router.json').read())
)

After that, we listen to pending transactions as before and filter them by recipient address. If the recipient address is equal to a contract address, we can decode the input data by using decode_function_input from the contract object.

def handle_event(event):
    try:
        transaction = dict(web3_client.eth.getTransaction(event))
    except Exception:
        return

    # Here you can add filtering
    if not transaction.get('to'):
        return
    if transaction.get('to') == uniswap_router.address:
        pprint(transaction)
        decoded_data = uniswap_router.decode_function_input(transaction.get('input'))
        pprint(decoded_data)
    elif transaction.get('to') == quickswap_router.address:
        pprint(transaction)
        decoded_data = quickswap_router.decode_function_input(transaction.get('input'))
        pprint(decoded_data)
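For context on what decoding works with: ABI-encoded call data starts with a 4-byte function selector, followed by the arguments packed into 32-byte words. The sketch below only illustrates that raw layout and is not how the repository or web3.py decodes input. The helper name split_calldata is hypothetical, and the example call data is hand-built for an ERC-20 transfer(address,uint256) call, whose selector is 0xa9059cbb.

```python
def split_calldata(input_data):
    """Split call data into its 4-byte selector and 32-byte argument words."""
    if isinstance(input_data, (bytes, bytearray)):
        raw = bytes(input_data)
    else:
        # Hex string, with or without the 0x prefix
        hex_body = input_data[2:] if input_data.startswith("0x") else input_data
        raw = bytes.fromhex(hex_body)
    if len(raw) < 4:
        # Plain value transfers carry no selector
        return None, []
    selector = "0x" + raw[:4].hex()
    args = raw[4:]
    words = [args[i:i + 32].hex() for i in range(0, len(args), 32)]
    return selector, words


# Hand-built example: transfer(address,uint256) with a dummy recipient
# and an amount of 1.
recipient_word = "00" * 12 + "11" * 20
amount_word = "00" * 31 + "01"
selector, words = split_calldata("0xa9059cbb" + recipient_word + amount_word)
print(selector)
print(len(words), int(words[1], 16))
```

Dynamic types such as arrays and strings are encoded with offsets, so the word list is only directly readable for static arguments. For real decoding, the contract object's decode_function_input remains the simpler choice.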

Sources and useful links

If you have further questions, please create an issue with the label "Question".
