[copy] fix the TimeoutError and ServerDisconnected issues in copy #11830

Merged: 5 commits into hail-is:main on May 13, 2022

Conversation

@danking (Contributor) commented May 11, 2022

cc: @daniel-goldstein, this is a tricky asyncio situation which you should also keep in mind

OK, there were two problems:

1. A timeout of 5s now appears to be too short for Google Cloud Storage. I am not sure why, but we
   time out substantially more frequently. I have observed this myself on my laptop, and just this
   morning I saw it happen to Daniel.

2. When using an `aiohttp.AsyncIterablePayload`, it is *critical* to always check whether the coroutine
   that actually writes to GCS (which is stashed in the variable `request_task`) is still
   alive. In the current `main`, we do not do this, which causes hangs (in particular, the timeout
   exceptions are never thrown, so we never retry).

To understand the second problem, you must first recall how writing works in aiogoogle. There are
two Tasks and an `asyncio.Queue`. The terms "writer" and "reader" are somewhat confusing, so let's
use left and right. The left Task has the owning reference to both the source "file" and the
destination "file". In particular, it is the *left* Task which closes both "files". Moreover, the
left Task reads chunks from the source file and places those chunks on the `asyncio.Queue`. The
right Task takes chunks off the queue and writes those chunks to the destination file.
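
For intuition, here is a minimal standalone sketch of that shape (the names `left`, `right`, `copy`, and the `None` stop sentinel are mine, not aiogoogle's actual identifiers):

import asyncio

async def left(source_chunks, queue: asyncio.Queue):
    # The left Task owns both "files": it reads chunks from the source and
    # hands them to the right Task via the queue, then sends a "stop" message.
    for chunk in source_chunks:
        await queue.put(chunk)
    await queue.put(None)  # the "stop" message

async def right(queue: asyncio.Queue, destination: list):
    # The right Task drains the queue and writes each chunk to the destination.
    while True:
        chunk = await queue.get()
        if chunk is None:
            return
        destination.append(chunk)  # stands in for the write to GCS

async def copy(source_chunks):
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)  # same size limit of one
    destination: list = []
    right_task = asyncio.create_task(right(queue, destination))
    await left(source_chunks, queue)
    await right_task
    return destination

print(asyncio.run(copy([b'a', b'b', b'c'])))  # [b'a', b'b', b'c']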

This situation can go awry in two ways.

First, if the right Task encounters any kind of failure, it stops taking chunks off the
queue. Once the queue (which has a size limit of one) is full, the left Task hangs: the system
is stuck, because the left Task will wait forever for the right Task to empty the queue.

The second scenario is exactly the same except that the left Task is trying to add the "stop"
message to the queue rather than a chunk.

In either case, it is critical that the left Task waits simultaneously on the queue operation *and*
on the right Task completing. If the right Task has died, no further writes can occur and the left
Task must raise an exception. In the first scenario, we do not observe the right Task's exception
there; that happens later, when we close the `InsertObjectStream` (which represents the destination
"file").

---

I also added several types, assertions, and a few missing `async with ... as resp:` blocks.
@jigold previously requested changes May 11, 2022
@@ -101,7 +101,7 @@ def __init__(self,
         assert 'connector' not in kwargs

         if timeout is None:
-            timeout = aiohttp.ClientTimeout(total=5)
+            timeout = aiohttp.ClientTimeout(total=20)
Contributor

Can we set this timeout explicitly from the aiogoogle code? I'm worried this will give other places where we use this Client, like in Batch, long timeouts, and we don't want that.

Contributor Author

It is possible to set the timeout during construction of the session for the `StorageClient`.

I'm somewhat disinclined to use different timeouts for different parts of our system. That seems like it will be harder to keep track of when we're debugging things. I kind of think our original 5s timeout is quite aggressive. I'm not sure what to think. I just really don't want to think about multiple different parts of our system using differing lengths of timeout.
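
For concreteness, a per-client timeout in plain aiohttp looks roughly like this (the helper name and the exact wiring into the `StorageClient` are assumptions, not the actual hailtop code):

import aiohttp

async def make_storage_session() -> aiohttp.ClientSession:
    # Hypothetical wiring: only the storage client's session gets the longer
    # 20s total timeout; other clients (e.g. Batch's) keep a shorter default.
    return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20))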

Contributor

If that's the case, then we need to make sure every batch-driver / worker interaction has the correct timeouts. We cannot wait 20 seconds to schedule a job as that will gum up the scheduler.

return await self._session.post(
    f'https://storage.googleapis.com/upload/storage/v1/b/{bucket}/o',
    **kwargs)
assert 'data' not in params
Contributor

I assume we never reach this case in our current code...

Contributor Author

Yeah, it's totally unused.

await asyncio.wait([fut, self._request_task], return_when=asyncio.FIRST_COMPLETED)
if fut.done():
    return len(b)
raise ValueError(f'request task finished early')
Contributor

What is the implication of this Exception? Does this show up in user logs? Is it retried at all as a transient error?

Contributor Author

It's not a transient error and is not retried. It could show up anywhere someone uses aiogoogle, including the input and output of Batch.

As long as the client of this code correctly calls close on an `InsertObjectStream`, you'll see that, while handling this `ValueError`, you encountered the error produced by the `_request_task`, which is the actual cause of all this. As a result, if you're doing something like:

async def foo():
    async with await fs.create(...) as obj:
        await obj.write(...)
await retry_transient_errors(foo)

The `retry_transient_errors` will see the transient error from the `_request_task` (which will have the `ValueError` as a suppressed exception) and will appropriately retry `foo`.
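
To illustrate the chaining (a generic asyncio sketch, not the actual hailtop code; the exception type and names are made up):

import asyncio

async def request_task_body():
    raise ConnectionResetError('transient GCS failure')  # stands in for the real transient error

async def main():
    request_task = asyncio.create_task(request_task_body())
    await asyncio.sleep(0)  # let the request task fail
    try:
        try:
            raise ValueError('request task finished early')
        finally:
            # Closing the stream awaits the request task, so its exception is
            # raised *while handling* the ValueError and carries it as __context__.
            await request_task
    except ConnectionResetError as e:
        assert isinstance(e.__context__, ValueError)
        print('underlying error:', e, '| raised while handling:', e.__context__)

asyncio.run(main())

A retry wrapper that inspects the caught exception therefore sees the underlying transient error, not the `ValueError`.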

@danking mentioned this pull request May 13, 2022

@danking (Contributor Author) commented May 13, 2022

Let's try to get this merged today; I don't have meetings, so I can respond quickly to changes.

@jigold (Contributor) commented May 13, 2022

I think you need non-default timeouts in `job.py`'s `unschedule_job` and in `instance.py`'s `check_is_active_healthy`.

@jigold (Contributor) commented May 13, 2022

Also, in `worker.py` when you construct `Worker()`.

@danking (Contributor Author) commented May 13, 2022

Hmm. I really am not a fan of heterogeneous timeouts. OK, if you really think it's critical that we have 5s timeouts in Batch, then I'll just put the 20-second timeout into the `storage_client`.

@danking merged commit 5365520 into hail-is:main May 13, 2022