
Client/Scheduler gather not robust to busy worker #4698

Closed
fbriol opened this issue Apr 13, 2021 · 1 comment · Fixed by #7997 · May be fixed by #5546
Labels: bug (Something is broken)
fbriol commented Apr 13, 2021

When I run a large number of tasks on the CNES HPC with a big Dask cluster (512 threads / 128 workers), I sometimes get communication errors between the scheduler and the workers. The error is raised in distributed/utils_comm.py, because the code tries to read the key 'data', which does not exist in the response dictionary.

    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/utils_comm.py in retry_operation
          383         dask.config.get("distributed.comm.retry.delay.max"), default="s"
          384     )
    ----> 385     return await retry(
          386         partial(coro, *args, **kwargs),
          387         count=retry_count,
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/utils_comm.py in retry
          368                 delay *= 1 + random.random() * jitter_fraction
          369             await asyncio.sleep(delay)
    ----> 370     return await coro()
          371 
          372 
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/core.py in send_recv_from_rpc
          860             name, comm.name = comm.name, "ConnectionPool." + key
          861             try:
    ----> 862                 result = await send_recv(comm=comm, op=key, **kwargs)
          863             finally:
          864                 self.pool.reuse(self.addr, comm)
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/core.py in send_recv
          659         if comm.deserialize:
          660             typ, exc, tb = clean_exception(**response)
    ----> 661             raise exc.with_traceback(tb)
          662         else:
          663             raise Exception(response["text"])
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/core.py in handle_comm
          499                             result = asyncio.ensure_future(result)
          500                             self._ongoing_coroutines.add(result)
    ----> 501                             result = await result
          502                     except (CommClosedError, CancelledError) as e:
          503                         if self.status == Status.running:
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/scheduler.py in gather
          5052                 who_has[key] = []
          5053 
    ----> 5054         data, missing_keys, missing_workers = await gather_from_workers(
          5055             who_has, rpc=self.rpc, close=False, serializers=serializers
          5056         )
    
    /home/ad/briolf/anaconda3/lib/python3.8/site-packages/distributed/utils_comm.py in gather_from_workers
          86                     missing_workers.add(worker)
          87                 else:
    ----> 88                     response.update(r["data"])
          89         finally:
          90             for r in rpcs.values():
    
    KeyError: 'data'

I modified the module code to display the content of this dictionary when the error is thrown. In this context, the dictionary contains only dict(status='busy').
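A minimal sketch (hypothetical, not the actual distributed code) of why `response.update(r["data"])` raises `KeyError` when one worker replies with only a status:

```python
# Hypothetical reduction of the crashing loop in gather_from_workers:
# a successful worker reply carries a "data" key, but a throttled
# worker replies with just {"status": "busy"}.
def merge_worker_replies(replies):
    response = {}
    for r in replies:
        response.update(r["data"])  # KeyError if r == {"status": "busy"}
    return response

ok = {"status": "OK", "data": {"x": 1}}
busy = {"status": "busy"}

merge_worker_replies([ok])  # works: returns {"x": 1}

try:
    merge_worker_replies([ok, busy])
except KeyError as e:
    print("KeyError:", e)  # prints: KeyError: 'data'
```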

I cannot post reproducible code because the software involved is not public, and I have not found a simple snippet that reproduces the problem. I can run further tests if you need more information.

Environment:

  • Dask version: 2021.4.0
  • Python version: 3.8.8
  • Operating System: CentOS Linux release 7.6.1810
  • Install method (conda, pip, source): conda, channel conda-forge
haf commented Nov 25, 2021

This was actually really easy to trace through the source, thanks to the high quality of the code in this project.

Here's the bug:

https://github.com/dask/distributed/blob/main/distributed/utils_comm.py#L63-L88

It creates a coroutine for comms via send/recv to the worker, opening a comms channel.

It uses this operation:

    try:
        response = await send_recv(
            comm,
            serializers=serializers,
            deserializers=deserializers,
            op="get_data",
            keys=keys,
            who=who,
            max_connections=max_connections,
        )

This sends an RPC request called "get_data" and returns the response no matter whether the operation succeeded or not (it only crashes if the protocol itself is broken).
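As a toy illustration of that contract (hypothetical, not distributed's actual code), a reply is only turned into an exception when it explicitly encodes an error; a "busy" reply passes through untouched:

```python
# Hypothetical stand-in mirroring the described send_recv contract:
# the reply dict is returned as-is unless it encodes a remote error.
def handle_reply(response):
    if response.get("status") == "error":
        raise RuntimeError(response.get("text", "remote error"))
    return response  # "busy" replies are returned, not raised

handle_reply({"status": "busy"})  # returns {"status": "busy"} unchanged
```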

Finding the invoked worker method was then rather easy:

    async def get_data(
Now it's just a question of seeing whether it ever returns a message without "data" as a property, which it does in this if statement:

    if (
        max_connections is not False
        and self.outgoing_current_count >= max_connections
    ):
        logger.debug(
            "Worker %s has too many open connections to respond to data request "
            "from %s (%d/%d).%s",
            self.address,
            who,
            self.outgoing_current_count,
            max_connections,
            throttle_msg,
        )
        return {"status": "busy"}

This does at least log, at debug level, that the worker has too many open connections to answer the request right now.

So what needs to be done is to update the crashing line to check whether status == "busy" and, if so, retry with decorrelated exponential backoff until the data can be fetched.
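A sketch of that retry strategy, under stated assumptions: the helper name `gather_with_backoff`, the `fetch` callable, and the retry bounds are all illustrative, not distributed's actual API.

```python
import asyncio
import random

async def gather_with_backoff(fetch, base=0.05, cap=2.0, max_retries=10):
    """Retry `fetch` (an async callable returning a worker reply dict)
    with decorrelated-jitter backoff while the worker reports "busy".
    Names and bounds here are illustrative, not distributed's API."""
    delay = base
    for _ in range(max_retries):
        response = await fetch()
        if response.get("status") != "busy":
            return response["data"]
        # Decorrelated jitter: the next delay is sampled between the
        # base delay and 3x the previous delay, capped at `cap`.
        delay = min(cap, random.uniform(base, delay * 3))
        await asyncio.sleep(delay)
    raise TimeoutError("worker stayed busy after retries")

# Example: the worker is busy twice, then answers with data.
replies = iter([{"status": "busy"}, {"status": "busy"},
                {"status": "OK", "data": {"x": 1}}])

async def fake_fetch():
    return next(replies)

print(asyncio.run(gather_with_backoff(fake_fetch)))  # {'x': 1}
```

Decorrelated jitter avoids the thundering-herd effect of many clients retrying a busy worker on the same fixed schedule, which matters at the 128-worker scale described above.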

haf added a commit to haf/distributed that referenced this issue Nov 25, 2021
@fjetter fjetter added the bug Something is broken label Nov 29, 2021
@fjetter fjetter changed the title KeyError 'data' Client/Scheduler gather not robust to busy worker Jun 14, 2022
@crusaderky crusaderky self-assigned this Jul 12, 2023