#!/usr/bin/env python3
#
# This code is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this code. If not, see <http://www.gnu.org/licenses/>.
#
# Author: Nathaniel Case
# Ralph Bean -- http://threebean.org
# Beau Bouchard -- http://beaubouchard.com
import argparse
import asyncio
import os
import string

from bs4 import BeautifulSoup
import requests

from gitsupport import commit_all
from view import write_html, write_json, tabs, clear_tabs
BASE_URLS = {
    'monroe': 'http://enr.monroecounty.gov/',
    'suffolk': 'http://apps.suffolkcountyny.gov/boe/eleres/17ge/',
    'chautauqua': 'http://72.45.245.14/BOE/Results/',
    #'orange': 'http://boe.co.orange.ny.us/',
}

BASE_DIR = os.path.split(os.path.abspath(__file__))[0]
LOOP = asyncio.get_event_loop()


class Election(object):

    def __init__(self, county):
        self.county = county
        self.logo = None
        self.results = dict()
        self.filepath = os.path.join(BASE_DIR, "data-submodule", self.county)

        # Create the storage directory if necessary.
        if not os.path.exists(self.filepath):
            os.mkdir(self.filepath)

    @asyncio.coroutine
    def initial_read(self):
        """
        Read the contents of ElectionEvent.xml.

        This file should not change during the election, so it should only
        need to be read once. In case results are not yet available, it also
        zeroes out the data.
        """
        filename = yield from self.pull_file('ElectionEvent.xml')
        self.logo = yield from self.pull_file('logo.jpg')
        with open(filename) as file_:
            xml = file_.read()
        soup = BeautifulSoup(xml, 'lxml')

        election = soup.find('election')
        if not election:
            # Something went wrong... bailing.
            return

        self.results['election'] = {
            'nm': election['nm'],
            'des': election['des'],
            'jd': election['jd'],
            'ts': election['ts'],
            'pol': 0,
            'clpol': 0,
        }

        contests = soup.findAll('contest')
        self.results['contest'] = soup_to_dict(contests, 'id', ['nm', 'aid', 'el', 's', 'id'])
        seen_aids = set(map(lambda x: x['aid'], self.results['contest'].values()))

        areas = soup.findAll('area')
        self.results['area'] = soup_to_dict(areas, 'id', ['nm', 'atid', 'el', 's', 'id'])
        # Keep only the areas that some contest actually references.
        self.results['area'] = {k: v for k, v in self.results['area'].items() if k in seen_aids}
        seen_atids = set(map(lambda x: x['atid'], self.results['area'].values()))

        areatypes = soup.findAll('areatype')
        self.results['areatype'] = soup_to_dict(areatypes, 'id', ['nm', 's', 'id'])
        self.results['areatype'] = {k: v for k, v in self.results['areatype'].items() if k in seen_atids}

        parties = soup.findAll('party')
        self.results['party'] = soup_to_dict(parties, 'id', ['nm', 'ab', 's', 'id'])

        choices = soup.findAll('choice')
        self.results['choice'] = soup_to_dict(choices, 'id', ['nm', 'conid', 's', 'id'])

        tabs(self.county, ' '.join((self.results['election']['jd'],
                                    self.results['election']['des'])))
        return self
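
    # After a successful initial_read(), self.results is keyed by record
    # type -- roughly this shape (values illustrative, not from a real feed):
    #   {'election': {'nm': ..., 'des': ..., 'jd': ..., 'ts': ..., 'pol': 0, 'clpol': 0},
    #    'contest': {id: {...}}, 'area': {id: {...}}, 'areatype': {id: {...}},
    #    'party': {id: {...}}, 'choice': {id: {...}}}
    # scrape_results() below then merges the changing counts into these entries.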

    @asyncio.coroutine
    def scrape_results(self):
        """
        Read the contents of results.xml.

        This is the file that holds all the changing information, so this is
        the method to run to update the values.
        """
        filename = yield from self.pull_file('results.xml')
        with open(filename) as file_:
            xml = file_.read()
        soup = BeautifulSoup(xml, 'lxml')

        election = soup.find('results')
        self.results['election'].update({
            'ts': election['ts'],
            'clpol': election['clpol'],
            'pol': election['pol'],
            'fin': election['fin'],
        })

        results = soup_to_dict(soup.findAll('area'), 'id', ['bal', 'vot', 'pol', 'clpol'])
        for id_ in results:
            try:
                self.results['area'][id_].update(results[id_])
            except KeyError:
                # We probably dropped this area during initial_read, so ignore it.
                pass

        results = soup_to_dict(soup.findAll('contest'), 'id', ['bal', 'bl', 'uv', 'ov'])
        for id_ in results:
            self.results['contest'][id_].update(results[id_])

        results = soup_to_dict(soup.findAll('choice'), 'id', ['vot', 'e'])
        for id_ in results:
            self.results['choice'][id_].update(results[id_])

    @asyncio.coroutine
    def pull_file(self, filename):
        """Pull a file from a remote source and save it to disk."""
        url = "%s%s" % (BASE_URLS[self.county], filename)
        filepath = os.path.join(self.filepath, filename)
        try:
            # requests is blocking, so run it in the loop's default executor.
            future = LOOP.run_in_executor(None, requests.get, url)
            resp = yield from future
            if resp.status_code == 200 and resp.content:
                with open(filepath, 'wb') as out_file:
                    out_file.write(resp.content)
                commit_all(self.filepath)
        except requests.exceptions.ConnectionError as e:
            # The connection failed; fall back to the last copy we have on disk.
            print(e)
        return filepath


def soup_to_dict(soup, key, values):
    """
    Read a bunch of attributes from an XML tag and put them into a dict.
    """
    data = dict()
    for item in soup:
        if not data.get(item[key]):
            data[item[key]] = {}
        for value in values:
            if item.name == "choice" and value == "vot":
                # Choice vote counts are nested per party ('pid'), with a
                # separate 'tot' entry when the tag is flagged as a total.
                if not data[item[key]].get(value):
                    data[item[key]][value] = {}
                if item.get('tot') == "1":
                    data[item[key]][value]['tot'] = int(item.get(value, 0))
                else:
                    data[item[key]][value][item['pid']] = int(item.get(value, 0))
            elif value == "e":
                if item.get(value):
                    data[item[key]][value] = item[value]
            elif value in ["bal", "s"]:
                data[item[key]][value] = int(item.get(value, 1))
            else:
                data[item[key]][value] = string.capwords(item.get(value, '0'))
    return data
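
# Illustrative sketch (hypothetical tag, not taken from a real county feed):
# with values ['vot', 'e'] as used in scrape_results, a tag like
#   <choice id="7" conid="2" pid="3" vot="120"/>
# becomes {'7': {'vot': {'3': 120}}} (per-party counts keyed by 'pid'), while
# the same tag with tot="1" becomes {'7': {'vot': {'tot': 120}}}.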


@asyncio.coroutine
def scrape(election):
    print("Scraping results for %s county" % election.county)
    yield from election.scrape_results()
    print("Writing json.")
    write_json(election.results)
    print("Writing html.")
    write_html(election.county, election.results)


@asyncio.coroutine
def loop_or_not(county, options):
    election = Election(county)
    print("Reading data for %s" % county)
    # initial_read() returns None when the feed is unusable, so capture its
    # return value instead of testing the (always truthy) local instance.
    election = yield from election.initial_read()
    if not election:
        return
    elif options.loop is False:
        yield from scrape(election)
    else:
        while True:
            yield from scrape(election)
            yield from asyncio.sleep(options.interval)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-l", "--loop", dest="loop",
                        action="store_true", default=False,
                        help="run in a loop (infinitely)")
    parser.add_argument("-i", "--interval", dest="interval",
                        default=120, type=int,
                        help="number of seconds to sleep between runs")
    options = parser.parse_args()

    clear_tabs()
    tasks = [
        # asyncio.async() is a syntax error on modern Pythons ('async' became
        # a keyword); asyncio.ensure_future() is the drop-in replacement.
        asyncio.ensure_future(loop_or_not(county, options))
        for county in BASE_URLS
    ]
    LOOP.run_until_complete(asyncio.gather(*tasks))
    LOOP.close()


if __name__ == "__main__":
    main()
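
# Example invocations (a sketch based on the flags defined in main() above):
#   ./election.py              # scrape each county once, then exit
#   ./election.py --loop       # keep scraping at the default 120s interval
#   ./election.py -l -i 60     # keep scraping, sleeping 60 seconds between runs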