-
Notifications
You must be signed in to change notification settings - Fork 1
/
conftest.py
111 lines (84 loc) · 2.46 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import asyncio
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
import pytest
import pytest_asyncio
import asyncodbc
@pytest_asyncio.fixture
async def conn(connection_maker, database):
    """Return a connection that has been switched to the test database.

    The connection is obtained from the ``connection_maker`` factory, a
    ``USE`` statement for ``database`` is executed on it, and the change is
    committed before the connection is handed to the test.
    """
    db_conn = await connection_maker()
    use_statement = f"USE {database};"
    await db_conn.execute(use_statement)
    await db_conn.commit()
    return db_conn
@pytest.fixture(scope="session")
def event_loop():
    """Provide one event loop shared by the whole test session.

    A loop is created explicitly rather than fetched with
    ``asyncio.get_event_loop()``, which is deprecated when no current loop
    exists. The loop is closed once the session is over.
    """
    loop = asyncio.new_event_loop()
    yield loop
    # Teardown: release the loop's resources at the end of the session.
    loop.close()
@pytest_asyncio.fixture(scope="session", autouse=True)
async def database():
    """Create a uniquely named database for the session and yield its name.

    Connects with the DSN from the ``TEST_DSN`` environment variable in
    autocommit mode, issues ``CREATE DATABASE`` for a ``test_<uuid hex>``
    name, yields that name, and issues ``DROP DATABASE`` at teardown.
    """
    connection = await asyncodbc.connect(dsn=os.getenv("TEST_DSN"), autocommit=True)
    try:
        # uuid4().hex is the UUID without dashes, i.e. the same value as
        # replacing "-" in its string form.
        db = f"test_{uuid.uuid4().hex}"
        await connection.execute(f"CREATE DATABASE {db};")
        yield db
        await connection.execute(f"DROP DATABASE {db};")
    finally:
        # Close the administrative connection even if CREATE/DROP failed,
        # so it is never leaked.
        await connection.close()
@pytest_asyncio.fixture
async def connection_maker(dsn, database):
    """Yield an async factory that opens connections to the test database.

    The factory forwards its keyword arguments to ``asyncodbc.connect``
    together with ``dsn`` and ``database``. When the caller passes no
    ``executor`` (or passes ``None``), a single-worker ``ThreadPoolExecutor``
    is created for that connection. At teardown every connection made through
    the factory is closed and its executor is shut down, including executors
    that were supplied by the caller.
    """
    # Registered through pytest_asyncio.fixture, like the other async
    # fixtures in this file, so the async generator is driven correctly even
    # when pytest-asyncio runs in strict mode.
    opened = []

    async def make(**kw):
        executor = kw.get("executor")
        if executor is None:
            executor = ThreadPoolExecutor(max_workers=1)
            kw["executor"] = executor
        connection = await asyncodbc.connect(dsn=dsn, database=database, **kw)
        # Remember the pair so teardown can release both resources.
        opened.append((connection, executor))
        return connection

    try:
        yield make
    finally:
        for connection, executor in opened:
            await connection.close()
            executor.shutdown(True)
@pytest_asyncio.fixture
async def pool(dsn):
    """Yield a pool created with ``asyncodbc.create_pool`` for ``dsn``.

    At teardown the pool is closed and the fixture waits for the close to
    complete.
    """
    db_pool = await asyncodbc.create_pool(dsn=dsn)
    try:
        yield db_pool
    finally:
        db_pool.close()
        await db_pool.wait_closed()
@pytest.fixture
def dsn():
    """Return the value of the ``TEST_DSN`` environment variable (or None)."""
    test_dsn = os.environ.get("TEST_DSN")
    return test_dsn
@pytest_asyncio.fixture
async def pool_maker():
    """Yield an async factory that builds pools and tracks them for cleanup.

    The factory passes its keyword arguments straight to
    ``asyncodbc.create_pool``. At teardown each pool it produced is closed,
    in creation order, waiting for every close to finish.
    """
    created = []

    async def make(**kw):
        new_pool = await asyncodbc.create_pool(**kw)
        created.append(new_pool)
        return new_pool

    try:
        yield make
    finally:
        for tracked in created:
            tracked.close()
            await tracked.wait_closed()
@pytest.fixture
def executor():
    """Return a new ``ThreadPoolExecutor`` limited to ten worker threads.

    The fixture itself does not shut the executor down.
    """
    thread_pool = ThreadPoolExecutor(max_workers=10)
    return thread_pool
@pytest_asyncio.fixture
async def table(conn):
    """Create table ``t1`` with two rows, yield its name, then drop it.

    Setup creates ``t1(n INT, v VARCHAR(10))``, inserts the rows
    ``(1, '123.45')`` and ``(2, 'foo')`` and commits on the connection.
    Teardown drops the table using a fresh cursor and commits through that
    cursor.
    """
    setup_statements = (
        "CREATE TABLE t1(n INT, v VARCHAR(10));",
        "INSERT INTO t1 VALUES (1, '123.45');",
        "INSERT INTO t1 VALUES (2, 'foo');",
    )
    setup_cursor = await conn.cursor()
    for statement in setup_statements:
        await setup_cursor.execute(statement)
    await conn.commit()
    await setup_cursor.close()

    try:
        yield "t1"
    finally:
        teardown_cursor = await conn.cursor()
        await teardown_cursor.execute("DROP TABLE t1;")
        await teardown_cursor.commit()
        await teardown_cursor.close()