-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery.py
204 lines (168 loc) · 6.57 KB
/
query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import sqlite3
from itertools import chain
from contextlib import closing
from constants import SOURCES, SOURCE_BASE, DATABASES
from query_functions import strip_dates, TIME_MAP, CALENDAR_MAP
class Query(object):
# Basic query strings
BASE = 'SELECT word, SUM(count), date as "[date]" FROM hw'
TOTAL = 'GROUP BY word ORDER BY SUM(count) DESC'
DATE = 'WHERE "[date]"=?'
SINCE = 'WHERE "[date]">=?'
TIMESPAN = 'WHERE "[date]">=? AND "[date]"<?'
SINGLE_WORD = ' AND word=?'
SINGLE_WORD_ONLY = 'WHERE word=?'
# Compound query strings
OVERALL_TOTAL = ' '.join([BASE, TOTAL])
SPECIFIC_DATE = ' '.join([BASE, DATE, TOTAL])
SINCE_DATE = ' '.join([BASE, SINCE, TOTAL])
DATE_RANGE = ' '.join([BASE, TIMESPAN, TOTAL])
WORD_ON_DATE = ' '.join([BASE, DATE, SINGLE_WORD])
WORD_SINCE = ' '.join([BASE, SINCE, SINGLE_WORD])
WORD_RANGE = ' '.join([BASE, TIMESPAN, SINGLE_WORD])
WORD_EVER = ' '.join([BASE, SINGLE_WORD_ONLY])
def __init__(self, db):
self.dbname = db
self.conn = sqlite3.connect(self.dbname,
detect_types=sqlite3.PARSE_DECLTYPES
|sqlite3.PARSE_COLNAMES)
self.cur = self.conn.cursor()
def close(self):
if self.conn:
self.conn.commit()
self.conn.close()
def ondate(self, date):
""" Return counts for all words ON date. """
if date in TIME_MAP:
date = TIME_MAP[date]
self.cur.execute(self.__class__.SPECIFIC_DATE, (date,))
return self.cur.fetchall()
def since(self, date):
""" Return counts for all words SINCE date. """
if date in TIME_MAP:
date = TIME_MAP[date]
self.cur.execute(self.__class__.SINCE_DATE, (date,))
return self.cur.fetchall()
def between(self, date1, date2):
""" Return counts for all words BETWEEN the two dates. """
if date1 in CALENDAR_MAP:
date1 = CALENDAR_MAP[date1]
if date2 in CALENDAR_MAP:
date2 = CALENDAR_MAP[date2]
self.cur.execute(self.__class__.DATE_RANGE, (date1, date2))
return self.cur.fetchall()
def ever(self):
""" Return counts for all words EVER. """
self.cur.execute(self.__class__.OVERALL_TOTAL)
return self.cur.fetchall()
def word_ondate(self, date, word):
""" Return count for a single WORD ON DATE. """
if date in TIME_MAP:
date = TIME_MAP[date]
self.cur.execute(self.__class__.WORD_ON_DATE, (date, word))
return self.cur.fetchall()
def word_since(self, date, word):
""" Return counts for single WORD SINCE date. """
if date in TIME_MAP:
date = TIME_MAP[date]
self.cur.execute(self.__class__.WORD_SINCE, (date, word))
return self.cur.fetchall()
def word_between(self, date1, date2, word):
""" Return counts for single WORD BETWEEN the two dates. """
if date1 in CALENDAR_MAP:
date1 = CALENDAR_MAP[date1]
if date2 in CALENDAR_MAP:
date2 = CALENDAR_MAP[date2]
self.cur.execute(self.__class__.WORD_RANGE, (date1, date2, word))
return self.cur.fetchall()
def word_ever(self, word):
""" Return count for a single word EVER. """
self.cur.execute(self.__class__.WORD_EVER, (word,))
return self.cur.fetchall()
def data(dbname, method, date1=None, date2=None, word=None):
"""
Execute the relevant method from the Query class, using the
relevant database and parameters.
dbname : database to query
method : method to call for the query
date1 : single or first date to query
can be a date object or key to a date object
date2 : second date in a query with a range
can be a date object or key to a date object
word : the specific word to query
"""
db = DATABASES[dbname]
with closing( Query(db) ) as opendb:
METHODS = {"ondate": opendb.ondate,
"since": opendb.since,
"between": opendb.between,
"word_ondate": opendb.word_ondate,
"word_since": opendb.word_since,
"word_between": opendb.word_between,
"ever": opendb.ever,
"word_ever": opendb.word_ever}
if date1 and date2 and word:
data = METHODS[method](date1, date2, word)
elif date1 and date2:
data = METHODS[method](date1, date2)
elif date1 and word:
data = METHODS[method](date1, word)
elif date1:
data = METHODS[method](date1)
elif word:
data = METHODS[method](word)
else:
data = METHODS[method]()
return strip_dates(data)
def word_count(db, word, dates):
"""
Return the number of times a single word was used
on each of the supplied dates.
db : the database to query
word : the target word
dates : days to get word counts
Example: searching for 'election' over the last week
>>> word_counts('bbc', 'election', [list of date objects])
>>> [1, 3, 0, 4, 6, 7, 2]
"""
counts = []
for date in dates:
for _, count in data(db, "word_ondate", date, word):
if count:
counts.append(count)
else:
counts.append(0)
return counts
def word_count_period(db, word, dates):
"""
Return number of times a single word was used in each period.
db : database to query
word : target word
dates : pairs of date objects delimiting each query period
"""
counts = []
for date in dates:
start, end = date
for _, count in data(db, "word_between", start, end, word):
if count:
counts.append(count)
else:
counts.append(0)
return counts
def word_count_by_source(word, method, sources, date1=None, date2=None):
"""
Return the number of times a word was used by each of the specified
sources, either ever or since/between the specified dates.
word : target word
method : query object method to use
source : sources to search
date1 : single or first date to query
date2 : second date in a query with a range
"""
counts = {SOURCE_BASE[source]["title"]: 0 for source in sources}
for source in sources:
info = data(source, method, date1=date1, date2=date2, word=word)
*_, count = chain.from_iterable(info)
if count:
counts[SOURCE_BASE[source]["title"]] = count
return counts