From 168e299291951b4f012a32919c0887b9cbd7696d Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Tue, 18 Apr 2023 15:34:54 +0000 Subject: [PATCH 1/8] feat: async call semaphore --- multicall/call.py | 6 +++--- multicall/constants.py | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/multicall/call.py b/multicall/call.py index fbb9d932..663990e4 100644 --- a/multicall/call.py +++ b/multicall/call.py @@ -7,7 +7,7 @@ from web3 import Web3 from multicall import Signature -from multicall.constants import Network, w3 +from multicall.constants import Network, w3, ASYNC_SEMAPHORE from multicall.exceptions import StateOverrideNotSupported from multicall.loggers import setup_logger from multicall.utils import (chain_id, get_async_w3, run_in_subprocess, @@ -117,8 +117,8 @@ async def coroutine(self, args: Optional[Any] = None, _w3: Optional[Web3] = None self.gas_limit, self.state_override_code, ) - - output = await get_async_w3(_w3).eth.call(*args) + async with ASYNC_SEMAPHORE: + output = await get_async_w3(_w3).eth.call(*args) return await run_in_subprocess(Call.decode_output, output, self.signature, self.returns) diff --git a/multicall/constants.py b/multicall/constants.py index 105f7a0d..e72aa780 100644 --- a/multicall/constants.py +++ b/multicall/constants.py @@ -1,3 +1,4 @@ +import asyncio import os from enum import IntEnum from typing import Dict @@ -176,3 +177,6 @@ class Network(IntEnum): NUM_PROCESSES = min(user_choice, parallelism_capacity) NO_STATE_OVERRIDE = [ Network.Gnosis, Network.Harmony, Network.Moonbeam, Network.Moonriver, Network.Kovan, Network.Fuse ] + +# If we gather too many calls at once, we'll have memory issues. This only impacts Calls, not Multicalls. +ASYNC_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("ASYNC_SEMAPHORE", 100_000))) From 55979e129fde3be69beb717302b860cfdd6c137a Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Tue, 18 Apr 2023 15:36:17 +0000 Subject: [PATCH 2/8] chore: renamve env var --- multicall/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/multicall/constants.py b/multicall/constants.py index e72aa780..043f0212 100644 --- a/multicall/constants.py +++ b/multicall/constants.py @@ -179,4 +179,4 @@ class Network(IntEnum): NO_STATE_OVERRIDE = [ Network.Gnosis, Network.Harmony, Network.Moonbeam, Network.Moonriver, Network.Kovan, Network.Fuse ] # If we gather too many calls at once, we'll have memory issues. This only impacts Calls, not Multicalls. -ASYNC_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("ASYNC_SEMAPHORE", 100_000))) +ASYNC_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("MULTICALL_CALL_SEMAPHORE", 100_000))) From a1e019adcad54f66891b67c1a96f9a2704b5a23d Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Wed, 19 Apr 2023 09:15:10 -0400 Subject: [PATCH 3/8] feat: JIT call args to reduce memory --- multicall/call.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/multicall/call.py b/multicall/call.py index 663990e4..ffac445b 100644 --- a/multicall/call.py +++ b/multicall/call.py @@ -108,17 +108,18 @@ async def coroutine(self, args: Optional[Any] = None, _w3: Optional[Web3] = None if self.state_override_code and not state_override_supported(_w3): raise StateOverrideNotSupported(f'State override is not supported on {Network(chain_id(_w3)).__repr__()[1:-1]}.') - args = await run_in_subprocess( - prep_args, - self.target, - self.signature, - args or self.args, - self.block_id, - self.gas_limit, - self.state_override_code, - ) async with ASYNC_SEMAPHORE: - output = await get_async_w3(_w3).eth.call(*args) + output = await get_async_w3(_w3).eth.call( + *await run_in_subprocess( + prep_args, + self.target, + self.signature, + args or self.args, + self.block_id, + self.gas_limit, + self.state_override_code, + ) + ) return await run_in_subprocess(Call.decode_output, output, self.signature, self.returns) From d273771672f4596530df1a536bf100368028663b Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Wed, 19 Apr 2023 09:16:11 -0400 Subject: [PATCH 4/8] feat: await method --- multicall/call.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/multicall/call.py b/multicall/call.py index ffac445b..d20b1c68 100644 --- a/multicall/call.py +++ b/multicall/call.py @@ -100,6 +100,9 @@ def __call__(self, args: Optional[Any] = None, _w3: Optional[Web3] = None) -> An self.signature, self.returns, ) + + async def __await__(self) -> Any: + return await self.coroutine() @eth_retry.auto_retry async def coroutine(self, args: Optional[Any] = None, _w3: Optional[Web3] = None) -> Any: From 125432fce5d8fce93692a085278d049a2febdfaf Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Wed, 19 Apr 2023 09:17:27 -0400 Subject: [PATCH 5/8] feat: await method --- multicall/multicall.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/multicall/multicall.py b/multicall/multicall.py index 8ffc81ce..f9c67d56 100644 --- a/multicall/multicall.py +++ b/multicall/multicall.py @@ -57,6 +57,9 @@ def __call__(self) -> Dict[str,Any]: response = await_awaitable(self.coroutine()) logger.debug(f"Multicall took {time() - start}s") return response + + async def __await__(self) -> Dict[str,Any]: + return await self.coroutine() async def coroutine(self) -> Dict[str,Any]: batches = await gather([ From 3e9dc5624e8d1fac3a465bb57486e4334313e757 Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Wed, 19 Apr 2023 10:03:09 -0400 Subject: [PATCH 6/8] feat: 4byte cache --- multicall/signature.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/multicall/signature.py b/multicall/signature.py index 994f5442..911a81de 100644 --- a/multicall/signature.py +++ b/multicall/signature.py @@ -11,6 +11,8 @@ from eth_utils import function_signature_to_4byte_selector +get_4byte_selector = lru_cache(maxsize=None)(function_signature_to_4byte_selector) + def parse_signature(signature: str) -> Tuple[str, List[TypeStr], List[TypeStr]]: """ Breaks 'func(address)(uint256)' into ['func', ['address'], ['uint256']] @@ -58,7 +60,10 @@ class Signature: def __init__(self, signature: str) -> None: self.signature = signature self.function, self.input_types, self.output_types = parse_signature(signature) - self.fourbyte = function_signature_to_4byte_selector(self.function) + + @property + def fourbyte(self) -> bytes: + return get_4byte_selector(self.function) def encode_data(self, args: Optional[Any] = None) -> bytes: return self.fourbyte + encode(self.input_types, args) if args else self.fourbyte From 7cdab6188dc48c5eb321bff07cfb9ff6950417a9 Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Wed, 19 Apr 2023 10:24:02 -0400 Subject: [PATCH 7/8] fix: missing import --- multicall/signature.py | 1 + 1 file changed, 1 insertion(+) diff --git a/multicall/signature.py b/multicall/signature.py index 911a81de..f8de5374 100644 --- a/multicall/signature.py +++ b/multicall/signature.py @@ -1,3 +1,4 @@ +from functools import lru_cache from typing import Any, List, Optional, Tuple # For eth_abi versions < 2.2.0, `decode` and `encode` have not yet been added. From f6c6169e54ec9bd1eb8a5393f31b8936901e7f70 Mon Sep 17 00:00:00 2001 From: BobTheBuidler Date: Fri, 21 Apr 2023 01:16:36 +0000 Subject: [PATCH 8/8] feat: multicall semaphore --- multicall/constants.py | 5 +++-- multicall/multicall.py | 38 ++++++++++++++++++++------------------ tests/test_call.py | 4 ++-- tests/test_multicall.py | 4 ++-- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/multicall/constants.py b/multicall/constants.py index 043f0212..e97c27dd 100644 --- a/multicall/constants.py +++ b/multicall/constants.py @@ -178,5 +178,6 @@ class Network(IntEnum): NO_STATE_OVERRIDE = [ Network.Gnosis, Network.Harmony, Network.Moonbeam, Network.Moonriver, Network.Kovan, Network.Fuse ] -# If we gather too many calls at once, we'll have memory issues. This only impacts Calls, not Multicalls. -ASYNC_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("MULTICALL_CALL_SEMAPHORE", 100_000))) +# NOTE: If we run too many async calls at once, we'll have memory issues. +# Feel free to increase this with the "MULTICALL_CALL_SEMAPHORE" env var if you know what you're doing. +ASYNC_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("MULTICALL_CALL_SEMAPHORE", 1000))) diff --git a/multicall/multicall.py b/multicall/multicall.py index f9c67d56..2a9f19c2 100644 --- a/multicall/multicall.py +++ b/multicall/multicall.py @@ -7,8 +7,9 @@ from web3 import Web3 from multicall import Call -from multicall.constants import (GAS_LIMIT, MULTICALL2_ADDRESSES, MULTICALL2_BYTECODE, - MULTICALL3_BYTECODE, MULTICALL3_ADDRESSES, w3) +from multicall.constants import (ASYNC_SEMAPHORE, GAS_LIMIT, + MULTICALL2_ADDRESSES, MULTICALL3_ADDRESSES, + MULTICALL3_BYTECODE, w3) from multicall.loggers import setup_logger from multicall.utils import (await_awaitable, chain_id, gather, run_in_subprocess, state_override_supported) @@ -54,7 +55,7 @@ def __init__( def __call__(self) -> Dict[str,Any]: start = time() - response = await_awaitable(self.coroutine()) + response = await_awaitable(self) logger.debug(f"Multicall took {time() - start}s") return response @@ -80,21 +81,22 @@ async def fetch_outputs(self, calls: List[Call], ConnErr_retries: int = 0, id: s if calls is None: calls = self.calls - try: - args = await run_in_subprocess(get_args, calls, self.require_success) - if self.require_success is True: - _, outputs = await self.aggregate.coroutine(args) - outputs = await run_in_subprocess(unpack_aggregate_outputs, outputs) - else: - _, _, outputs = await self.aggregate.coroutine(args) - outputs = await gather([ - run_in_subprocess(Call.decode_output, output, call.signature, call.returns, success) - for call, (success, output) in zip(calls, outputs) - ]) - logger.debug(f"coroutine {id} finished") - return outputs - except Exception as e: - _raise_or_proceed(e, len(calls), ConnErr_retries=ConnErr_retries) + async with ASYNC_SEMAPHORE: + try: + args = await run_in_subprocess(get_args, calls, self.require_success) + if self.require_success is True: + _, outputs = await self.aggregate.coroutine(args) + outputs = await run_in_subprocess(unpack_aggregate_outputs, outputs) + else: + _, _, outputs = await self.aggregate.coroutine(args) + outputs = await gather([ + run_in_subprocess(Call.decode_output, output, call.signature, call.returns, success) + for call, (success, output) in zip(calls, outputs) + ]) + logger.debug(f"coroutine {id} finished") + return outputs + except Exception as e: + _raise_or_proceed(e, len(calls), ConnErr_retries=ConnErr_retries) # Failed, we need to rebatch the calls and try again. batch_results = await gather([ diff --git a/tests/test_call.py b/tests/test_call.py index 1150565d..1db7b31a 100644 --- a/tests/test_call.py +++ b/tests/test_call.py @@ -29,7 +29,7 @@ def test_call_with_predefined_args(): def test_call_async(): call = Call(CHAI, 'name()(string)', [['name', None]]) - assert await_awaitable(call.coroutine()) == {'name': 'Chai'} + assert await_awaitable(call) == {'name': 'Chai'} def test_call_with_args_async(): @@ -39,7 +39,7 @@ def test_call_with_args_async(): def test_call_with_predefined_args_async(): call = Call(CHAI, ['balanceOf(address)(uint256)', CHAI], [['balance', from_wei]]) - assert isinstance(await_awaitable(call.coroutine())['balance'], float) + assert isinstance(await_awaitable(call)['balance'], float) def test_call_threading(): diff --git a/tests/test_multicall.py b/tests/test_multicall.py index 80e0df7a..9e057e15 100644 --- a/tests/test_multicall.py +++ b/tests/test_multicall.py @@ -56,7 +56,7 @@ def test_multicall_async(): Call(CHAI, 'totalSupply()(uint256)', [['supply', from_wei]]), Call(CHAI, ['balanceOf(address)(uint256)', CHAI], [['balance', from_ray]]), ]) - result = await_awaitable(multi.coroutine()) + result = await_awaitable(multi) print(result) assert isinstance(result['supply'], float) assert isinstance(result['balance'], float) @@ -69,7 +69,7 @@ def test_multicall_no_success_async(): ], require_success=False ) - result = await_awaitable(multi.coroutine()) + result = await_awaitable(multi) print(result) assert isinstance(result['success'], tuple) assert isinstance(result['balance'], tuple)