asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress #56

Closed
yjdwbj opened this issue Dec 26, 2016 · 22 comments

@yjdwbj

yjdwbj commented Dec 26, 2016

Hi, here is my code. Please help resolve the problem. Thanks a lot.

from sanic import Sanic
from sanic.response import json
import asyncpg
import asyncio
import uvloop
loop = uvloop.new_event_loop()
app = Sanic()
app.debug = True

async def initdb_pool():
    dbdict = {"database":"mqtt","user":"mqtt","password":"mqtt123",
            "host":"192.168.25.100","port":5433}
    return await asyncpg.create_pool(**dbdict)

@app.route("/iot/v1.0/app/auth/<user:[A-z0-9]\w+>/<pwd:[A-z0-9]\w+>/")
async def applogin(request,user,pwd):
    async with engine.acquire() as connection:
        #async with connection.transaction():
        stmt   = await connection.prepare('select key from user_manager_appuser where uname = $1 or email = $2 or phone = $3')
        result =  await stmt.fetchval(user,user,user)
        #    result = await connection.fetchval('select key from user_manager_appuser where uname = $1 or email = $2 or phone = $3',
        #                                        user,user,user)
        if not result:
            return json({'ok':False,'err':'Pwd error'})
        print("result is ",result)
        return json({'ok':True,'data':str(result)})

if __name__ == "__main__":
    engine = loop.run_until_complete(initdb_pool())
    app.run(host="0.0.0.0",port=8000,debug=True)

 curl "http://127.0.0.1:8000/iot/v1.0/app/auth/abc/123456/"
Error: cannot perform operation: another operation is in progress
Exception: Traceback (most recent call last):
  File "/media/yjdwbj/E/py360dev/lib/python3.6/site-packages/sanic/sanic.py", line 194, in handle_request
    response = await response
  File "/opt/python360/lib/python3.6/asyncio/coroutines.py", line 128, in throw
    return self.gen.throw(type, value, traceback)
  File "main.py", line 32, in applogin
    return json({'ok':True,'data':str(result)})
  File "/opt/python360/lib/python3.6/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/media/yjdwbj/E/py360dev/lib/python3.6/site-packages/asyncpg/pool.py", line 252, in __aexit__
    await self.pool.release(con)
  File "/opt/python360/lib/python3.6/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/media/yjdwbj/E/py360dev/lib/python3.6/site-packages/asyncpg/pool.py", line 184, in release
    await connection.reset()
  File "/opt/python360/lib/python3.6/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/media/yjdwbj/E/py360dev/lib/python3.6/site-packages/asyncpg/connection.py", line 411, in reset
    ''')
  File "/opt/python360/lib/python3.6/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/media/yjdwbj/E/py360dev/lib/python3.6/site-packages/asyncpg/connection.py", line 170, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 247, in query (asyncpg/protocol/protocol.c:51830)
  File "asyncpg/protocol/protocol.pyx", line 352, in asyncpg.protocol.protocol.BaseProtocol._ensure_clear_state (asyncpg/protocol/protocol.c:54136)
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
@elprans
Member

elprans commented Dec 26, 2016

I'm not sure your snippet is complete. Where does engine come from? Are you using asyncpgsa? If so, do you get the same result by using asyncpg directly?

Thanks.

@yjdwbj
Author

yjdwbj commented Dec 27, 2016

Hi, the code is simple and clear: it connects to the database using plain asyncpg, via create_pool.

if __name__ == "__main__":
    engine = loop.run_until_complete(initdb_pool())
    app.run(host="0.0.0.0",port=8000,debug=True)

@yjdwbj closed this as completed Dec 27, 2016
@jbcurtin

jbcurtin commented Jan 11, 2017

@yjdwbj, you'll have to pass the loop into sanic manually.

if __name__ == "__main__":
    engine = loop.run_until_complete(initdb_pool())
    app.run(host="0.0.0.0",port=8000, debug=True, loop=loop)

@cctse

cctse commented Feb 16, 2017

@yjdwbj why was this closed? I ran into the same problem.

@yjdwbj
Author

yjdwbj commented Feb 16, 2017

@cctse Because the authors don't seem to care about this bug. I switched to OpenResty instead.

@cctse

cctse commented Feb 16, 2017

@elprans please reopen this issue. The full code is as follows:

from sanic import Sanic
from sanic.response import text
import asyncio
import uvloop
import asyncpg


asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.get_event_loop()


app = Sanic(__name__)

async def init_pgpool():
    return await asyncpg.create_pool('postgres://postgres:ssdd@localhost:5433/postgres')

pgpool = loop.run_until_complete(init_pgpool())


@app.route('/')
async def index(req):
    # async with asyncpg.create_pool('postgres://postgres:ssdd@localhost:5433/tmp') as pgpool:   # this works fine
    async with pgpool.acquire() as conn:
        rs = await conn.fetchval('select 1')
        print('----', rs)
    return text('done')

app.run(port=8084)

Error when running curl http://127.0.0.1:8084:

ERROR: Traceback (most recent call last):
  File "/usr/local/var/pyenv/versions/anaconda3-2.5.0/lib/python3.5/site-packages/sanic/sanic.py", line 329, in handle_request
    response = await response
  File "/Users/akc/project/py/tsweb/sweb.py", line 25, in index
    print('----', rs)
  File "/usr/local/var/pyenv/versions/anaconda3-2.5.0/lib/python3.5/site-packages/asyncpg/pool.py", line 252, in __aexit__
    await self.pool.release(con)
  File "/usr/local/var/pyenv/versions/anaconda3-2.5.0/lib/python3.5/site-packages/asyncpg/pool.py", line 184, in release
    await connection.reset()
  File "/usr/local/var/pyenv/versions/anaconda3-2.5.0/lib/python3.5/site-packages/asyncpg/connection.py", line 411, in reset
    ''')
  File "/usr/local/var/pyenv/versions/anaconda3-2.5.0/lib/python3.5/site-packages/asyncpg/connection.py", line 170, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 249, in query (asyncpg/protocol/protocol.c:57280)
  File "asyncpg/protocol/protocol.pyx", line 354, in asyncpg.protocol.protocol.BaseProtocol._ensure_clear_state (asyncpg/protocol/protocol.c:59586)
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

@amsb

amsb commented Feb 16, 2017

@cctse what version of Sanic are you using? It still looks like asyncpg and Sanic are running on different loops.
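
To illustrate what "different loops" means, here is a minimal stdlib-only sketch. It is not asyncpg's actual code path: it only shows a future created on one loop being awaited by a task that runs on another. The names setup_loop, server_loop, and loop_mismatch_message are made up for this illustration, and plain asyncio reports the mismatch with a different message than the asyncpg error in this thread.

```python
import asyncio


def loop_mismatch_message():
    # Two independent loops, standing in (by assumption) for "the loop used
    # during setup" and "the loop the server creates for itself".
    setup_loop = asyncio.new_event_loop()
    server_loop = asyncio.new_event_loop()

    # A pending future bound to setup_loop.
    pending = setup_loop.create_future()

    async def wait_for(fut):
        return await fut

    try:
        # The task runs on server_loop but awaits an object owned by setup_loop.
        server_loop.run_until_complete(wait_for(pending))
    except RuntimeError as exc:
        return str(exc)
    finally:
        pending.cancel()
        setup_loop.close()
        server_loop.close()


message = loop_mismatch_message()
print(message)
```

The same shape applies to the scripts above: the pool is built with loop.run_until_complete(...) on one loop, and the request handlers then run on whichever loop the server starts.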

@cctse

cctse commented Feb 17, 2017

@amsb
sanic 0.3.1
I think I found the problem: Sanic tries to create the loop itself. See https://github.com/channelcat/sanic/blob/master/sanic/server.py#L295

@seemethere

Hey guys, I'm one of the maintainers of Sanic. Is there no way to pass a loop to asyncpg? You could use a server event to pass the loop sanic creates to asyncpg and establish the db connection like so.

@seemethere

We even have an asyncpg example on our main repo: https://github.com/channelcat/sanic/blob/master/examples/sanic_asyncpg_example.py

@elprans
Member

elprans commented Feb 17, 2017

Hey guys, I'm one of the maintainers of Sanic. Is there no way to pass a loop to asyncpg?

Both create_pool and Connection accept loop as argument.

@seemethere

That's what I thought. So it's really a non-issue here. Just a confusion on implementation.

@jbcurtin

Hey @seemethere, there was a time when Sanic supported passing a loop into the run command. After doing some searching, I found that @1st1 has commented in Sanic's issues about a common issue.

Maybe the problem lies in the uvloop library?

There were a couple of things I ran into while trying to use Sanic with asyncpg. You have to pass a loop into Sanic manually. The problem is that Sanic pulls a new loop out of asyncio, and asyncio in turn calls get_event_loop_policy. With uvloop as the default policy, this returns a new event loop, whereas asyncio would return the running event loop.

@seemethere

@jbcurtin Passing a loop was deprecated because event loops cannot be forked, which makes it impossible to fully support multiprocessing.

As of Python 3.6, asyncio.get_event_loop() returns the currently running event loop, so there really shouldn't be any issues with getting the incorrect event loop unless you define a loop to use before you call app.run.
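
The get_event_loop() behaviour described above can be checked with a small stdlib-only sketch. LoopBoundResource and startup_then_serve are hypothetical names; the resource is only a stand-in for something like a connection pool that remembers the loop it was created on. It uses asyncio.get_running_loop() and asyncio.run(), which assume Python 3.7 or newer.

```python
import asyncio


class LoopBoundResource:
    """Hypothetical stand-in for a pool that remembers its creation loop."""

    def __init__(self):
        self.loop = asyncio.get_running_loop()


async def startup_then_serve():
    # Inside a coroutine, get_event_loop() is the loop running this coroutine.
    same_loop = asyncio.get_event_loop() is asyncio.get_running_loop()

    # Creating the resource here, rather than at import time, ties it to the
    # loop that will also run the request handlers.
    resource = LoopBoundResource()
    on_serving_loop = resource.loop is asyncio.get_running_loop()
    return same_loop, on_serving_loop


result = asyncio.run(startup_then_serve())
print(result)
```

The design point is that a resource created from within the running loop (for example in a server start-up hook) cannot end up attached to a loop that was defined earlier at module level.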

@jbcurtin

jbcurtin commented Feb 17, 2017

Yes, I'm aware of those limitations. Take a look at the following...
https://github.com/channelcat/sanic/blob/master/sanic/app.py#L58
https://github.com/MagicStack/asyncpg/blob/master/asyncpg/connection.py#L556
https://github.com/MagicStack/uvloop/blob/master/uvloop/__init__.py#L22

I solved this by creating a GLOBAL setting that I run all my logic from. Sure, it's a pain to manage, but I was able to move forward and fork the processes.

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
EV_LOOP = asyncio.get_event_loop()
EV_LOOP.run_until_complete(...)

Unfortunately, Sanic's deprecation of loop passing, as you mentioned, has forced me to hold off on upgrading until something is fixed.

@Behzad-S

Behzad-S commented Dec 15, 2017

Hi @elprans,
I got this error while running a simple aiohttp server for some basic tests with asyncpg. The prepared statement works flawlessly as long as there is only ONE concurrent connection to the server, but as soon as there are two or more, this error is thrown.
ab -c 1 -n 200 127.0.0.1:8000/ --> works fine
ab -c 2 -n 200 127.0.0.1:8000/ --> asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

The db_coro handler coroutine is called on every request to the server:

async def db_coro(request):
    stmt= request.app['sql_prepared_stmt']
    print( await stmt.fetchval('Bob') )

Server main code:

if __name__ == '__main__':
    # Some simple code for running a simple aiohttp server
    app['sql_prepared_stmt'] = await conn.prepare('SELECT sid FROM test_tble WHERE name = $1 AND sid NOTNULL AND  sid >5')
    # Some simple code for running a simple aiohttp server

@elprans
Member

elprans commented Dec 15, 2017

@Behzad-S seems like you are using a shared connection and are attempting to run queries concurrently on it. Use asyncpg.Pool and acquire/release a connection for each request.
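
A minimal stdlib-only sketch of the difference between a shared connection and per-request acquisition follows. ToyConnection and ToyPool are hypothetical stand-ins written for this illustration, not asyncpg classes; the toy raises a plain RuntimeError where asyncpg raises InterfaceError, and the real asyncpg.Pool does considerably more (connection reset, timeouts, sizing).

```python
import asyncio
from contextlib import asynccontextmanager


class ToyConnection:
    """Hypothetical connection that allows one in-flight operation."""

    def __init__(self):
        self._busy = False

    async def fetchval(self, value):
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulated network round trip
            return value
        finally:
            self._busy = False


class ToyPool:
    """Hypothetical pool that hands each caller its own connection."""

    def __init__(self, size):
        self._free = asyncio.Queue()
        for _ in range(size):
            self._free.put_nowait(ToyConnection())

    @asynccontextmanager
    async def acquire(self):
        conn = await self._free.get()  # waits if all connections are in use
        try:
            yield conn
        finally:
            self._free.put_nowait(conn)  # release back to the pool


async def shared_connection_requests():
    # Two "requests" overlap on one connection, as with ab -c 2.
    conn = ToyConnection()
    return await asyncio.gather(
        conn.fetchval(1), conn.fetchval(2), return_exceptions=True
    )


async def pooled_requests():
    pool = ToyPool(size=2)

    async def handler(value):
        # Each request acquires its own connection for the query.
        async with pool.acquire() as conn:
            return await conn.fetchval(value)

    return await asyncio.gather(handler(1), handler(2), handler(3))


shared = asyncio.run(shared_connection_requests())
pooled = asyncio.run(pooled_requests())
print(shared, pooled)
```

With the shared connection, the second overlapping call fails; with the pool, the third handler simply waits until a connection is released.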

@gary-x0pa

I am getting this issue with FastAPI when I try to use asyncpg with asyncio.gather.

@dony585

dony585 commented Mar 24, 2021

@gary-x0pa can you paste the coroutines code which you are running using asyncio.gather?

I also faced the same issue using GINO (with the asyncpg driver). The issue for me was acquiring the DB connection with reuse=True inside tasks.

To elaborate, the tasks I ran with asyncio.gather contained the code async with pool.acquire(reuse=True) as conn, which somehow caused a connection to be shared between tasks, hence the error.

Acquiring connections with reuse=False fixed it for me.

For reference: python-gino/gino#313

@gary-x0pa

Hmm, I didn't pass any argument when acquiring. I worked around the issue by just manually awaiting each coroutine.
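
The workaround of awaiting each coroutine in turn can be sketched with the stdlib alone. ToyConnection is a hypothetical stand-in for a single connection that permits one in-flight operation; it is not asyncpg code and raises a plain RuntimeError rather than InterfaceError.

```python
import asyncio


class ToyConnection:
    """Hypothetical connection that allows one in-flight operation."""

    def __init__(self):
        self._busy = False

    async def fetchval(self, value):
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulated network round trip
            return value
        finally:
            self._busy = False


async def run_sequentially(conn, values):
    # Each query finishes before the next one starts, so they never overlap.
    return [await conn.fetchval(v) for v in values]


async def run_with_gather(conn, values):
    # All queries are started together on the same connection and collide.
    return await asyncio.gather(
        *(conn.fetchval(v) for v in values), return_exceptions=True
    )


async def main():
    sequential = await run_sequentially(ToyConnection(), [1, 2, 3])
    gathered = await run_with_gather(ToyConnection(), [1, 2, 3])
    return sequential, gathered


sequential, gathered = asyncio.run(main())
print(sequential, gathered)
```

Sequential awaiting gives up concurrency; to keep asyncio.gather, each task would need a connection of its own rather than a shared one.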

@healiseu

I have run into a similar problem using FastAPI and an asyncpg connection pool. I have written a comment in the most relevant issue, fastapi/fastapi#1800, with a reference to the solution I found.

@MarkParker5

I fixed it by setting poolclass=NullPool in create_async_engine.
