This repository has been archived by the owner on Nov 27, 2024. It is now read-only.

# Geth support for inspect #112

**Open** · wants to merge 12 commits into base `main`
Changes from 10 commits
## `README.md` (3 additions, 0 deletions)
@@ -45,6 +45,7 @@ Example:
export RPC_URL="http://111.111.111.111:8546"
```

**Note**: mev-inspect-py currently requires an RPC endpoint of a full archive node that supports Erigon-style traces and receipts. This PR adds a translation layer that converts Geth traces and receipts into the Erigon format; enable it with the `--geth` flag.
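For example, once the services below are running, a block on a Geth-backed chain could be inspected with (illustrative; the `./mev` wrapper presumably forwards `--geth` to the new `--type geth` option added in `cli.py`):

```
./mev inspect 12914944 --geth
```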

Next, start all services with:

@@ -65,6 +66,7 @@ On first startup, you'll need to apply database migrations with:
### Inspect a single block

Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185):
**Note**: Add `--geth` at the end if `RPC_URL` points to a Geth or Geth-like node.

```
./mev inspect 12914944
@@ -73,6 +75,7 @@ Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185):
### Inspect many blocks

Inspecting blocks 12914944 to 12914954:
**Note**: Add `--geth` at the end if `RPC_URL` points to a Geth or Geth-like node.

```
./mev inspect-many 12914944 12914954
## `cli.py` (26 additions, 4 deletions)
@@ -7,6 +7,7 @@
from mev_inspect.concurrency import coro
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.utils import RPCType

RPC_URL_ENV = "RPC_URL"

@@ -21,15 +22,29 @@ def cli():
@cli.command()
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option(
"--type",
type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False),
default=RPCType.parity.name,
)
@coro
async def inspect_block_command(block_number: int, rpc: str):
async def inspect_block_command(block_number: int, rpc: str, type: str):
type_e = convert_str_to_enum(type)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, type_e)
await inspector.inspect_single_block(block=block_number)


def convert_str_to_enum(type: str) -> RPCType:
    if type == "parity":
        return RPCType.parity
    elif type == "geth":
        return RPCType.geth
    raise ValueError(f"Unknown RPC type: {type}")
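`RPCType` is imported from `mev_inspect.utils`, but its definition is not part of this diff. Judging from its use with `click.Choice` and `.name` above, it is presumably a small enum along these lines (a sketch, not the PR's actual code):

```
# Hypothetical reconstruction of RPCType in mev_inspect/utils.py (not in this diff)
from enum import Enum


class RPCType(Enum):
    parity = 0
    geth = 1
```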


@cli.command()
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@@ -38,7 +53,7 @@ async def fetch_block_command(block_number: int, rpc: str):
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, RPCType.parity)
block = await inspector.create_from_block(block_number=block_number)
print(block.json())

@@ -47,6 +62,11 @@ async def fetch_block_command(block_number: int, rpc: str):
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option(
"--type",
type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False),
default=RPCType.parity.name,
)
@click.option(
"--max-concurrency",
type=int,
@@ -63,14 +83,16 @@ async def inspect_many_blocks_command(
rpc: str,
max_concurrency: int,
request_timeout: int,
type: str,
):
type_e = convert_str_to_enum(type)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

inspector = MEVInspector(
rpc,
inspect_db_session,
trace_db_session,
type_e,
max_concurrency=max_concurrency,
request_timeout=request_timeout,
)
## `mev_inspect/block.py` (178 additions, 15 deletions)
@@ -9,10 +9,17 @@
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.receipts import Receipt
from mev_inspect.schemas.traces import Trace, TraceType
from mev_inspect.utils import hex_to_int
from mev_inspect.utils import RPCType, hex_to_int


logger = logging.getLogger(__name__)
_calltype_mapping = {
"CALL": "call",
"DELEGATECALL": "delegateCall",
"CREATE": "create",
"SUICIDE": "suicide",
"REWARD": "reward",
}


async def get_latest_block_number(base_provider) -> int:
@@ -27,6 +34,7 @@ async def get_latest_block_number(base_provider) -> int:
async def create_from_block_number(
base_provider,
w3: Web3,
type: RPCType,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
@@ -36,13 +44,19 @@ async def create_from_block_number(
block = _find_block(trace_db_session, block_number)

if block is None:
block = await _fetch_block(w3, base_provider, block_number)
return block
else:
return block
if type is RPCType.parity:
block = await _fetch_block_parity(w3, base_provider, block_number)
elif type is RPCType.geth:
block = await _fetch_block_geth(w3, base_provider, block_number)
else:
logger.error(f"RPCType not known - {type}")
            raise ValueError(f"RPCType not known: {type}")
return block


async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -> Block:
async def _fetch_block_parity(
w3, base_provider, block_number: int, retries: int = 0
) -> Block:
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
base_provider.make_request("eth_getBlockReceipts", [block_number]),
@@ -55,24 +69,56 @@ async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -> Block:
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
)
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block(w3, base_provider, block_number, retries)
            return await _fetch_block_parity(w3, base_provider, block_number, retries + 1)
else:
raise

return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)

async def _fetch_block_geth(
w3, base_provider, block_number: int, retries: int = 0
) -> Block:
block_json = await asyncio.gather(w3.eth.get_block(block_number))

try:
# Separate calls to help with load during block tracing
traces = await geth_get_tx_traces_parity_format(base_provider, block_json[0])
geth_tx_receipts = await geth_get_tx_receipts_async(
base_provider, block_json[0]["transactions"]
)
> **Collaborator:** These can be grouped with a `gather` to run in parallel.
>
> **@supragya (author, Dec 7, 2021):** I've seen extra load while tracing is being done hamper the tracer on Polygon. Sometimes the tracer runs out of time (internal timeout) and errors. Better to keep them separate.
receipts = geth_receipts_translator(block_json[0], geth_tx_receipts)
base_fee_per_gas = 0 # Polygon specific, TODO for other chains

return Block(
block_number=block_number,
block_timestamp=block_json[0]["timestamp"],
miner=block_json[0]["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
)
if retries < 3:
await asyncio.sleep(5)
            return await _fetch_block_geth(w3, base_provider, block_number, retries + 1)
else:
raise


def _find_block(
@@ -191,3 +237,120 @@ def get_transaction_hashes(calls: List[Trace]) -> List[str]:
result.append(call.transaction_hash)

return result


# Geth-specific additions


async def geth_get_tx_traces_parity_format(base_provider, block_json: dict):
    block_hash = block_json["hash"]
    block_trace = await geth_get_tx_traces(base_provider, block_hash)
    parity_traces = []
    for idx, trace in enumerate(block_trace["result"]):
        if "result" in trace:
            parity_traces.extend(
                unwrap_tx_trace_for_parity(block_json, idx, trace["result"])
            )
    return parity_traces


async def geth_get_tx_traces(base_provider, block_hash):
block_trace = await base_provider.make_request(
"debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}]
)
return block_trace
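For reference, `debug_traceBlockByHash` with the built-in `callTracer` returns one entry per transaction, each wrapping a nested call frame. The shape is roughly as follows (illustrative, not from this PR; field availability can vary across Geth versions):

```
# Approximate callTracer response shape (illustrative)
{
    "result": [
        {
            "result": {
                "type": "CALL",       # translated via _calltype_mapping
                "from": "0x...",
                "to": "0x...",
                "value": "0x0",       # read only for callType "call" above
                "gas": "0x...",
                "gasUsed": "0x...",
                "input": "0x...",
                "output": "0x...",
                "calls": [...],       # nested sub-call frames, if any
            }
        }
    ]
}
```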


def unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, tx_trace, position=[]
) -> List[Trace]:
response_list = []
try:
if tx_trace["type"] == "STATICCALL":
return []
action_dict = dict()
action_dict["callType"] = _calltype_mapping[tx_trace["type"]]
if action_dict["callType"] == "call":
action_dict["value"] = tx_trace["value"]
for key in ["from", "to", "gas", "input"]:
action_dict[key] = tx_trace[key]

result_dict = dict()
for key in ["gasUsed", "output"]:
result_dict[key] = tx_trace[key]

response_list.append(
Trace(
action=action_dict,
block_hash=str(block_json["hash"]),
block_number=int(block_json["number"]),
result=result_dict,
subtraces=len(tx_trace["calls"]) if "calls" in tx_trace.keys() else 0,
trace_address=position,
transaction_hash=block_json["transactions"][tx_pos_in_block].hex(),
transaction_position=tx_pos_in_block,
type=TraceType(_calltype_mapping[tx_trace["type"]]),
)
)
    except Exception as e:
        logger.warning(f"error while unwrapping tx trace for parity: {e}")
        return []
> **Collaborator:** What causes this to get hit?
>
> **@supragya (author, Dec 7, 2021):** From my experience, CREATE2. It's not used in analysis.
>
> **Collaborator:** If that's the case, want to move this to the same as you have for STATICCALL, where it checks for it explicitly? It's better on our side to have it just blow up with an exception for now. We'd rather know if we're missing cases.
>
> **@supragya (author):**
> 1. Added create2.
> 2. Added a logger warning.
>
> Haven't yet made it throw an exception when something unexpected happens, since this helps us back-fill in "best effort" form. Anyone who wants to work on making the translation "exact" can remove it. Can add a comment around here, though. What do you think? @lukevs
>
> **@lukevs (collaborator, Dec 13, 2021):** For 1, should there be a line like this?
>
> ```
>         if tx_trace["type"] == "CREATE2":
>             return []
> ```
>
> For 2, is there anything that still makes it throw now that CREATE2 is supported?
>
> I'm a little hesitant to blanket-accept all `Exception`. If there are known failures I'm OK with skipping those blocks for now, but I think we should at least limit the scope of exceptions to what we know. Otherwise Geth users could be unknowingly missing blocks.

if "calls" in tx_trace.keys():
for idx, subcall in enumerate(tx_trace["calls"]):
response_list.extend(
unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, subcall, position + [idx]
)
)
return response_list
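The recursion over `calls` reproduces parity-style trace addressing: each sub-call appends its index within its parent frame. A sketch of the resulting addresses (illustrative):

```
# Illustrative trace_address assignment for nested frames (not from this PR):
#   top-level frame                  -> trace_address []
#   frame["calls"][0]                -> trace_address [0]
#   frame["calls"][0]["calls"][1]    -> trace_address [0, 1]
```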


async def geth_get_tx_receipts_task(base_provider, tx):
receipt = await base_provider.make_request("eth_getTransactionReceipt", [tx.hex()])
return receipt


async def geth_get_tx_receipts_async(base_provider, transactions):
    tasks = [
        asyncio.create_task(geth_get_tx_receipts_task(base_provider, tx))
        for tx in transactions
    ]
    return await asyncio.gather(*tasks)


def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]:
    json_decoded_receipts = [
        tx_receipt["result"]
        if tx_receipt is not None and "result" in tx_receipt
        else None
        for tx_receipt in geth_tx_receipts
    ]
    results = []
    for idx, tx_receipt in enumerate(json_decoded_receipts):
        if tx_receipt is not None:
            results.append(unwrap_tx_receipt_for_parity(block_json, idx, tx_receipt))
    return results


def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt:
    if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16):
        logger.info(
            f"Alert: transaction position in block mismatched: "
            f"{tx_pos_in_block} vs {tx_receipt['transactionIndex']}"
        )
return Receipt(
block_number=block_json["number"],
transaction_hash=tx_receipt["transactionHash"],
transaction_index=tx_pos_in_block,
gas_used=tx_receipt["gasUsed"],
effective_gas_price=tx_receipt["effectiveGasPrice"],
cumulative_gas_used=tx_receipt["cumulativeGasUsed"],
to=tx_receipt["to"],
)
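For context, raw Geth receipts encode quantities as hex strings, which is why `transactionIndex` is parsed with `int(..., 16)` above; the remaining fields are presumably decoded by the `Receipt` schema. Roughly (illustrative, not from this PR):

```
# Approximate shape of an eth_getTransactionReceipt result (illustrative)
{
    "transactionHash": "0x...",
    "transactionIndex": "0x0",          # hex quantity
    "gasUsed": "0x5208",
    "effectiveGasPrice": "0x3b9aca00",  # present post-London / on Polygon
    "cumulativeGasUsed": "0x5208",
    "to": "0x...",
}
```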