-
Notifications
You must be signed in to change notification settings - Fork 358
Concurrency in Bittensor
With the release of bittensor 9.0, we finally have a stable asyncio method for using the bittensor SDK. This has been the result of a number of trial-and-error attempts over the past several months that led to our rewrite of py-substrate-interface as async-substrate-interface (which also includes a synchronous implementation).
When initially looking at the btcli, there were many obvious situations where concurrent substrate calls could substantially increase the overall performance of the program. However, the websocket implementation in py-substrate-interface did not allow for this. As someone who has been writing Python asyncio code since 2019, asyncio seemed to be the obvious choice for doing things in a concurrent manner. When btcli and bittensor (SDK) were broken apart in version 8.0, we got the chance to really start experimenting with different implementations for the CLI — initially starting as just a handful of parts of py-substrate-interface rewritten as async to work with the specific CLI functions as needed. When this concept had proven itself, we began work on expanding these changes and implementing them into bittensor (SDK). As we were rewriting it, we came across a few places in the original codebase that could be significantly improved regardless of async, so we wrote a synchronous compatible version of the async-substrate-interface with our non-async improvements added. Eventually, we ported basically the entire library as async-substrate-interface. Though we built it for use in bittensor, there’s nothing about it that makes it inherently need to be tied to bittensor, and as such any substrate chain should function under it.
We had a few goals in mind regarding concurrency. I personally believe the correct approach to doing concurrency in Python is through asyncio, but this can involve changing a codebase significantly to achieve desired results. So, we also wanted concurrency available on the synchronous Subtensor object. We initially designed this to work with the reuse
function below, as well as the other two. However, there were some serious latency/rate-limiting issues, so this functionality has been removed.
import asyncio
from bittensor.core.subtensor import Subtensor
from bittensor.core.async_subtensor import AsyncSubtensor
from concurrent.futures import ThreadPoolExecutor
def reuse():
def reuse_(netuid: int):
return subtensor.tempo(netuid)
subtensor = Subtensor()
with ThreadPoolExecutor() as executor:
for tempo in executor.map(reuse_, [1,2,3]):
print(tempo)
def new():
def new_(netuid: int):
subtensor = Subtensor()
return subtensor.tempo(netuid)
with ThreadPoolExecutor() as executor:
for tempo in executor.map(new_, [1,2,3]):
print(tempo)
def async_use():
async def async_use_():
async def async_reuse(netuid: int):
return await async_subtensor.tempo(netuid)
async with AsyncSubtensor() as async_subtensor:
for tempo in await asyncio.gather(async_reuse(1), async_reuse(2), async_subtensor.tempo(3)):
print(tempo)
asyncio.run(async_use_())
- When gathering numerous queries, get the block hash first, and use this for the queries, e.g.
block_hash = await subtensor.substrate.get_chain_head()
balance, weight_commit_info, delegates = await asyncio.gather(
subtensor.get_balance(HOTKEY, block_hash=block_hash),
subtensor.get_current_weight_commit_info(NETUID, block_hash=block_hash),
subtensor.get_delegate_by_hotkey(HOTKEY, block_hash=block_hash)
)
This is significantly more efficient than it would be without specifying, because when the block_hash is not specified, we must first go get the chain head, and use that. Because there is concurrent execution, this would need to happen three times, rather than just the one, in this example.
- Instantiate your Async* classes with async with:
subtensor = AsyncSubtensor() # this cannot do the initial setup
await subtensor.substrate.initialize() # this does the initial setup
await subtensor.call...
await subtensor.substrate.close() # this ensures a clean closing of the websocket connection
# this can be much more easily simplified with:
async with AsyncSubtensor() as subtensor: # `initialize()` setup occurs in here
await subtensor.call...
# the connection is automatically closed at the end with the `async with` block
There are times at which you may not want to close the connection, which is why the initialize()
method exists independently of the __aenter__
method.
If you do manually initialize, you must await subtensor.close()
afterward, so as to ensure the websocket connection is closed correctly.
If the websocket is inactive for 10+ seconds, the connection closes automatically. This is to ensure we do not leave unused connections hanging indefinitely, while allowing the connection to be reused if used repeatedly. If the connection does close, it is automatically reopened on the next call, so this is not something the user needs to worry about.
- Use
with
for your Subtensor object to ensure proper websocket closing after your calls have been made, similar to tip #2 above. Likewise you can also use.close()
if that's easier for you.
There’s a slight difference between the original py-substrate-interface query results and the new async-substrate-interface query results. Previously, these were typically SCALE-decoded into Python SCALE objects. This was a painfully slow process. We greatly improved this by creating a Rust-based SCALE decoder called bt-decode. We’ve attempted to preserve a decent amount of backwards-compatibility by wrapping these values in a class called ScaleObj, which just contains the value, and can be accessed using the typical methods you would access true Scale objects, such as .value. For most users, this should be a drop-in replacement, but there may be a handful of edge-cases where you previously expected the object to look a bit different than it does now. This is across AsyncSubstrateInterface and SubstrateInterface. In the sync code, a single websocket connection is used per object, and is automatically closed when the websocket use has concluded. While this reconnection for every new call adds a small amount of overhead, it also makes us better citizens of the bittensor ecosystem — no longer leaving stale connections hanging open indefinitely. In addition, it means we no longer need to worry about silent connections failures that existed previously.