Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async call semaphore #71

Merged
merged 8 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions multicall/call.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from web3 import Web3

from multicall import Signature
from multicall.constants import Network, w3
from multicall.constants import Network, w3, ASYNC_SEMAPHORE
from multicall.exceptions import StateOverrideNotSupported
from multicall.loggers import setup_logger
from multicall.utils import (chain_id, get_async_w3, run_in_subprocess,
Expand Down Expand Up @@ -100,6 +100,9 @@ def __call__(self, args: Optional[Any] = None, _w3: Optional[Web3] = None) -> An
self.signature,
self.returns,
)

async def __await__(self) -> Any:
return await self.coroutine()

@eth_retry.auto_retry
async def coroutine(self, args: Optional[Any] = None, _w3: Optional[Web3] = None) -> Any:
Expand All @@ -108,17 +111,18 @@ async def coroutine(self, args: Optional[Any] = None, _w3: Optional[Web3] = None
if self.state_override_code and not state_override_supported(_w3):
raise StateOverrideNotSupported(f'State override is not supported on {Network(chain_id(_w3)).__repr__()[1:-1]}.')

args = await run_in_subprocess(
prep_args,
self.target,
self.signature,
args or self.args,
self.block_id,
self.gas_limit,
self.state_override_code,
)

output = await get_async_w3(_w3).eth.call(*args)
async with ASYNC_SEMAPHORE:
output = await get_async_w3(_w3).eth.call(
*await run_in_subprocess(
prep_args,
self.target,
self.signature,
args or self.args,
self.block_id,
self.gas_limit,
self.state_override_code,
)
)

return await run_in_subprocess(Call.decode_output, output, self.signature, self.returns)

Expand Down
5 changes: 5 additions & 0 deletions multicall/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
from enum import IntEnum
from typing import Dict
Expand Down Expand Up @@ -176,3 +177,7 @@ class Network(IntEnum):
NUM_PROCESSES = min(user_choice, parallelism_capacity)

NO_STATE_OVERRIDE = [ Network.Gnosis, Network.Harmony, Network.Moonbeam, Network.Moonriver, Network.Kovan, Network.Fuse ]

# NOTE: If we run too many async calls at once, we'll have memory issues.
# Feel free to increase this with the "MULTICALL_CALL_SEMAPHORE" env var if you know what you're doing.
ASYNC_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("MULTICALL_CALL_SEMAPHORE", 1000)))
41 changes: 23 additions & 18 deletions multicall/multicall.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from web3 import Web3

from multicall import Call
from multicall.constants import (GAS_LIMIT, MULTICALL2_ADDRESSES, MULTICALL2_BYTECODE,
MULTICALL3_BYTECODE, MULTICALL3_ADDRESSES, w3)
from multicall.constants import (ASYNC_SEMAPHORE, GAS_LIMIT,
MULTICALL2_ADDRESSES, MULTICALL3_ADDRESSES,
MULTICALL3_BYTECODE, w3)
from multicall.loggers import setup_logger
from multicall.utils import (await_awaitable, chain_id, gather,
run_in_subprocess, state_override_supported)
Expand Down Expand Up @@ -54,9 +55,12 @@ def __init__(

def __call__(self) -> Dict[str,Any]:
start = time()
response = await_awaitable(self.coroutine())
response = await_awaitable(self)
logger.debug(f"Multicall took {time() - start}s")
return response

async def __await__(self) -> Dict[str,Any]:
return await self.coroutine()

async def coroutine(self) -> Dict[str,Any]:
batches = await gather([
Expand All @@ -77,21 +81,22 @@ async def fetch_outputs(self, calls: List[Call], ConnErr_retries: int = 0, id: s
if calls is None:
calls = self.calls

try:
args = await run_in_subprocess(get_args, calls, self.require_success)
if self.require_success is True:
_, outputs = await self.aggregate.coroutine(args)
outputs = await run_in_subprocess(unpack_aggregate_outputs, outputs)
else:
_, _, outputs = await self.aggregate.coroutine(args)
outputs = await gather([
run_in_subprocess(Call.decode_output, output, call.signature, call.returns, success)
for call, (success, output) in zip(calls, outputs)
])
logger.debug(f"coroutine {id} finished")
return outputs
except Exception as e:
_raise_or_proceed(e, len(calls), ConnErr_retries=ConnErr_retries)
async with ASYNC_SEMAPHORE:
try:
args = await run_in_subprocess(get_args, calls, self.require_success)
if self.require_success is True:
_, outputs = await self.aggregate.coroutine(args)
outputs = await run_in_subprocess(unpack_aggregate_outputs, outputs)
else:
_, _, outputs = await self.aggregate.coroutine(args)
outputs = await gather([
run_in_subprocess(Call.decode_output, output, call.signature, call.returns, success)
for call, (success, output) in zip(calls, outputs)
])
logger.debug(f"coroutine {id} finished")
return outputs
except Exception as e:
_raise_or_proceed(e, len(calls), ConnErr_retries=ConnErr_retries)

# Failed, we need to rebatch the calls and try again.
batch_results = await gather([
Expand Down
8 changes: 7 additions & 1 deletion multicall/signature.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import lru_cache
from typing import Any, List, Optional, Tuple

# For eth_abi versions < 2.2.0, `decode` and `encode` have not yet been added.
Expand All @@ -11,6 +12,8 @@
from eth_utils import function_signature_to_4byte_selector


get_4byte_selector = lru_cache(maxsize=None)(function_signature_to_4byte_selector)

def parse_signature(signature: str) -> Tuple[str, List[TypeStr], List[TypeStr]]:
"""
Breaks 'func(address)(uint256)' into ['func', ['address'], ['uint256']]
Expand Down Expand Up @@ -58,7 +61,10 @@ class Signature:
def __init__(self, signature: str) -> None:
self.signature = signature
self.function, self.input_types, self.output_types = parse_signature(signature)
self.fourbyte = function_signature_to_4byte_selector(self.function)

@property
def fourbyte(self) -> bytes:
return get_4byte_selector(self.function)

def encode_data(self, args: Optional[Any] = None) -> bytes:
return self.fourbyte + encode(self.input_types, args) if args else self.fourbyte
Expand Down
4 changes: 2 additions & 2 deletions tests/test_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_call_with_predefined_args():

def test_call_async():
call = Call(CHAI, 'name()(string)', [['name', None]])
assert await_awaitable(call.coroutine()) == {'name': 'Chai'}
assert await_awaitable(call) == {'name': 'Chai'}


def test_call_with_args_async():
Expand All @@ -39,7 +39,7 @@ def test_call_with_args_async():

def test_call_with_predefined_args_async():
call = Call(CHAI, ['balanceOf(address)(uint256)', CHAI], [['balance', from_wei]])
assert isinstance(await_awaitable(call.coroutine())['balance'], float)
assert isinstance(await_awaitable(call)['balance'], float)


def test_call_threading():
Expand Down
4 changes: 2 additions & 2 deletions tests/test_multicall.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_multicall_async():
Call(CHAI, 'totalSupply()(uint256)', [['supply', from_wei]]),
Call(CHAI, ['balanceOf(address)(uint256)', CHAI], [['balance', from_ray]]),
])
result = await_awaitable(multi.coroutine())
result = await_awaitable(multi)
print(result)
assert isinstance(result['supply'], float)
assert isinstance(result['balance'], float)
Expand All @@ -69,7 +69,7 @@ def test_multicall_no_success_async():
],
require_success=False
)
result = await_awaitable(multi.coroutine())
result = await_awaitable(multi)
print(result)
assert isinstance(result['success'], tuple)
assert isinstance(result['balance'], tuple)
Expand Down