diff --git a/multicall/call.py b/multicall/call.py index fbb9d93..d20b1c6 100644 --- a/multicall/call.py +++ b/multicall/call.py @@ -7,7 +7,7 @@ from web3 import Web3 from multicall import Signature -from multicall.constants import Network, w3 +from multicall.constants import Network, w3, ASYNC_SEMAPHORE from multicall.exceptions import StateOverrideNotSupported from multicall.loggers import setup_logger from multicall.utils import (chain_id, get_async_w3, run_in_subprocess, @@ -100,6 +100,9 @@ def __call__(self, args: Optional[Any] = None, _w3: Optional[Web3] = None) -> An self.signature, self.returns, ) + + async def __await__(self) -> Any: + return await self.coroutine() @eth_retry.auto_retry async def coroutine(self, args: Optional[Any] = None, _w3: Optional[Web3] = None) -> Any: @@ -108,17 +111,18 @@ async def coroutine(self, args: Optional[Any] = None, _w3: Optional[Web3] = None if self.state_override_code and not state_override_supported(_w3): raise StateOverrideNotSupported(f'State override is not supported on {Network(chain_id(_w3)).__repr__()[1:-1]}.') - args = await run_in_subprocess( - prep_args, - self.target, - self.signature, - args or self.args, - self.block_id, - self.gas_limit, - self.state_override_code, - ) - - output = await get_async_w3(_w3).eth.call(*args) + async with ASYNC_SEMAPHORE: + output = await get_async_w3(_w3).eth.call( + *await run_in_subprocess( + prep_args, + self.target, + self.signature, + args or self.args, + self.block_id, + self.gas_limit, + self.state_override_code, + ) + ) return await run_in_subprocess(Call.decode_output, output, self.signature, self.returns) diff --git a/multicall/constants.py b/multicall/constants.py index 105f7a0..e97c27d 100644 --- a/multicall/constants.py +++ b/multicall/constants.py @@ -1,3 +1,4 @@ +import asyncio import os from enum import IntEnum from typing import Dict @@ -176,3 +177,7 @@ class Network(IntEnum): NUM_PROCESSES = min(user_choice, parallelism_capacity) NO_STATE_OVERRIDE = [ Network.Gnosis, Network.Harmony, Network.Moonbeam, Network.Moonriver, Network.Kovan, Network.Fuse ] + +# NOTE: If we run too many async calls at once, we'll have memory issues. +# Feel free to increase this with the "MULTICALL_CALL_SEMAPHORE" env var if you know what you're doing. +ASYNC_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("MULTICALL_CALL_SEMAPHORE", 1000))) diff --git a/multicall/multicall.py b/multicall/multicall.py index 8ffc81c..2a9f19c 100644 --- a/multicall/multicall.py +++ b/multicall/multicall.py @@ -7,8 +7,9 @@ from web3 import Web3 from multicall import Call -from multicall.constants import (GAS_LIMIT, MULTICALL2_ADDRESSES, MULTICALL2_BYTECODE, - MULTICALL3_BYTECODE, MULTICALL3_ADDRESSES, w3) +from multicall.constants import (ASYNC_SEMAPHORE, GAS_LIMIT, + MULTICALL2_ADDRESSES, MULTICALL3_ADDRESSES, + MULTICALL3_BYTECODE, w3) from multicall.loggers import setup_logger from multicall.utils import (await_awaitable, chain_id, gather, run_in_subprocess, state_override_supported) @@ -54,9 +55,12 @@ def __init__( def __call__(self) -> Dict[str,Any]: start = time() - response = await_awaitable(self.coroutine()) + response = await_awaitable(self) logger.debug(f"Multicall took {time() - start}s") return response + + async def __await__(self) -> Dict[str,Any]: + return await self.coroutine() async def coroutine(self) -> Dict[str,Any]: batches = await gather([ @@ -77,21 +81,22 @@ async def fetch_outputs(self, calls: List[Call], ConnErr_retries: int = 0, id: s if calls is None: calls = self.calls - try: - args = await run_in_subprocess(get_args, calls, self.require_success) - if self.require_success is True: - _, outputs = await self.aggregate.coroutine(args) - outputs = await run_in_subprocess(unpack_aggregate_outputs, outputs) - else: - _, _, outputs = await self.aggregate.coroutine(args) - outputs = await gather([ - run_in_subprocess(Call.decode_output, output, call.signature, call.returns, success) - for call, (success, output) in zip(calls, outputs) - ]) - logger.debug(f"coroutine {id} finished") - return outputs - except Exception as e: - _raise_or_proceed(e, len(calls), ConnErr_retries=ConnErr_retries) + async with ASYNC_SEMAPHORE: + try: + args = await run_in_subprocess(get_args, calls, self.require_success) + if self.require_success is True: + _, outputs = await self.aggregate.coroutine(args) + outputs = await run_in_subprocess(unpack_aggregate_outputs, outputs) + else: + _, _, outputs = await self.aggregate.coroutine(args) + outputs = await gather([ + run_in_subprocess(Call.decode_output, output, call.signature, call.returns, success) + for call, (success, output) in zip(calls, outputs) + ]) + logger.debug(f"coroutine {id} finished") + return outputs + except Exception as e: + _raise_or_proceed(e, len(calls), ConnErr_retries=ConnErr_retries) # Failed, we need to rebatch the calls and try again. batch_results = await gather([ diff --git a/multicall/signature.py b/multicall/signature.py index 994f544..f8de537 100644 --- a/multicall/signature.py +++ b/multicall/signature.py @@ -1,3 +1,4 @@ +from functools import lru_cache from typing import Any, List, Optional, Tuple # For eth_abi versions < 2.2.0, `decode` and `encode` have not yet been added. @@ -11,6 +12,8 @@ from eth_utils import function_signature_to_4byte_selector +get_4byte_selector = lru_cache(maxsize=None)(function_signature_to_4byte_selector) + def parse_signature(signature: str) -> Tuple[str, List[TypeStr], List[TypeStr]]: """ Breaks 'func(address)(uint256)' into ['func', ['address'], ['uint256']] @@ -58,7 +61,10 @@ class Signature: def __init__(self, signature: str) -> None: self.signature = signature self.function, self.input_types, self.output_types = parse_signature(signature) - self.fourbyte = function_signature_to_4byte_selector(self.function) + + @property + def fourbyte(self) -> bytes: + return get_4byte_selector(self.function) def encode_data(self, args: Optional[Any] = None) -> bytes: return self.fourbyte + encode(self.input_types, args) if args else self.fourbyte diff --git a/tests/test_call.py b/tests/test_call.py index 1150565..1db7b31 100644 --- a/tests/test_call.py +++ b/tests/test_call.py @@ -29,7 +29,7 @@ def test_call_with_predefined_args(): def test_call_async(): call = Call(CHAI, 'name()(string)', [['name', None]]) - assert await_awaitable(call.coroutine()) == {'name': 'Chai'} + assert await_awaitable(call) == {'name': 'Chai'} def test_call_with_args_async(): @@ -39,7 +39,7 @@ def test_call_with_args_async(): def test_call_with_predefined_args_async(): call = Call(CHAI, ['balanceOf(address)(uint256)', CHAI], [['balance', from_wei]]) - assert isinstance(await_awaitable(call.coroutine())['balance'], float) + assert isinstance(await_awaitable(call)['balance'], float) def test_call_threading(): diff --git a/tests/test_multicall.py b/tests/test_multicall.py index 80e0df7..9e057e1 100644 --- a/tests/test_multicall.py +++ b/tests/test_multicall.py @@ -56,7 +56,7 @@ def test_multicall_async(): Call(CHAI, 'totalSupply()(uint256)', [['supply', from_wei]]), Call(CHAI, ['balanceOf(address)(uint256)', CHAI], [['balance', from_ray]]), ]) - result = await_awaitable(multi.coroutine()) + result = await_awaitable(multi) print(result) assert isinstance(result['supply'], float) assert isinstance(result['balance'], float) @@ -69,7 +69,7 @@ def test_multicall_no_success_async(): ], require_success=False ) - result = await_awaitable(multi.coroutine()) + result = await_awaitable(multi) print(result) assert isinstance(result['success'], tuple) assert isinstance(result['balance'], tuple)