-
Notifications
You must be signed in to change notification settings - Fork 0
/
nasal_main.py
324 lines (262 loc) · 10.9 KB
/
nasal_main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
import os
import time
import argparse
from Bio import Entrez
import ssl
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
import datetime
# Function for passing arguments to the bot - currently not used
def get_args():
    """Parse command-line options.

    Returns:
        tuple[str, str]: (DOI database location, topic file location).
    """
    parser = argparse.ArgumentParser(
        description='PubMed scraping bot',
        prog='pubmed_bot',
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument('-doi', dest='doi_db', default='doi_db.txt',
                        help='DOI database location')
    parser.add_argument('-topic', dest='topic', default='topics.txt',
                        help='Topic .txt file location')
    parsed = parser.parse_args()
    return parsed.doi_db, parsed.topic
# PubMed scraping function
def pubmed_scrape(query, bot_email, max_scrapes):
    """Search PubMed via Entrez and collect article metadata.

    Args:
        query: Entrez search term.
        bot_email: Email address reported to NCBI (required by Entrez policy).
        max_scrapes: Maximum number of article IDs to retrieve.

    Returns:
        dict: {doi: {'Title': title, 'PubDate': pub_date}} for each hit;
        missing fields fall back to 'Unknown DOI'/'Unknown Title'/'Unknown PubDate'.
    """
    articles = {}
    Entrez.email = bot_email
    # Perform the search; close the handle once parsed (the original leaked it)
    search_handle = Entrez.esearch(db='pubmed', term=query, retmax=max_scrapes)
    search_result = Entrez.read(search_handle)
    search_handle.close()
    # Loop over hits, fetch DOI, title, and publication date
    for id_num in search_result['IdList']:
        summary_handle = Entrez.esummary(db='pubmed', id=id_num)
        summary = Entrez.read(summary_handle)[0]  # first (only) record in the list
        summary_handle.close()
        doi = summary.get('DOI', 'Unknown DOI')
        title = summary.get('Title', 'Unknown Title')
        pub_date = summary.get('PubDate', 'Unknown PubDate')
        articles[doi] = {'Title': title, 'PubDate': pub_date}
        time.sleep(5)  # rate-limit to stay within NCBI request policy
    return articles
def doi_checker(doi_to_check, doi_db_filename):
    """Check a DOI against the database file, recording it if new.

    Args:
        doi_to_check: DOI string to look up.
        doi_db_filename: Path to the newline-delimited DOI database.

    Returns:
        tuple[bool, str]: (True, doi) when the DOI is already known;
        otherwise the DOI is appended to the database and (False, doi)
        is returned.
    """
    # Load known DOIs into a set for O(1) membership (was an O(n) list scan)
    with open(doi_db_filename, 'r') as doi_database:
        known_dois = {entry.rstrip() for entry in doi_database}
    if doi_to_check in known_dois:
        return True, doi_to_check
    # New DOI: persist it so future runs treat it as a duplicate
    with open(doi_db_filename, 'a') as doi_database:
        doi_database.write(doi_to_check + '\n')
    return False, doi_to_check
def string_formatter(title, doi):
    """Build the two display strings for a paper.

    Returns:
        tuple[str, str]: (internal 'title|url' form, split on '|' by
        review_nasal.py; external 'title (url)' form used in push emails).
    """
    url = 'https://dx.doi.org/' + str(doi)
    return f'{title}|{url}', f'{title} ({url})'
def write_to_rank(directory, rank, text):
    """Append *text* as one line to rank<rank>.txt inside *directory*.

    Uses os.path.join so the function works whether or not *directory*
    carries a trailing separator (the original raw concatenation silently
    produced a wrong filename without one); for the trailing-slash values
    main() passes, the resulting path is identical.
    """
    path = os.path.join(directory, f'rank{rank}.txt')
    with open(path, 'a', encoding='utf-8') as file:
        file.write(text + '\n')
def body_format(email_body):
    """Render newline-separated paper strings as an HTML ordered list.

    Blank lines are dropped; every remaining line becomes a stripped <li>.
    """
    entries = [line.strip() for line in email_body.split('\n') if line.strip()]
    items_html = ''.join(f'<li>{entry}</li>' for entry in entries)
    return f'<ol style="color: black; font-size: 16px;">{items_html}</ol>'
def html_formatting(email_body, project):
# Fetch the current date for email titling
date = datetime.date.today()
date = date.strftime('%d/%m/%Y')
# Explanation of the search queries used to perform this search
search_queries = 'This search was performed using the terms "nasal decolonisation/decolonization"' \
' and "Staphylococcus aureus/S. aureus/Staph/MSSA/MRSA/methicillin resistant staphylococcus aureus"'
url = 'https://www.destinypharma.com/'
# html formatting of the push email
html = f"""
<!DOCTYPE html>
<html>
<head>
<style>
h1 {{
font-size: 22px;
color: white;
text-align: center;
font-family: 'Tw Cen MT', sans-serif; /* Use Tw Cen MT font */
text-decoration: underline; /* Underline the text */
margin: 0; /* Remove margin from the h1 element */
}}
.container {{
display: flex;
flex-direction: column;
justify-content: center; /* Center vertically */
align-items: center; /* Center horizontally */
background-color: #ff7d1d; /* Banner background color */
padding: 10px; /* Add padding to the banner */
height: auto;
}}
body {{
font-size: 16px;
color: black;
font-family: 'Tw Cen MT', sans-serif; /* Use Tw Cen MT font */
margin: 0; /* Remove default body margin */
padding: 0; /* Remove default body padding */
}}
img {{display: block; margin: 0 auto;
}}
hr {{
background-color: black; /* Black line */
height: 1px; /* Line thickness */
border: none;
}}
.small-text {{
font-size: 12px;
color: black;
text-align: center;
}}
</style>
</head>
<body>
<img src="cid:image">
<div class="container">
<h1 style="text-align: center;">New high priority papers are available on PubMed ({project} {date})</h1>
</div>
{email_body}
<hr> <!-- Black line -->
<p class="small-text">
{search_queries}<br><br>
{url}
</p>
</body>
</html>
"""
return html
# Define email function where a message is sent to the recipient
def send_email(email_text, email_sender, email_password, email_receiver, project):
    """Assemble an HTML email (with optional inline logo) and send it over Gmail SSL.

    Args:
        email_text: HTML body of the message.
        email_sender: Gmail address used to authenticate and send.
        email_password: App password / token for the sender account.
        email_receiver: Destination address.
        project: Project name included in the subject line.
    """
    today = datetime.date.today().strftime('%d/%m/%Y')
    message = MIMEMultipart()
    message['From'] = email_sender
    message['To'] = email_receiver
    message['Subject'] = f'New high priority literature ({project} {today})'  # Title of the email
    message.attach(MIMEText(email_text, 'html'))  # Instruct python to expect html content
    # Attach the logo so the HTML can reference it as cid:image; skip if missing
    try:
        with open('image.JPG', 'rb') as image_file:
            logo = MIMEImage(image_file.read())
        logo.add_header('Content-ID', '<image>')
        message.attach(logo)
    except FileNotFoundError:
        print('"image.JPG" not identified, continuing without it')
    # Use SMTP over SSL to log in to the sender account and send the email
    context = ssl.create_default_context()
    with smtplib.SMTP_SSL('smtp.gmail.com', 465, context=context) as smtp:
        smtp.login(email_sender, email_password)
        smtp.sendmail(email_sender, email_receiver, message.as_string())
def main():
    """Run the full scrape cycle.

    Reads search queries from nasal_data/queries.txt, scrapes PubMed for each,
    records previously-unseen DOIs per rank, emails rank-1 (high priority)
    papers, and appends a summary line to nasal_data/log.txt.
    """
    queries = []
    email_body = ""
    rank = 1
    new_paper_count = 0
    changes = {}  # rank -> number of new papers found for that rank
    fail = False
    project = 'XF-73 Nasal'
    directory = 'nasal_data/'
    bot_email = 'automatedscrapingbot@gmail.com'
    # Extract search queries to a list, one per line
    with open(f'{directory}queries.txt', 'r') as search_queries:
        for line in search_queries:
            queries.append(line.rstrip())
    # Iterate over search queries and perform scrape
    for query in queries:
        print(f'Performing Pubmed search for "rank {rank}" topics...')
        try:
            bot_search = pubmed_scrape(query, bot_email, 20)
            # Check each DOI against the database of already-seen papers.
            # status is always a bool, so a plain if/else suffices (the
            # original had an unreachable third branch).
            for doi, info in bot_search.items():
                status, doi = doi_checker(doi, f'{directory}doi_db.txt')
                if status:
                    print(f'{doi} already in database')
                else:
                    new_paper_count += 1
                    print(f'{doi} not found in database - recognised as new paper')
                    int_string, ext_string = string_formatter(info['Title'], doi)
                    # Write to rank database
                    write_to_rank(directory, rank, int_string)
                    if rank == 1:
                        # Only rank-1 papers go into the push email
                        email_body += ext_string
                        email_body += "\n"
            # Log the changes of new papers for this rank, then advance
            changes[rank] = new_paper_count
            rank += 1
            new_paper_count = 0
            time.sleep(10)  # sleep to prevent spamming NCBI
        except (ValueError, RuntimeError):
            fail = True
            break
    # Send email if any high priority papers are recorded
    if email_body != "":
        # HTML format the email body into a numbered list, then apply template
        formatted_email = html_formatting(body_format(email_body), project)
        email_sender = bot_email
        email_password = os.environ["SECRET_TOKEN"]  # Gmail app password from CI secret
        email_receiver = 'wrw@destinypharma.com'
        send_email(formatted_email, email_sender, email_password, email_receiver, project)
        print('High priority papers found - push email sent')
    else:
        print('No new high priority papers identified')
    # Summarise "Rank N: count" pairs, comma separated
    formatted_string = ', '.join(f'Rank {key}: {value}' for key, value in changes.items())
    # Format final log string
    date = datetime.datetime.now().strftime("%H:%M %d/%m/%Y")
    changes_log = f'{date}, {formatted_string}'
    if fail:
        # Separator added: the original fused 'RUN FAILED' onto the last count
        changes_log += ', RUN FAILED'
    # Write to file and print
    with open(f'{directory}log.txt', 'a') as log:
        log.write(changes_log + '\n')
    print(changes_log)
    # Print closing message
    print('Query complete - returning to sleep')


if __name__ == '__main__':
    main()