Skip to content

Commit

Permalink
Use asyncio for TCP/TLS comms (#5450)
Browse files Browse the repository at this point in the history
* Use asyncio for TCP/TLS comms

This is a PR for using asyncio instead of tornado for TCP/TLS comms.

There are a few goals here:

- Reduce our dependency on tornado, in favor of the builtin asyncio
support
- Lower latency for small/medium sized messages. Right now the tornado
call stack in the IOStream interface increases our latency for small
messages. We can do better here by making use of asyncio protocols.
- Equal performance for large messages. We should be able to make zero
copy reads/writes just as before. In this case I wouldn't expect a
performance increase from asyncio for large (10 MiB+) messages, but we
also shouldn't see a slowdown.
- Improved TLS performance. The TLS implementation in asyncio (and
more-so in uvloop) is better optimized than the implementation in
tornado. We should see a measurable performance boost here.
- Reduced GIL contention when using TLS. Right now a single write or
read through tornado drops and reaquires the GIL several times. If there
are busy background threads, this can lead to the IO thread having to
wait for the GIL mutex multiple times, leading to lower IO
performance. The implementations in asyncio/uvloop don't drop the GIL
except for IO (instead of also dropping it for TLS operations), which
leads to fewer chances of another thread picking up the GIL and slowing
IO.
  • Loading branch information
jcrist authored Dec 10, 2021
1 parent b9f0517 commit bdc2189
Show file tree
Hide file tree
Showing 9 changed files with 1,075 additions and 52 deletions.
23 changes: 22 additions & 1 deletion distributed/comm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,32 @@
unparse_host_port,
)
from .core import Comm, CommClosedError, connect, listen
from .registry import backends
from .utils import get_tcp_server_address, get_tcp_server_addresses


def _register_transports():
from . import inproc, tcp, ws
import dask.config

from . import inproc, ws

tcp_backend = dask.config.get("distributed.comm.tcp.backend")

if tcp_backend == "asyncio":
from . import asyncio_tcp

backends["tcp"] = asyncio_tcp.TCPBackend()
backends["tls"] = asyncio_tcp.TLSBackend()
elif tcp_backend == "tornado":
from . import tcp

backends["tcp"] = tcp.TCPBackend()
backends["tls"] = tcp.TLSBackend()
else:
raise ValueError(
f"Expected `distributed.comm.tcp.backend` to be in `('asyncio', "
f"'tornado')`, got {tcp_backend}"
)

try:
from . import ucx
Expand Down
Loading

0 comments on commit bdc2189

Please sign in to comment.