-
Notifications
You must be signed in to change notification settings - Fork 1
/
main_threaded.py
124 lines (111 loc) · 5.62 KB
/
main_threaded.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from colorama import Fore, init
from bs4 import BeautifulSoup
import aiofiles
import asyncio
import aiohttp
# HTTP request headers passed to every session.get() call in request_site().
# Before running for the first time, remember to set your headers here; the
# 403 error message printed by request_site() suggests updating them.
headers = {}
def _parse_report(html):
    """Return (owner, address, carrier) parsed from a report page, or None.

    None is returned when the page has no <h3> or no <ul> element.
    address and carrier are None when no matching <li> entry is found.
    """
    soup = BeautifulSoup(html, "html.parser")
    owner_tag = soup.find("h3")
    info_list = soup.find("ul")
    if owner_tag is None or info_list is None:
        return None

    address = carrier = None
    for item in info_list.find_all("li"):
        text = item.get_text().strip()
        # NOTE(review): text has just been stripped, so a leading-space prefix
        # can never match and address is never set. The intended prefix is not
        # recoverable from this file -- confirm against the live page markup.
        if text.startswith(" "):
            address = text
        elif text.startswith("Carrier Type"):
            # Guard against an entry without a colon instead of raising
            # IndexError half-way through writing a record.
            parts = text.split(":")
            if len(parts) > 1:
                carrier = parts[1].strip()
    return owner_tag.get_text().strip(), address, carrier


async def _save_report(path, current_number, report):
    """Append one formatted record for current_number to the file at path."""
    import os

    owner, address, carrier = report
    lines = [f"{current_number} - {owner}\n"]
    if address:
        lines.append(f"{address}\n")
    if carrier:
        lines.append(f"{carrier}\n")
    lines.append("------------------------\n")

    # Create the output directory on first use so a hit is not lost to a
    # "No such file or directory" error.
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    # Write the record in a single call so records from concurrently running
    # lookups are less likely to interleave in the shared output file.
    async with aiofiles.open(path, "a") as out:
        await out.write("".join(lines))


async def request_site(session, current_number):
    """Look up one phone number and record the result.

    Fetches the report page for current_number, classifies it, prints a
    colour-coded status line and, for pages that look like a real report,
    appends the parsed details to "data/AT&T_data.txt" or
    "data/non_AT&T_data.txt".

    Args:
        session: object whose ``get(url, headers=...)`` returns an async
            context manager yielding a response with ``text()`` and
            ``status`` (an aiohttp.ClientSession in this script).
        current_number: the phone number to look up, as a string.

    Returns:
        None. Any exception raised while fetching, parsing or writing is
        caught and printed, so one failed lookup does not abort the batch.
    """
    # Both markers must be present for the page to count as a report.
    # The previous `"a" and "b" and "AT&T" in html` only tested the last
    # operand, because a non-empty string literal is always truthy.
    report_markers = (
        "Access your full phone report and unlock all associated data.",
        "ReversePhoneLookup.com",
    )
    url = f"https://www.reversephonelookup.com/number/{current_number}/"
    try:
        async with session.get(url, headers=headers) as response:
            html = await response.text()
            status = response.status

        if all(marker in html for marker in report_markers):
            if "AT&T" in html:
                print(Fore.GREEN + f"{current_number} || VALID || AT&T")
                out_path = "data/AT&T_data.txt"
            else:
                print(Fore.YELLOW + f"{current_number} || SEMI-VALID || NOT AT&T")
                out_path = "data/non_AT&T_data.txt"

            report = _parse_report(html)
            if report is None:
                print(Fore.RED + f"{current_number} || Element not found.")
            else:
                await _save_report(out_path, current_number, report)
        elif status == 403:
            print(Fore.RED + f"{current_number} || ERROR || STATUS CODE 403 - POSSIBLE FIX: UPDATE HEADERS - IF THIS DOESNT FIX CHANGE IP AND GET NEW HEADERS")
        else:
            print(Fore.RED + f"{current_number} || INVALID || DOESNT EXIST")
    except Exception as e:
        # Top-level boundary for a single lookup: report and carry on.
        # Include the number and exception type because str(e) can be empty
        # (for example on timeouts).
        print(Fore.RED + f"{current_number} || ERROR || {type(e).__name__}: {e}")
async def random_generate(session, input_number, delay_ms):
    """Look up every number in the block implied by input_number.

    Despite the name, numbers are enumerated sequentially, not randomly:
    the trailing zeros of input_number are treated as wildcards, so
    "3343670000" covers 3343670000 through 3343679999. A number with no
    trailing zeros yields just that one number.

    Args:
        session: passed through to request_site for each lookup.
        input_number: the starting number as a string of digits.
        delay_ms: pause in milliseconds between starting consecutive lookups.

    Raises:
        ValueError: if input_number is not a valid integer string.
    """
    width = len(input_number)
    trailing_zeros = width - len(input_number.rstrip("0"))
    first = int(input_number)

    tasks = []
    for value in range(first, first + 10 ** trailing_zeros):
        # Pad back to the original width: str(int(...)) would drop leading
        # zeros and produce a shorter number than the user entered.
        number = str(value).zfill(width)
        tasks.append(asyncio.create_task(request_site(session, number)))
        # Stagger the start of each lookup; lookups still overlap in flight.
        await asyncio.sleep(delay_ms / 1000)
    await asyncio.gather(*tasks)
async def fromlist(session, delay_ms):
    """Look up every phone number listed in "phonenumbers.txt".

    The file is read one number per line; surrounding whitespace is stripped
    and blank lines are skipped.

    Args:
        session: passed through to request_site for each lookup.
        delay_ms: pause in milliseconds between starting consecutive lookups.

    Raises:
        OSError: if "phonenumbers.txt" cannot be opened (for example
            FileNotFoundError when it does not exist).
    """
    tasks = []
    with open("phonenumbers.txt", "r") as source:
        for line in source:
            number = line.strip()
            if not number:
                # A blank line would otherwise trigger a lookup for an
                # empty number.
                continue
            tasks.append(asyncio.create_task(request_site(session, number)))
            # Stagger the start of each lookup; lookups still overlap in flight.
            await asyncio.sleep(delay_ms / 1000)
    await asyncio.gather(*tasks)
async def main():
    """Show the menu, read the user's options and run the chosen mode.

    Option 1 enumerates a block of numbers via random_generate, option 2
    loads numbers from a file via fromlist, and option 0 quits. Invalid
    input prints a message and returns without making any request.
    """
    print(Fore.RED + """
1. Randomly generate US phone numbers - Threaded (you choose speed)
2. Load phone numbers from txt - Threaded (you choose speed)
0. Quit
""")
    try:
        choice = int(input(Fore.GREEN + ">>"))
    except ValueError:
        # Non-numeric input used to crash with a traceback.
        print(Fore.RED + "Invalid choice")
        return

    if choice == 0:
        return
    # Reject unknown options before asking for a delay or opening a session.
    if choice not in (1, 2):
        print(Fore.RED + "Invalid choice")
        return

    try:
        delay_ms = int(input(Fore.GREEN + "Enter delay in milliseconds (e.g: 500 = 0.5 seconds): "))
    except ValueError:
        delay_ms = -1
    if delay_ms < 0:
        print(Fore.RED + "Invalid input! Please enter the delay as a non-negative whole number.")
        return

    input_number = None
    if choice == 1:
        # Validate before creating the HTTP session so bad input costs nothing.
        input_number = input(Fore.GREEN + "Enter a 10-digit number (e.g: 3343670000): ")
        if len(input_number) != 10 or not input_number.isdigit():
            print(Fore.RED + "Invalid input! Please enter a 10-digit number.")
            return

    async with aiohttp.ClientSession() as session:
        if choice == 1:
            await random_generate(session, input_number, delay_ms)
        else:
            await fromlist(session, delay_ms)
def start():
    """Script entry point: set up coloured output and run main() to completion."""
    # colorama's init is imported at the top of the file but was never
    # called; it is what makes the Fore.* colour codes render on Windows
    # consoles.
    init()
    # asyncio.run creates a fresh event loop, runs main() and closes the loop.
    # asyncio.get_event_loop() without a running loop is deprecated.
    asyncio.run(main())


if __name__ == "__main__":
    start()