-
Notifications
You must be signed in to change notification settings - Fork 21
/
source_code.py
152 lines (124 loc) · 4.82 KB
/
source_code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import re
import dns.resolver
import smtplib
import requests
import threading
import queue
import dns.reversename
CACHE_TTL = 600
# Initialize a DNS resolver with caching enabled
resolver = dns.resolver.Resolver(configure=False)
resolver.nameservers = ['8.8.8.8']
resolver.cache = dns.resolver.Cache()
# def is_valid_email(email):
# # Check if "@" is present in the email
# if "@" not in email:
# return False
# local_part, domain_part = email.split('@')
# # Check for consecutive dots, hyphens, or underscores in the local part
# if re.search(r'\.{2}|-{2}|_{2}', local_part):
# return False
# # Check for consecutive dots, hyphens, or underscores in the domain part
# if re.search(r'\.{2}|-{2}|_{2}', domain_part):
# return False
# # Check for two consecutive dots, hyphens, or underscores anywhere in the email
# if re.search(r'\.\-|\-\.|\.\.|\_\-|\-\_|\_\_|\.\.|--', email):
# return False
# # Validate email syntax
# pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
# return re.match(pattern, email) is not None
def is_valid_email(email):
# Comprehensive regex for email validation
pattern = r'''
^ # Start of string
(?!.*[._%+-]{2}) # No consecutive special characters
[a-zA-Z0-9._%+-]{1,64} # Local part: allowed characters and length limit
(?<![._%+-]) # No special characters at the end of local part
@ # "@" symbol
[a-zA-Z0-9.-]+ # Domain part: allowed characters
(?<![.-]) # No special characters at the end of domain
\.[a-zA-Z]{2,}$ # Top-level domain with minimum 2 characters
'''
# Match the entire email against the pattern
return re.match(pattern, email, re.VERBOSE) is not None
# mx record validation
# Set the cache TTL (in seconds)
def query_dns(record_type, domain):
try:
# Try to resolve the record from cache first
record_name = domain if record_type == 'MX' else f'{domain}.'
cache_result = resolver.cache.get((record_name, record_type))
if cache_result is not None and (dns.resolver.mtime() - cache_result.time) < CACHE_TTL:
return True
# Otherwise, perform a fresh DNS query
resolver.timeout = 2
resolver.lifetime = 2
resolver.resolve(record_name, record_type)
return True
except dns.resolver.NXDOMAIN:
# The domain does not exist
return False
except dns.resolver.NoAnswer:
# No record of the requested type was found
return False
except dns.resolver.Timeout:
# The query timed out
return False
except:
# An unexpected error occurred
return False
def has_valid_mx_record(domain):
# Define a function to handle each DNS query in a separate thread
def query_mx(results_queue):
results_queue.put(query_dns('MX', domain))
def query_a(results_queue):
results_queue.put(query_dns('A', domain))
# Start multiple threads to query the MX and A records simultaneously
mx_queue = queue.Queue()
a_queue = queue.Queue()
mx_thread = threading.Thread(target=query_mx, args=(mx_queue,))
a_thread = threading.Thread(target=query_a, args=(a_queue,))
mx_thread.start()
a_thread.start()
# Wait for both threads to finish and retrieve the results from the queues
mx_thread.join()
a_thread.join()
mx_result = mx_queue.get()
a_result = a_queue.get()
return mx_result or a_result
# smtp connection
def verify_email(email):
# Split the email address into username and domain parts
domain = email.split('@')[1]
# Check the domain MX records
try:
mx_records = dns.resolver.resolve(domain, 'MX')
except dns.resolver.NoAnswer:
return False
# Connect to the SMTP server and perform the email verification
for mx in mx_records:
try:
smtp_server = smtplib.SMTP(str(mx.exchange))
smtp_server.ehlo()
smtp_server.mail('')
code, message = smtp_server.rcpt(str(email))
smtp_server.quit()
if code == 250:
return True
except:
pass
return False
# temporary domain
def is_disposable(domain):
blacklists = [
'https://raw.githubusercontent.com/andreis/disposable-email-domains/master/domains.txt',
'https://raw.githubusercontent.com/wesbos/burner-email-providers/master/emails.txt'
]
for blacklist_url in blacklists:
try:
blacklist = set(requests.get(blacklist_url).text.strip().split('\n'))
if domain in blacklist:
return True
except Exception as e:
print(f'Error loading blacklist {blacklist_url}: {e}')
return False