# retrieve_query.py (forked from Mondego/spacetime-crawler4py)
# Once you have built the inverted index, you are ready to test document
# retrieval with queries. At the very least, the search should be able to
# deal with boolean queries: AND only. If you wish, you can sort the retrieved
# documents based on tf-idf scoring (you are not required to do so now, but it
# will be required for the final search engine). This can be done using the
# cosine similarity method. Feel free to use a library to compute cosine
# similarity once you have the term frequencies and inverse document
# frequencies (although it should be very easy for you to write your own
# implementation).
# You may also add other weighting/scoring mechanisms to help refine the
# search results.
import pickle
import threading
from typing import Any

from index_construction import resulting_pickle_file_name, Posting

# Type alias for readability: query tokens are plain strings.
token = str

EXTRA_PRINTS_ACTIVE: bool = False
MAX_LINKS_SHOWN: int = 5
TESTING: bool = False
def get_unpickled_document(pickle_file: str) -> Any:
    # Load and return whatever object was pickled into the given file.
    with open(pickle_file, 'rb') as opened_pickle_file:
        return pickle.load(opened_pickle_file)
# Use a small hand-built index when TESTING; otherwise load the pickled index
# produced by index_construction.
if TESTING:
    inverted_index = {
        "hello": [Posting(2, 2, 3), Posting(3, 2, 3), Posting(4, 2, 3), Posting(1, 2, 3), Posting(5, 2, 3)],
        "run": [Posting(2, 2, 3)],
        "walk": [Posting(3, 2, 3)],
        "live": [Posting(4, 2, 3)],
        "exist": [Posting(5, 2, 3)],
        "believe": [Posting(5, 2, 3)],
        "goodbye": [Posting(6, 2, 3)],
    }
else:
    inverted_index: dict[token, list[Posting]] = get_unpickled_document(
        resulting_pickle_file_name)

if EXTRA_PRINTS_ACTIVE:
    print(f'inverted_index = {inverted_index}')
def get_query_result(query_term: token) -> set[Posting]:
    # Return the postings for a single term as a set (empty if the term is unknown).
    postings = inverted_index.get(query_term)
    return set(postings) if postings else set()
def get_query_results_and(query_terms: list[token]) -> list[Posting]:
    # Boolean AND: intersect the posting sets of every query term. Start from
    # the first term's postings (starting from an empty set would always yield
    # an empty intersection). This assumes Posting hashes/compares equal for
    # the same document across different term lists.
    if not query_terms:
        return list()
    result: set[Posting] = get_query_result(query_terms[0])
    for query in query_terms[1:]:
        result.intersection_update(get_query_result(query))
    # Highest tf-idf first, so the most relevant documents come first.
    return sorted(result, key=lambda curr_posting: curr_posting.tf_idf, reverse=True)
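# --- Optional tf-idf / cosine-similarity ranking sketch ---------------------
# The assignment note at the top of this file mentions ranking results by
# tf-idf using cosine similarity. The helper below is NOT wired into main();
# it is a minimal sketch that assumes each Posting exposes .doc_id and .tf_idf
# (both are used elsewhere in this file) and treats the query as a binary
# vector, so the score reduces to the sum of matching tf-idf weights
# normalized by the length of the document's matched-term vector.
def rank_by_cosine_similarity(query_terms: list[token]) -> list[Any]:
    import math  # local import so this optional sketch stays self-contained
    scores: dict[Any, float] = {}
    squared_norms: dict[Any, float] = {}
    for term in query_terms:
        for posting in inverted_index.get(term, []):
            scores[posting.doc_id] = scores.get(posting.doc_id, 0.0) + posting.tf_idf
            squared_norms[posting.doc_id] = squared_norms.get(
                posting.doc_id, 0.0) + posting.tf_idf ** 2
    # Highest cosine score first; returns doc ids only, mirroring main()'s output.
    return sorted(
        scores,
        key=lambda doc_id: scores[doc_id] / (math.sqrt(squared_norms[doc_id]) or 1.0),
        reverse=True)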
def get_query_results_and_multithreaded(query_terms: list[token]) -> list[Posting]:
    # Same boolean AND as above, but each term lookup runs in its own thread.
    if not query_terms:
        return list()
    results: list[set[Posting]] = list()
    results_lock: threading.Lock = threading.Lock()
    threads: list[threading.Thread] = list()

    def get_internal_query_result(query_term: token) -> None:
        query_res = get_query_result(query_term)
        with results_lock:
            results.append(query_res)

    for query in query_terms:
        if EXTRA_PRINTS_ACTIVE:
            print(f'Args = {query}')
        new_thread = threading.Thread(
            target=get_internal_query_result, args=[query])
        threads.append(new_thread)
        new_thread.start()
    for thread in threads:
        thread.join()
    if not results:
        return list()
    final_result: set[Posting] = results[0]
    for result in results[1:]:
        final_result.intersection_update(result)
    # Highest tf-idf first.
    return sorted(final_result, key=lambda curr_posting: curr_posting.tf_idf, reverse=True)
def parse_queries(raw_query: str) -> list[list[token]]:
    # Split the raw query string on whitespace and group terms into AND-groups:
    # 'AND' keeps appending to the current group, 'OR' closes the current group
    # and starts a new one, and any other word is a lowercased query term.
    # Phrase positioning is not handled yet: adjacent words are still treated
    # as separate AND terms.
    result: list[list[token]] = list()
    curr_query_list: list[token] = list()
    for query in raw_query.split():
        match query:
            case 'AND':
                continue
            case 'OR':
                result.append(curr_query_list)
                curr_query_list = list()
            case _:
                curr_query_list.append(query.lower())
    result.append(curr_query_list)
    return result
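# Quick illustration of the grouping above (hypothetical query, shown purely
# for documentation): "machine AND learning OR crawler" parses to
# [['machine', 'learning'], ['crawler']].
if EXTRA_PRINTS_ACTIVE:
    print(parse_queries("machine AND learning OR crawler"))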
def get_query_results_from_user_input(queries_list: list[list[token]]) -> list[Posting]:
    # Evaluate each AND-group in its own thread, then union the group results
    # (boolean OR across groups).
    query_results: list[list[Posting]] = list()
    query_results_lock: threading.Lock = threading.Lock()
    threads: list[threading.Thread] = list()

    def append_query_to_query_results(query_terms: list[token]) -> None:
        query_res: list[Posting] = get_query_results_and_multithreaded(
            query_terms)
        with query_results_lock:
            query_results.append(query_res)

    for query in queries_list:
        new_thread = threading.Thread(
            target=append_query_to_query_results, args=[query])
        threads.append(new_thread)
        new_thread.start()
    for thread in threads:
        thread.join()
    if EXTRA_PRINTS_ACTIVE:
        print(f'query_results = {query_results}')
    if not query_results:
        return list()
    final_result = set(query_results[0])
    for result in query_results[1:]:
        final_result = final_result.union(result)
    # Highest tf-idf first.
    return sorted(final_result, key=lambda posting: posting.tf_idf, reverse=True)
exit_statements = ["EXIT PLZ", "GOODBYE QUERY"]
def main():
    while True:
        try:
            user_query = input('Please input your next query: ')
            if user_query in exit_statements:
                print("Bye bye.")
                break
        except EOFError:
            print("User entered Ctrl + D. Bye bye.")
            break
        except KeyboardInterrupt:
            print("Goodbye, you little interruptee.")
            break
# if TESTING:
# print(f'{curr_query_from_user}')
# print(f'Getting first query singlethreaded:')
# single_threaded = get_query_results_and(curr_query_from_user[0])
# print(f'{single_threaded}')
# print(f'Getting first query multithreaded:', end='\t')
# result = get_query_results_and_multithreaded(
# curr_query_from_user[0])
# print(f'{result}')
# print(f'Got correct result: single = multi', end='\t')
# print(f'{single_threaded == result}')
# # query_results: list[Posting] = get_query_results_from_user_input(
# # curr_query_from_user)
        parsed_user_input = parse_queries(user_query)
        if EXTRA_PRINTS_ACTIVE:
            print(f'Parsed user query = {parsed_user_input}')
        query_results = get_query_results_from_user_input(parsed_user_input)
        if not query_results:
            query_links = []
        else:
            # Show at most MAX_LINKS_SHOWN of the highest-scoring documents.
            query_links = [
                curr_posting.doc_id for curr_posting in query_results[:MAX_LINKS_SHOWN]]
        if not query_links:
            print('No results found')
        else:
            for link in query_links:
                print(link)
if __name__ == "__main__":
    main()