Skip to content

Commit

Permalink
Async call semaphore (#71)
Browse files Browse the repository at this point in the history
* feat: async call semaphore

* chore: renamve env var

* feat: JIT call args to reduce memory

* feat: await method

* feat: await method

* feat: 4byte cache

* fix: missing import

* feat: multicall semaphore
  • Loading branch information
BobTheBuidler authored Apr 21, 2023
1 parent fae20da commit 84cc615
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 35 deletions.
28 changes: 16 additions & 12 deletions multicall/call.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from web3 import Web3

from multicall import Signature
from multicall.constants import Network, w3
from multicall.constants import Network, w3, ASYNC_SEMAPHORE
from multicall.exceptions import StateOverrideNotSupported
from multicall.loggers import setup_logger
from multicall.utils import (chain_id, get_async_w3, run_in_subprocess,
Expand Down Expand Up @@ -100,6 +100,9 @@ def __call__(self, args: Optional[Any] = None, _w3: Optional[Web3] = None) -> An
self.signature,
self.returns,
)

async def __await__(self) -> Any:
return await self.coroutine()

@eth_retry.auto_retry
async def coroutine(self, args: Optional[Any] = None, _w3: Optional[Web3] = None) -> Any:
Expand All @@ -108,17 +111,18 @@ async def coroutine(self, args: Optional[Any] = None, _w3: Optional[Web3] = None
if self.state_override_code and not state_override_supported(_w3):
raise StateOverrideNotSupported(f'State override is not supported on {Network(chain_id(_w3)).__repr__()[1:-1]}.')

args = await run_in_subprocess(
prep_args,
self.target,
self.signature,
args or self.args,
self.block_id,
self.gas_limit,
self.state_override_code,
)

output = await get_async_w3(_w3).eth.call(*args)
async with ASYNC_SEMAPHORE:
output = await get_async_w3(_w3).eth.call(
*await run_in_subprocess(
prep_args,
self.target,
self.signature,
args or self.args,
self.block_id,
self.gas_limit,
self.state_override_code,
)
)

return await run_in_subprocess(Call.decode_output, output, self.signature, self.returns)

Expand Down
5 changes: 5 additions & 0 deletions multicall/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
from enum import IntEnum
from typing import Dict
Expand Down Expand Up @@ -176,3 +177,7 @@ class Network(IntEnum):
NUM_PROCESSES = min(user_choice, parallelism_capacity)

NO_STATE_OVERRIDE = [ Network.Gnosis, Network.Harmony, Network.Moonbeam, Network.Moonriver, Network.Kovan, Network.Fuse ]

# NOTE: If we run too many async calls at once, we'll have memory issues.
# Feel free to increase this with the "MULTICALL_CALL_SEMAPHORE" env var if you know what you're doing.
ASYNC_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("MULTICALL_CALL_SEMAPHORE", 1000)))
41 changes: 23 additions & 18 deletions multicall/multicall.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from web3 import Web3

from multicall import Call
from multicall.constants import (GAS_LIMIT, MULTICALL2_ADDRESSES, MULTICALL2_BYTECODE,
MULTICALL3_BYTECODE, MULTICALL3_ADDRESSES, w3)
from multicall.constants import (ASYNC_SEMAPHORE, GAS_LIMIT,
MULTICALL2_ADDRESSES, MULTICALL3_ADDRESSES,
MULTICALL3_BYTECODE, w3)
from multicall.loggers import setup_logger
from multicall.utils import (await_awaitable, chain_id, gather,
run_in_subprocess, state_override_supported)
Expand Down Expand Up @@ -54,9 +55,12 @@ def __init__(

def __call__(self) -> Dict[str,Any]:
start = time()
response = await_awaitable(self.coroutine())
response = await_awaitable(self)
logger.debug(f"Multicall took {time() - start}s")
return response

async def __await__(self) -> Dict[str,Any]:
return await self.coroutine()

async def coroutine(self) -> Dict[str,Any]:
batches = await gather([
Expand All @@ -77,21 +81,22 @@ async def fetch_outputs(self, calls: List[Call], ConnErr_retries: int = 0, id: s
if calls is None:
calls = self.calls

try:
args = await run_in_subprocess(get_args, calls, self.require_success)
if self.require_success is True:
_, outputs = await self.aggregate.coroutine(args)
outputs = await run_in_subprocess(unpack_aggregate_outputs, outputs)
else:
_, _, outputs = await self.aggregate.coroutine(args)
outputs = await gather([
run_in_subprocess(Call.decode_output, output, call.signature, call.returns, success)
for call, (success, output) in zip(calls, outputs)
])
logger.debug(f"coroutine {id} finished")
return outputs
except Exception as e:
_raise_or_proceed(e, len(calls), ConnErr_retries=ConnErr_retries)
async with ASYNC_SEMAPHORE:
try:
args = await run_in_subprocess(get_args, calls, self.require_success)
if self.require_success is True:
_, outputs = await self.aggregate.coroutine(args)
outputs = await run_in_subprocess(unpack_aggregate_outputs, outputs)
else:
_, _, outputs = await self.aggregate.coroutine(args)
outputs = await gather([
run_in_subprocess(Call.decode_output, output, call.signature, call.returns, success)
for call, (success, output) in zip(calls, outputs)
])
logger.debug(f"coroutine {id} finished")
return outputs
except Exception as e:
_raise_or_proceed(e, len(calls), ConnErr_retries=ConnErr_retries)

# Failed, we need to rebatch the calls and try again.
batch_results = await gather([
Expand Down
8 changes: 7 additions & 1 deletion multicall/signature.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import lru_cache
from typing import Any, List, Optional, Tuple

# For eth_abi versions < 2.2.0, `decode` and `encode` have not yet been added.
Expand All @@ -11,6 +12,8 @@
from eth_utils import function_signature_to_4byte_selector


get_4byte_selector = lru_cache(maxsize=None)(function_signature_to_4byte_selector)

def parse_signature(signature: str) -> Tuple[str, List[TypeStr], List[TypeStr]]:
"""
Breaks 'func(address)(uint256)' into ['func', ['address'], ['uint256']]
Expand Down Expand Up @@ -58,7 +61,10 @@ class Signature:
def __init__(self, signature: str) -> None:
self.signature = signature
self.function, self.input_types, self.output_types = parse_signature(signature)
self.fourbyte = function_signature_to_4byte_selector(self.function)

@property
def fourbyte(self) -> bytes:
return get_4byte_selector(self.function)

def encode_data(self, args: Optional[Any] = None) -> bytes:
return self.fourbyte + encode(self.input_types, args) if args else self.fourbyte
Expand Down
4 changes: 2 additions & 2 deletions tests/test_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_call_with_predefined_args():

def test_call_async():
call = Call(CHAI, 'name()(string)', [['name', None]])
assert await_awaitable(call.coroutine()) == {'name': 'Chai'}
assert await_awaitable(call) == {'name': 'Chai'}


def test_call_with_args_async():
Expand All @@ -39,7 +39,7 @@ def test_call_with_args_async():

def test_call_with_predefined_args_async():
call = Call(CHAI, ['balanceOf(address)(uint256)', CHAI], [['balance', from_wei]])
assert isinstance(await_awaitable(call.coroutine())['balance'], float)
assert isinstance(await_awaitable(call)['balance'], float)


def test_call_threading():
Expand Down
4 changes: 2 additions & 2 deletions tests/test_multicall.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_multicall_async():
Call(CHAI, 'totalSupply()(uint256)', [['supply', from_wei]]),
Call(CHAI, ['balanceOf(address)(uint256)', CHAI], [['balance', from_ray]]),
])
result = await_awaitable(multi.coroutine())
result = await_awaitable(multi)
print(result)
assert isinstance(result['supply'], float)
assert isinstance(result['balance'], float)
Expand All @@ -69,7 +69,7 @@ def test_multicall_no_success_async():
],
require_success=False
)
result = await_awaitable(multi.coroutine())
result = await_awaitable(multi)
print(result)
assert isinstance(result['success'], tuple)
assert isinstance(result['balance'], tuple)
Expand Down

0 comments on commit 84cc615

Please sign in to comment.