[copy] fix the TimeoutError and ServerDisconnected issues in copy #11830
Conversation
OK, there were two problems:

1. A timeout of 5s appears now to be too short for Google Cloud Storage. I am not sure why, but we timeout substantially more frequently. I have observed this myself on my laptop. Just this morning I saw it happen to Daniel.

2. When using an `aiohttp.AsyncIterablePayload`, it is *critical* to always check if the coroutine which actually writes to GCS (which is stashed in the variable `request_task`) is still alive. In the current `main`, we do not do this, which causes hangs (in particular, the timeout exceptions are never thrown, ergo we never retry).

To understand the second problem, you must first recall how writing works in aiogoogle. There are two Tasks and an `asyncio.Queue`. The terms "writer" and "reader" are somewhat confusing, so let's use left and right. The left Task has the owning reference to both the source "file" and the destination "file". In particular, it is the *left* Task which closes both "files". Moreover, the left Task reads chunks from the source file and places those chunks on the `asyncio.Queue`. The right Task takes chunks off the queue and writes those chunks to the destination file.

This situation can go awry in two ways. First, if the right Task encounters any kind of failure, it will stop taking chunks off of the queue. When the queue (which has a size limit of one) is full, the left Task will hang. The system is stuck. The left Task will wait forever for the right Task to empty the queue. The second scenario is exactly the same except that the left Task is trying to add the "stop" message to the queue rather than a chunk.

In either case, it is critical that the left Task waits simultaneously on the queue operation *and* on the right Task completing (see the sketch below). If the right Task has died, no further writes can occur and the left Task must raise an exception. In the first scenario, we do not observe the right Task's exception there, because that will be done when we close the `InsertObjectStream` (which represents the destination "file").

---

I also added several types, assertions, and a few missing `async with ... as resp:` blocks.
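For illustration, here is a standalone sketch of the pattern the fix relies on: the left Task waits on its queue operation and on the right Task at the same time, so a dead writer can no longer wedge the pipeline. The helper name `put_or_fail` is hypothetical; the real change lives in the `InsertObjectStream` code shown further down.

```python
import asyncio

async def put_or_fail(queue: asyncio.Queue, item, request_task: asyncio.Task):
    """Enqueue `item`, but bail out if the consuming (right) Task has died."""
    put = asyncio.ensure_future(queue.put(item))
    # Wait on the queue operation *and* the consumer simultaneously. If the
    # consumer is gone, nobody will ever drain the queue, so waiting only on
    # `put` would hang forever.
    await asyncio.wait([put, request_task], return_when=asyncio.FIRST_COMPLETED)
    if put.done():
        return
    put.cancel()
    raise ValueError('request task finished early')
```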
hail/python/hailtop/httpx.py (outdated)
```diff
@@ -101,7 +101,7 @@ def __init__(self,
         assert 'connector' not in kwargs

         if timeout is None:
-            timeout = aiohttp.ClientTimeout(total=5)
+            timeout = aiohttp.ClientTimeout(total=20)
```
Can we set this timeout explicitly from the aiogoogle code? I'm worried this will give other places where we use this Client long timeouts, like in Batch, and we don't want that.
It is possible to set the timeout during construction of the session for the StorageClient.
I'm somewhat disinclined to use different timeouts for different parts of our system. That seems like it will be harder to keep track of when we're debugging things. I kind of think our original 5s timeout is quite aggressive. I'm not sure what to think. I just really don't want to think about multiple different parts of our system using differing lengths of timeout.
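For illustration, a minimal sketch of giving only the storage session a longer budget at construction time, using plain aiohttp (the helper name is hypothetical; the actual wiring would go through the hailtop httpx wrapper shown in the diff above):

```python
import aiohttp

def make_storage_session() -> aiohttp.ClientSession:
    # Only the GCS storage client gets the generous 20s total timeout;
    # everything else (e.g. batch-driver / worker calls) keeps the 5s default.
    return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20))
```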
If that's the case, then we need to make sure every batch-driver / worker interaction has the correct timeouts. We cannot wait 20 seconds to schedule a job as that will gum up the scheduler.
```python
return await self._session.post(
    f'https://storage.googleapis.com/upload/storage/v1/b/{bucket}/o',
    **kwargs)

assert 'data' not in params
```
I assume we never reach this case in our current code...
Yeah, it's totally unused.
```python
await asyncio.wait([fut, self._request_task], return_when=asyncio.FIRST_COMPLETED)
if fut.done():
    return len(b)
raise ValueError(f'request task finished early')
```
What is the implication of this Exception? Does this show up in user logs? Is it retried at all as a transient error?
It's not a transient error and is not retried. It could show up anywhere that someone tries to use aiogoogle. That includes input and output of batch.
As long as the client of this code correctly calls `close` on an `InsertObjectStream`, you'll see that while handling this `ValueError`, you encountered the error produced by the `_request_task`, which is the actual cause of all this. As a result, if you're doing something like:
```python
async def foo():
    async with await fs.create(...) as obj:
        await obj.write(...)

await retry_transient_errors(foo)
```
then `retry_transient_errors` will see the transient error from the `_request_task` (which will carry the `ValueError` as its exception context) and will appropriately retry `foo`.
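A small self-contained sketch (not the aiogoogle code; the exception types here are stand-ins) of how that chaining works: when closing the stream awaits the failed `_request_task` while the `ValueError` is in flight, Python attaches the `ValueError` as the new exception's `__context__`, and the caller sees the transient error on top:

```python
import asyncio

async def request_task_body():
    raise ConnectionError('server disconnected')  # stand-in for the transient error

async def demo():
    task = asyncio.create_task(request_task_body())
    await asyncio.sleep(0)  # let the task run and fail
    try:
        raise ValueError('request task finished early')  # what write() raises
    finally:
        # closing the InsertObjectStream awaits the request task; its exception
        # propagates with the in-flight ValueError attached as __context__
        await task

try:
    asyncio.run(demo())
except ConnectionError as err:
    assert isinstance(err.__context__, ValueError)  # the chained "cause of all this"
```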
Let's try to get this merged today. I don't have meetings, so I can respond quickly to changes.
I think you need non-default timeouts in `job.py`.
Also, in `worker.py` when you construct `Worker()`.
Hmm. I really am not a fan of heterogeneity of timeouts. OK, if you really think it's critical that we have 5s timeouts in Batch, then I'll just put the 20-second timeout into the `storage_client`.
cc: @daniel-goldstein, this is a tricky asyncio situation which you should also keep in mind