-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdrugbank.py
105 lines (84 loc) · 3.8 KB
/
drugbank.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# The full DrugBank XML database can currently be downloaded manually with: curl -L -o filename.zip -u EMAIL:PASSWORD https://www.drugbank.ca/releases/5-0-5/downloads/all-full-database
import xmltodict
import json
import os
import zipfile
import pycurl
# Working directories: the downloaded archive and its extracted contents live
# under temp_folder/scope_name, the generated JSON under data_folder/scope_name.
temp_folder = './temp'
data_folder = './data'
scope_name = 'drugbank'

# Credentials for the authenticated download. Both variables are required;
# a missing one raises KeyError as soon as the script starts.
USERNAME = os.environ['DRUGBANK_USER']
PASSWORD = os.environ['DRUGBANK_PASSWORD']

# Release-pinned URL of the full database archive and its local cache path.
source_zip_url = 'https://www.drugbank.ca/releases/5-0-5/downloads/all-full-database'
temp_file = os.path.join(temp_folder, scope_name, "drugbank-full.zip")

# makedirs() creates every missing intermediate directory, so one call covers
# both ./temp and ./temp/<scope_name>. exist_ok=True replaces the separate
# isdir() checks, which could race with another process creating the folder.
os.makedirs(os.path.join(temp_folder, scope_name), exist_ok=True)
# Download the archive only when no cached copy exists yet.
if not os.path.isfile(temp_file):
    print("Downloading file ...")
    # Write to a side file and rename it only after a complete transfer, so an
    # aborted download is never mistaken for a valid cached archive next run.
    partial_file = temp_file + '.part'
    c = pycurl.Curl()
    try:
        with open(partial_file, 'wb') as current_file:
            c.setopt(c.USERPWD, '%s:%s' % (USERNAME, PASSWORD))
            c.setopt(c.FOLLOWLOCATION, 1)
            # Make HTTP status >= 400 (e.g. 401 for bad credentials) fail the
            # transfer instead of saving the error page as if it were the zip.
            c.setopt(c.FAILONERROR, 1)
            c.setopt(c.URL, source_zip_url)
            c.setopt(c.WRITEDATA, current_file)
            c.perform()
        os.replace(partial_file, temp_file)
    except (IOError, pycurl.error) as e:
        # pycurl reports transfer failures as pycurl.error, which is not an
        # IOError subclass, so it has to be caught explicitly.
        if os.path.isfile(partial_file):
            os.remove(partial_file)
        print("Can't retrieve %r to %r: %s" % (source_zip_url, temp_folder, e))
        # Exit with a non-zero status so callers can detect the failure.
        raise SystemExit(1)
    finally:
        # Release the curl handle on success and on failure alike.
        c.close()
# Extract the downloaded archive into ./temp/<scope_name>/.
print("Unzipping file ...")
try:
    with zipfile.ZipFile(temp_file) as fdazip:
        # extractall() copies each member to disk in chunks rather than
        # holding a whole member in memory, creates intermediate directories
        # (including explicit directory entries), and strips absolute paths
        # and ".." components from member names so nothing is written outside
        # the target folder.
        fdazip.extractall(os.path.join(temp_folder, scope_name))
except zipfile.error as e:
    print("Bad zipfile (from %r): %s" % (source_zip_url, e))
    # Exit with a non-zero status so callers can detect the failure.
    raise SystemExit(1)
# Load the extracted XML dump into nested dictionaries, kept in `d` for the
# export step that follows.
xml_dump_path = os.path.join(temp_folder, scope_name, "full database.xml")
print("Opening XML database dump")
with open(xml_dump_path, 'rb') as xml_dump:
    print("Opened XML database dump")
    print("Parsing XML into dictionaries.. This will take a while!")
    # xml_attribs=True keeps element attributes in the result as
    # '@'-prefixed keys (the export step reads '@primary').
    d = xmltodict.parse(xml_dump, xml_attribs=True)
    print("Parsed XML into dictionaries")
def _as_list(value):
    """Normalize a parsed child value to a list.

    The parsed tree holds None for an empty element, a bare value for a single
    occurrence and a list only for repeated elements; this folds all three
    shapes into a list so callers need a single code path.
    """
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


def _node_text(node):
    """Return the text of a parsed node, or None when it carries no text.

    Nodes with attributes are dicts whose text (if any) sits under '#text';
    nodes without attributes are the text itself.
    """
    if isinstance(node, dict):
        return node.get('#text')
    return node


def _distinct(values):
    """Return the distinct non-None values, sorted for reproducible output."""
    return sorted({value for value in values if value is not None})


def _summarize_drug(drug):
    """Reduce one parsed <drug> entry to the fields written to the JSON file.

    Returns a dict with 'name', 'drugbankId', 'synonyms' and 'products';
    'otherIds' is included only when the entry lists several IDs.
    'synonyms' / 'products' are None when the corresponding element is empty.
    """
    current = {'name': drug['name']}

    raw_ids = drug['drugbank-id']
    ids = _as_list(raw_ids)
    primary = next(
        (_node_text(e) for e in ids
         if isinstance(e, dict) and e.get('@primary') == 'true'),
        None,
    )
    if primary is None and ids:
        # No ID is flagged primary: fall back to the first listed ID instead
        # of failing on an empty selection.
        primary = _node_text(ids[0])
    current['drugbankId'] = primary
    if isinstance(raw_ids, list):
        # Secondary IDs carry no attributes, so they are parsed as plain strings.
        current['otherIds'] = [e for e in ids
                               if isinstance(e, str) and e != primary]

    synonyms = drug['synonyms']
    if synonyms is None:
        current['synonyms'] = None
    else:
        current['synonyms'] = _distinct(
            _node_text(e) for e in _as_list(synonyms['synonym']))

    products = drug['products']
    if products is None:
        current['products'] = None
    else:
        current['products'] = _distinct(
            e.get('name') for e in _as_list(products['product']))

    return current


# Make sure ./data/<scope_name> exists (no-op when it already does).
output_dir = os.path.join(data_folder, scope_name)
os.makedirs(output_dir, exist_ok=True)

print("Writing JSON representation of database and filtering only wanted fields")
# Build the whole result before opening the output file, so a failure while
# filtering cannot leave a truncated drugbank.json behind.
result = [_summarize_drug(drug) for drug in _as_list(d['drugbank']['drug'])]
with open(os.path.join(output_dir, 'drugbank.json'), 'w') as out:
    json.dump(result, out)
print("Done.")