-
Notifications
You must be signed in to change notification settings - Fork 0
/
handler.py
291 lines (235 loc) · 10.8 KB
/
handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
import os
import re
import time
import json
import aiodns
import aiohttp
import asyncio
import janus
import logging
import ipaddress
import threading
import validators
from dns.resolver import Resolver
from dns.exception import DNSException
# Configure root logging once at import time. LOG_LEVEL may be set in the
# environment to a level name such as "INFO" (logging accepts both strings
# and ints for `level`); when unset, the numeric default logging.DEBUG is used.
logging.basicConfig(level=os.getenv("LOG_LEVEL", logging.DEBUG),
                    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# async def resolve_dns_name(dns_name):
# hosts_list = []
# async with aiodns.DNSResolver(loop=asyncio.get_event_loop()) as resolver:
# try:
# async with resolver.query(host=dns_name, ) as response:
# # create a list of ip addresses
# for item in response:
# hosts_list.append(item.host)
# except aiodns.error.DNSError as error:
# logger.warning(f'Error during resolving: {error}')
# return hosts_list
async def resolve_dns_name(dns_name: str) -> list[str]:
    """Asynchronously resolve *dns_name* to its IPv4 addresses via aiodns.

    Returns an empty list when resolution fails; the DNS error is logged
    at WARNING level instead of being raised.
    """
    resolver = aiodns.DNSResolver(loop=asyncio.get_event_loop())
    try:
        answers = await resolver.query(host=dns_name, qtype='A')
    except aiodns.error.DNSError as error:
        logger.warning(f'Error resolving DNS name: {error}')
        return []
    return [answer.host for answer in answers]
def resolve_dns_name_blocking(dns_name: str, resolver: Resolver) -> tuple[list, int]:
    """Resolve *dns_name* A records with a blocking dnspython resolver.

    Returns a pair ``(ip_strings, error_count)`` where ``error_count`` is 1
    when a DNS error occurred (logged, not raised) and 0 otherwise.
    """
    try:
        answers = resolver.resolve(dns_name, 'A')
    except DNSException as error:
        logger.warning(f'Error resolving DNS name: {error}')
        return [], 1
    return [record.to_text() for record in answers], 0
def check_if_ip_in_subnet(ip, subnet, dns_name) -> bool:
    """Return True when *ip* lies inside the CIDR network *subnet*.

    NOTE(review): *dns_name* is accepted but never used here; it is kept
    only so existing call sites remain compatible.
    """
    return ipaddress.ip_address(ip) in ipaddress.ip_network(subnet)
def read_file_to_list(path: str) -> list:
    """Read a text file and return its lines with trailing whitespace stripped.

    Returns an empty list (after logging at ERROR level) when the file
    cannot be opened.
    """
    file_data_list = []
    try:
        with open(path, 'r') as file:
            # Iterate the file object directly instead of readlines():
            # avoids materializing the whole file in memory at once.
            # rstrip() removes any type of trailing whitespace
            # including spaces, newlines etc.
            file_data_list = [line.rstrip() for line in file]
    except OSError as error:
        logger.error(f'Could not open file {path}. Error: {error}')
        # NOTE(review): the caller silently receives [] on failure;
        # consider sys.exit or raising if a missing file should be fatal.
    return file_data_list
def subnet_to_ips(subnet: str) -> set:
    """Expand a CIDR *subnet* string into the set of all its addresses as strings."""
    return {str(address) for address in ipaddress.ip_network(subnet)}
def subnets_to_ips(subnets_set: set) -> set:
    """Flatten a set of CIDR subnets into one set of individual ip strings."""
    full_set_of_ips = set()
    for subnet in subnets_set:
        full_set_of_ips.update(subnet_to_ips(subnet=subnet))
    logger.info(f'{len(full_set_of_ips)} blocked ip addresses discovered')
    return full_set_of_ips
# Compiled once at import time instead of on every call: labels of 1-63
# chars (letters, digits, underscore, hyphen; no leading/trailing hyphen),
# a 2-6 letter TLD and an optional trailing dot.
_DOMAIN_REGEX = re.compile("^((?!-)[_A-Za-z0-9-]{1,63}(?<!-)\\.)+[A-Za-z]{2,6}\\.?$")


def is_valid_domain(domain_name: str) -> bool:
    """Return True when *domain_name* matches the project's domain regex.

    validators.domain() treats underscores as invalid, which this project
    must accept — hence the hand-rolled regex. Empty strings are invalid.
    NOTE(review): the 2-6 char TLD bound rejects longer modern TLDs
    (e.g. ".international"); widen if that matters — kept as-is here.
    """
    if domain_name == '':
        return False
    return _DOMAIN_REGEX.search(domain_name) is not None
def validate_domains(domains_list: list) -> set:
    """Filter *domains_list* down to the set of syntactically valid domains.

    Invalid lines and duplicates are counted and reported via the logger.
    Validation relies on is_valid_domain() because validators.domain()
    treats underscores as invalid characters in domain names.
    """
    valid_domains_set = set()
    invalid_domains = 0
    duplicated_domains = 0
    for domain_name in domains_list:
        if not is_valid_domain(domain_name=domain_name):
            logger.debug(f'This line is not a valid domain: {domain_name}')
            invalid_domains += 1
            continue
        if domain_name in valid_domains_set:
            logger.debug(f'{domain_name} domain is already exist')
            duplicated_domains += 1
        # set.add() silently ignores members that are already present,
        # so duplicates are counted above before the unconditional add.
        valid_domains_set.add(domain_name)
    if invalid_domains:
        logger.warning(f'{invalid_domains} invalid domain(s) discovered! Check input file')
    if duplicated_domains:
        logger.warning(f'{duplicated_domains} duplicated domain(s) found')
    logger.info(f'{len(valid_domains_set)} domains discovered')
    return valid_domains_set
def return_domain_metrics(dns_name: str, ips_list: list, blocked_ips_set: set, ip_in_label: bool) -> str:
    """Render Prometheus-style text metrics for one domain's resolved ips.

    Emits a per-ip blocked gauge only when *ip_in_label* is set, followed
    by the resolved-ip count, blocked-ip count and resolve-success flag.
    """
    lines = []
    blocked_ip_count = 0
    for ip in ips_list:
        is_blocked = ip in blocked_ips_set
        if is_blocked:
            blocked_ip_count += 1
        if ip_in_label:
            lines.append(f'rkn_resolved_ip_blocked{{domain_name="{dns_name}",ip="{ip}"}} {1 if is_blocked else 0}')
    lines.append(f'rkn_resolved_ip_count{{domain_name="{dns_name}"}} {len(ips_list)}')
    lines.append(f'rkn_resolved_ip_blocked_count{{domain_name="{dns_name}"}} {blocked_ip_count}')
    lines.append(f'rkn_resolved_success{{domain_name="{dns_name}"}} {1 if ips_list else 0}')
    return '\n'.join(lines) + '\n'
def time_diff(time_old: float) -> float:
    """Seconds elapsed since epoch timestamp *time_old*, rounded to 5 places."""
    elapsed = time.time() - time_old
    return round(elapsed, 5)
def return_metrics(domains_set_queue: janus.Queue, blocked_ips_set: set, resolver: Resolver, ip_in_label: bool) -> str:
    """Worker-thread loop: drain the domains queue and build metrics text.

    Intended to run in a plain thread; several workers may consume the same
    janus queue concurrently, each using its own blocking dnspython
    *resolver*. For every domain pulled from the queue it resolves the A
    records, checks the resulting ips against *blocked_ips_set* and appends
    the Prometheus-style lines from return_domain_metrics(). Returns the
    concatenated metrics for the domains THIS worker processed.
    """
    # check in what thread we are
    thread_id = threading.get_ident()
    logger.debug(f'Thread id: {thread_id}; Starting thread...')
    metrics = ''
    domains_count = resolving_errors_count = 0
    time_start = time.time()
    while domains_set_queue.sync_q.qsize() != 0:
        logger.debug(f'Thread id: {thread_id}; Current queue size is {domains_set_queue.sync_q.qsize()} element(s)')
        # a fact that queue is not empty is checked above in while loop
        # condition but we should handle Queue.sync_q.get() event because
        # if the queue had the last item it might be retrieved from another
        # thread between .qsize() and .get() events in the current thread
        try:
            time_queue = time.time()
            # block=False: never wait — losing the race is handled below
            dns_name = domains_set_queue.sync_q.get(block=False)
            logger.debug(f'Thread id: {thread_id}; Domain {dns_name} retrieved from the queue in {time_diff(time_queue)}s')
            time_resolving = time.time()
            domains_count += 1
            ips_list, errors = resolve_dns_name_blocking(dns_name, resolver)
            resolving_errors_count += errors
            logger.debug(f'Thread id: {thread_id}; Domain {dns_name} resolved in {time_diff(time_resolving)}s')
            time_subnets = time.time()
            metrics += return_domain_metrics(dns_name=dns_name,
                                             ips_list=ips_list,
                                             blocked_ips_set=blocked_ips_set,
                                             ip_in_label=ip_in_label)
            logger.debug(f'Thread id: {thread_id}; Checked if ip address(es) of domain {dns_name} are blocked in {time_diff(time_subnets)}s')
            domains_set_queue.sync_q.task_done()
        # another consumer won the race for the last item between our
        # qsize() check and get(); simply loop back and re-check the size
        except janus.SyncQueueEmpty as _:
            logger.debug(f'Thread id: {thread_id}; Queue is empty because the last element was retrieved from another thread')
    logger.info(f'Thread id: {thread_id}; Thread is finished after {time_diff(time_start)}s. {domains_count} domains processed. {resolving_errors_count} resolving errors')
    return metrics
def fill_queue(queue: janus.Queue[str], domains_set: set) -> None:
    """Populate *queue* (via its sync side) with every domain in *domains_set*.

    Refuses to fill — logging an error instead — when the queue already
    contains items.
    """
    sync_q = queue.sync_q
    if sync_q.qsize() != 0:
        logger.error(f'Queue is not empty! Queue with max size {sync_q.maxsize} already has {sync_q.qsize()} element(s)')
        return
    logger.debug(f'Starting filling a queue')
    for item in domains_set:
        sync_q.put(item=item)
    logger.debug(f'Queue filling is finished')
async def get_data(url: str) -> dict:
    """GET *url* and return the parsed JSON response body.

    Returns an empty dict when the request fails (non-200 status or a
    connection error); failures are logged, never raised.
    NOTE(review): annotated dict per the {} fallback, but r.json() returns
    a list for JSON-array endpoints — confirm against the consumers.
    """
    # Default result for every failure path below. The original built this
    # via json.loads('{}'); a literal empty dict is equivalent and direct.
    json_body = {}
    async with aiohttp.ClientSession() as client:
        try:
            async with client.get(url) as r:
                status = r.status
                logger.info(f'Requesting url {url}')
                logger.debug(f'Full response: {r}')
                if status == 200:
                    json_body = await r.json()
                else:
                    logger.error(f'Cannot request url {url}! Response status: {status}')
        except aiohttp.ClientError as error:
            logger.error(f'Connection error to url {url}: {error}')
    return json_body
def ip_converter(subnets_list: list) -> set:
    """Normalize a list of ipv4/ipv6 addresses and subnets into a set of ipv4 CIDR strings.

    ipv6 entries are skipped (not supported yet); bare ipv4 addresses are
    converted to their /32 network form; unrecognized strings are counted
    and reported via the logger.
    """
    blocked_subnets_set = set()
    invalid_strings_list = []
    for item in subnets_list:
        if validators.ipv6(item):
            # ipv6 is not supported yet
            continue
        if validators.ipv4(item):
            # a bare ipv4 address becomes its /32 network string
            blocked_subnets_set.add(str(ipaddress.ip_network(item)))
        elif validators.ipv4_cidr(item):
            blocked_subnets_set.add(item)
        else:
            logger.debug(f'This string is neither IPv4/IPv6 address nor IP subnet: {item}')
            invalid_strings_list.append(item)
    logger.info(f'{len(blocked_subnets_set)} subnets in blocked subnets list')
    if invalid_strings_list:
        logger.warning(f'{len(invalid_strings_list)} invalid strings were discovered during input list analyzing')
        logger.debug(f'Here a full list of all invalid strings of IPv4/IPv6/subnet: {invalid_strings_list}')
    return blocked_subnets_set
async def data_handler(path: str) -> set:
    """Load the blocked-subnets set from *path*: a url is fetched, anything else is read as a file."""
    is_url = bool(validators.url(path))
    source_kind = 'url' if is_url else 'file'
    logger.info(f'Trying to access {source_kind} {path} to retrieve blocked subnets list')
    if is_url:
        raw_data = await get_data(url=path)
    else:
        raw_data = read_file_to_list(path=path)
    return set(ip_converter(raw_data))
def normalize_dns(dns_str: str) -> list:
    """Turn a comma-separated string of DNS servers into a list of valid ipv4 strings.

    All spaces are removed before splitting on commas; entries that are not
    valid ipv4 addresses are dropped (and logged at DEBUG level).
    """
    candidates = dns_str.replace(' ', '').split(',')
    valid_list = []
    for dns_server_str in candidates:
        if not validators.ipv4(dns_server_str):
            logger.debug(f'{dns_server_str} is not a valid ip address of DNS server!')
            continue
        valid_list.append(dns_server_str)
    logger.debug(f'DNS server(s) to proceed: {valid_list}')
    return valid_list