"Another operation is in progress" error when using transactions and asyncio.gather #313
Comments
So, I investigated and found that without implicit connection passing, none of the nested queries run on the connection of db.transaction :( when the code is:
async def t():
    async with db.transaction(reuse=False, reusable=False) as tx:
        await Model.get(109887, bind=tx.connection)
await asyncio.gather(*[t() for i in range(5)])
I have added
and without
|
Thanks for the report! I'm seeing two critical issues for GINO in Python 3.7:
import asyncio
import gino
db = gino.Gino()
async def main():
    await db.set_bind('postgresql://localhost/gino')
    async with db.acquire():
        pass
    async def t():
        async with db.acquire(reuse=True):
            await db.scalar('SELECT now()')
    await asyncio.gather(*[t() for _ in range(5)])
asyncio.run(main())
|
This error also occurs in Python 3.6. As initialization, I've tried
async with db.acquire(reuse=False, reusable=False):
    pass
with different parameter combinations as well. This does not work either:
async with db.transaction():
    pass
async def t():
    async with db.transaction(reuse=True):
        await Model.get(...)
await asyncio.gather(*[t() for i in range(5)])
Is there any way to use |
Could you please share your environment and the errors you get when the above code fails in Python 3.6? I couldn't reproduce it in Python 3.6.
FYI |
Python 3.6.6 (default, Aug 20 2018, 16:26:45)
@app.route('/test')
async def test_save():
    async with db.acquire(reuse=False, reusable=False):
        pass
    async def t():
        async with db.transaction():
            await Model.get(...)
    await asyncio.gather(*[t() for i in range(5)])
    return {}
|
Ah yes, thanks for the update! To reproduce this issue in Python 3.6, |
@fantix You know, the same issue also occurs with Sanic:
import asyncio
from sanic import Sanic
from sanic.exceptions import abort
from sanic.response import json
from gino.ext.sanic import Gino
app = Sanic()
app.config.DB_DSN = '...'
db = Gino()
db.init_app(app)
class Model(db.Model):
    __tablename__ = 'model'
    id = db.Column(db.Integer, primary_key=True)
@app.route("/test")
async def test(request):
    async def t():
        async with db.transaction():
            await Model.get_or_404(...)
    await asyncio.gather(*[t() for i in range(5)])
    return json({})
if __name__ == '__main__':
    app.run(debug=True)
|
Yes, thanks. All web framework plugins in GINO enable inherit. I'm trying to make that the default in the upstream contextvars PEP 567 backport. |
It doesn't matter whether the connection was acquired and released in the main task. It also happens in the following code:
import asyncio
import gino
db = gino.Gino()
async def main():
    await db.set_bind('postgresql://localhost/gino')
    async with db.acquire():
        async def t():
            async with db.acquire(reuse=True):
                print(await db.scalar('SELECT now()'))
        await asyncio.gather(*[t() for _ in range(5)])
asyncio.run(main())
The issue is with
I don't have a clear answer in mind on whether GINO should create a new connection or have a lock-like mechanism on the connection. I assume there are valid use cases in both scenarios. |
@wwwjfy My idea is to remove stack instances from shared contexts once the stacks become empty. This requires unique remove APIs for Python 3.7 and |
@fantix That can fix the problem raised, but it can't prevent subtasks from getting the same reused connection, as in the snippet in my previous comment. Because |
Ah, first of all, context should be inherited in subtasks, this was discussed in #84, PEP-550:
and eventually implemented in PEP-567:
Regarding shared connections, I think they should follow the way contexts are inherited, so that people don't have to remember another rule. Therefore if users are spawning subtasks within an
In your example, if the user was in doubt whether the reused connection may be accessed concurrently, an explicit
async def main():
    async with db.acquire():
        await asyncio.wait_for(db.status('SELECT pg_sleep(10)'), 1)
Hopefully this explains. 😃 |
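To illustrate the context inheritance mentioned above, here is a minimal sketch using only the standard library on Python 3.7+. It does not use GINO at all: `current_conn` is a hypothetical stand-in for whatever GINO keeps in the context, and the string values are placeholders. It only shows that tasks spawned by asyncio.gather start with a copy of the parent's context, so they see the parent's value, while changes made in a subtask stay in that subtask.

```python
import asyncio
import contextvars

# Hypothetical stand-in for the "current connection" tracked per context.
current_conn = contextvars.ContextVar("current_conn", default=None)


async def subtask(i):
    # gather() wraps each coroutine in a Task, and each Task runs in a copy
    # of the context that was current when the Task was created.
    seen = current_conn.get()
    # Setting the variable here only affects this task's copy of the context.
    current_conn.set(f"child-{i}")
    return seen


async def parent():
    current_conn.set("conn-1")
    seen_by_children = await asyncio.gather(*[subtask(i) for i in range(3)])
    # The parent's own value is untouched by the children's set() calls.
    return seen_by_children, current_conn.get()


seen_by_children, parent_value = asyncio.run(parent())
print(seen_by_children, parent_value)
```

Since every subtask sees the same inherited value, a connection stored this way ends up shared by all of them unless each one acquires its own.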
I have no issue with context inheritance, but I'm not sure whether there is any case where a user wants to use only one connection for those subtasks or within one request, in which case we'd need to lock and release it. |
Ah, got it. I think users should lock and release it themselves manually; likewise, asyncpg doesn't provide locking either. |
ok, fair enough. |
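As a rough sketch of what locking manually could look like, the snippet below guards one shared connection with an asyncio.Lock. `FakeConnection` is a hypothetical stand-in written for this example (it is not GINO or asyncpg code); it only imitates the behavior of rejecting an operation while another one is in progress, and it raises a plain RuntimeError rather than asyncpg's InterfaceError.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for one database connection that, like the
    connection in this issue, rejects overlapping operations."""

    def __init__(self):
        self._busy = False

    async def scalar(self, query):
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulate a network round trip
            return query
        finally:
            self._busy = False


async def main():
    conn = FakeConnection()
    lock = asyncio.Lock()  # one lock per shared connection

    async def unguarded(i):
        return await conn.scalar(f"SELECT {i}")

    async def guarded(i):
        # Only one subtask at a time may use the shared connection.
        async with lock:
            return await conn.scalar(f"SELECT {i}")

    unguarded_results = await asyncio.gather(
        *[unguarded(i) for i in range(3)], return_exceptions=True
    )
    guarded_results = await asyncio.gather(*[guarded(i) for i in range(3)])
    return unguarded_results, guarded_results


unguarded_results, guarded_results = asyncio.run(main())
print(unguarded_results)
print(guarded_results)
```

The trade-off is that the guarded subtasks run their queries one after another, so this keeps them on a single connection but gives up the parallelism that gather would otherwise provide.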
No it's not - I'm trying to remove |
Guess this should be backported to GINO 0.7 without introducing aiocontextvars 0.2. |
Wow. I just found that the fixes for this (the upcoming 0.8.0 with aiocontextvars 0.2?) likely also fix my woes with transactions. I'm deliberately not using the aiohttp integration because the web part is just one side of my application, which also reacts to external events, so I didn't have the enable_inherit call anywhere.
Long story short, whenever I used asyncio.gather to fork execution from within a transaction context, the current connection wasn't reused, so each of the coroutines I ran gather on used its own transaction-less connection, and I got plenty of "race conditions" that I was counting on the serializable isolation level to prevent.
I got to this by looking at PostgreSQL's statement logs, where I couldn't see the savepoints that I expected nested transactions to trigger. I dug through all the code down to asyncpg's transaction support to verify they should be there. Eventually I realized from the PIDs in the logs that different connections were in use when there should be only one.
I've just checked against master head, and now I see shared connections and nested savepoints, so I expect I'll see serialization failure retries now :-)
BTW, this is running on Python 3.5.3, and I made sure to have aiocontextvars imported before creating any loops. |
Glad to hear it works. One thing I'm thinking is using |
@wwwjfy you're right; I had not thought of different nested transactions being started in different branches. That's not going to work. But I don't need them. I just need to know whether a transaction is already active (and let it retry as a whole if it fails) or not (and create a new root one on the spot). I'll likely upgrade my decorator to use aiocontextvars to implement that behavior rather than blindly starting a new (sub)transaction. That will also allow me to register actions to be run once a transaction has been irreversibly committed. But losing the current connection just because I used asyncio.gather was completely unexpected. |
Hello!
When I need to run several database operations in parallel, I use asyncio.gather with tasks that each open a transaction, but I have found that this does not work correctly in GINO. It throws an error:
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
How to reproduce it:
Placing db.acquire before db.transaction does not change anything.
I think the problem is that each task needs a new real connection, with its own transaction in it, when using asyncio.gather.
stacktrace