-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample_postgres.py
167 lines (141 loc) · 4.73 KB
/
example_postgres.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""
Example queries for AnnotateDB in python3.
For output see `example_postgres.out`.
To run the examples install
pip install -r requirements.txt
python example_postgres.py
"""
import sys
import logging
import psycopg2
from collections import OrderedDict
from pprint import pprint
def execute_query(query, description=None, fields=None):
    """Execute a query against the database and print its results.

    A new connection is opened for every call using the hard-coded
    connection settings below. The query text and the description are
    printed between two separator lines, then every fetched row is printed.
    If ``fields`` is given, each row is additionally pretty-printed as an
    ``OrderedDict`` built by zipping ``fields`` with the row values (``zip``
    stops at the shorter of the two).

    Any error is logged instead of raised, so a failing query does not
    prevent the caller from running further queries.

    :param query: SQL statement to execute
    :param description: optional text printed below the query
    :param fields: optional sequence of names paired positionally with the
        values of each row
    :return: None
    """
    # Bind both handles up front: if connecting (or creating the cursor)
    # fails, the ``finally`` clause must still be able to inspect them
    # without raising ``UnboundLocalError``.
    connection = None
    cursor = None
    try:
        connection = psycopg2.connect(
            user="adb",
            password="adb",
            host="localhost",
            port="5434",
            database="adb"
        )
        cursor = connection.cursor()

        print("-" * 80)
        print(query)
        print(description)
        print("-" * 80)

        cursor.execute(query)
        for row in cursor.fetchall():
            print(row)
            if fields is not None:
                # create dict for access by field name
                entry = OrderedDict(zip(fields, row))
                pprint(entry)

    except (Exception, psycopg2.Error) as error:
        logging.error(f"Error while fetching data from PostgreSQL: {error}")
    finally:
        # close whatever was actually opened
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
def example_mappings_table():
    """Print the first 10 rows of the ``adb_mapping`` table.

    ``adb_mapping`` is the internal mapping table (with foreign keys);
    querying ``mapping_view`` is usually simpler.

    :return: None
    """
    sql = "\nSELECT * FROM adb_mapping LIMIT 10\n"
    # names paired positionally with the values of each returned row
    columns = ['id', 'qualifier', 'evidence_id', 'source_id', 'target_id']
    execute_query(
        query=sql,
        description="select first 10 mappings",
        fields=columns,
    )
def example_mappings_view():
    """Print the first 10 rows of ``mapping_view``.

    :return: None
    """
    sql = "\nSELECT * FROM mapping_view LIMIT 10\n"
    # names paired positionally with the values of each returned row
    columns = [
        'id',
        'source_collection', 'source_miriam', 'source_term',
        'qualifier',
        'target_collection', 'target_miriam', 'target_term',
        'evidence_source', 'evidence_version', 'evidence',
    ]
    execute_query(
        query=sql,
        description="Select first 10 mappings in dedicated mapping_view",
        fields=columns,
    )
def example_collections():
    """Print the first 10 rows of the ``adb_collection`` table.

    :return: None
    """
    sql = "\nSELECT * FROM adb_collection LIMIT 10\n"
    # names paired positionally with the values of each returned row
    columns = ['id', 'namespace', 'miriam', 'name', 'idpattern', 'urlpattern']
    execute_query(
        query=sql,
        description="Select first 10 collection.",
        fields=columns,
    )
# Additional example queries. Each entry is a two-item list
# ``[sql, description]``; the script entry point passes item 0 as the query
# and item 1 as the description to ``execute_query``.
QUERIES = [
    # every mapping whose source term is 'ACKr'
    [
        """
SELECT * FROM mapping_view
WHERE (source_term = 'ACKr')
ORDER BY target_namespace, target_term;
""",
        "Query all mappings for 'ACKr'."
    ],
    # same term, restricted to the bigg.reaction namespace, the 'IS'
    # qualifier and targets flagged as MIRIAM
    [
        """
SELECT * FROM mapping_view
WHERE (source_term = 'ACKr' AND
source_namespace = 'bigg.reaction' AND
qualifier = 'IS' AND
target_miriam = TRUE)
ORDER BY target_namespace, target_term;
""",
        "Query all MIRIAM mappings for the bigg.reaction 'ACKr'"
    ],
    # reverse lookup: bigg.metabolite source terms for a given ChEBI target
    [
        """
SELECT source_term FROM mapping_view
WHERE (target_term = 'CHEBI:698' AND
target_namespace = 'chebi' AND
source_namespace = 'bigg.metabolite' AND
qualifier = 'IS')
ORDER BY target_namespace, target_term;
""",
        "Query bigg metabolite for the CHEBI id 'CHEBI:698'"
    ],
    # reverse lookup additionally filtered on the evidence columns
    [
        """
SELECT source_term FROM mapping_view
WHERE (target_term = 'CHEBI:17634' AND
target_namespace = 'chebi' AND
source_namespace = 'bigg.metabolite' AND
qualifier = 'IS' AND
evidence_source = 'bigg' AND evidence_version = '1.5' and evidence = 'database'
)
ORDER BY target_namespace, target_term;
""",
        # stray trailing apostrophe removed from the printed description
        "Restrict information to bigg evidence, multiple bigg metabolites for chebi."
    ],
]
if __name__ == "__main__":
    # Imported here because only the script entry point needs it.
    from contextlib import redirect_stdout

    # Everything the examples print is written to the output file. The
    # context managers restore ``sys.stdout`` and close the file even if one
    # of the examples raises, which the previous manual swap did not.
    with open("./example_postgres.out", "w") as log_file, \
            redirect_stdout(log_file):
        print('*** AnnotateDB SQL queries ***')

        # examples with creating hashmaps
        example_mappings_table()
        example_mappings_view()
        example_collections()

        # execute additional set of queries and show results
        for sql, description in QUERIES:
            execute_query(query=sql, description=description)