-
Notifications
You must be signed in to change notification settings - Fork 0
/
hunt_abandoned_bucket.py
135 lines (115 loc) · 4.88 KB
/
hunt_abandoned_bucket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
"""Search for possible subdomain takeover via abandoned Amazon S3 bucket.
Requires an existing domains.txt file in the working directory containing 1 domain per line.
This can be generated by amass, or any other tool of your liking.
Usage:
./hunt_abandoned_bucket.py
Author:
Ob1lan - 22-APRIL-2023
"""
import os.path
import time
import sys
import asyncio
import aiofiles
from tqdm import tqdm
import aiohttp
import dns.resolver
from aioretry import (
retry,
# Tuple[bool, Union[int, float]]
RetryPolicyStrategy,
RetryInfo
)
# Defines the retry policy used by aioretry
def retry_policy(info: RetryInfo) -> RetryPolicyStrategy:
"""
- It will always retry until succeeded
- If fails for the first time, it will retry immediately,
- If it fails again,
aioretry will perform a 100ms delay before the second retry,
200ms delay before the 3rd retry,
the 4th retry immediately,
100ms delay before the 5th retry,
etc...
"""
return False, (info.fails - 1) % 3 * 0.1
# Check if the domains.txt file is present in the working directory, if not, exit
if not os.path.exists("domains.txt"):
print("No domain.txt file detected.")
sys.exit(1)
# Check if the excluded.txt file is present, if not, create it
if not os.path.exists("excluded.txt"):
with open("excluded.txt", 'w', encoding="utf-8") as excludedfile:
excludedfile.close()
# Count the number of lines in domains.txt. Variable 'count' will be used in progress bar
with open(r"domains.txt", 'r', encoding="utf-8") as file:
COUNT = 0
for line in file:
if line != "\n":
COUNT += 1
file.close()
# This Semaphore declaration limits the number of concurrent requests, and prevents some errors
sem = asyncio.Semaphore(30)
# Set the header of the requests
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'}
# Handles the excluded.txt file to populate the variable used to exclude domains to be checked
with open("excluded.txt", 'r', encoding="utf-8") as exclusions:
excluded = exclusions.read()
exclusions.close()
@retry(retry_policy)
async def get(domain, session):
"""Function that formats domains into URL and queries them async to hunt for 404 responses.
Once a 404 response code is found, the function search for the NoSuchBucket keyword to determine
whether it could be an abandoned Amazon S3 Bucket or not.
"""
if domain.strip() not in excluded:
try:
url = "http://" + domain.strip()
async with sem:
async with session.get(url=url, timeout=10) as response:
await response.read()
if response.status == 404:
text = await response.text()
if "NoSuchBucket" in text:
print("Might be an Abandoned Amazon S3 Bucket: ", url)
answer: \
dns.resolver.Answer = dns.resolver.resolve(
domain.strip(), 'CNAME')
for rdata in answer:
print(" -> ", rdata)
findingsfile = open(
"findings.txt", "a", encoding="utf-8")
findingsfile.write(url + "\n")
findingsfile.close()
except (aiohttp.ClientConnectorError, asyncio.TimeoutError, aiohttp.ClientOSError,
aiohttp.ClientResponseError, aiohttp.ClientError, UnicodeError) as e:
async with aiofiles.open("errors.txt", mode='a') as errorfile:
await errorfile.write(f"{type(e).__name__}: {domain.strip()}\n")
else:
print("Excluded:", domain.strip())
async def main():
"""The main function that calls the get and log progress in a tqdm progress bar."""
async with aiohttp.ClientSession(headers=headers) as session:
ret = [get(domain, session) for domain in domains]
return [await f for f in tqdm(asyncio.as_completed(ret),
total=len(ret),
desc="Progress",
unit=" domains")]
# Start the process and log the time it takes to complete it.
with open(r"domains.txt", 'r', encoding="utf-8") as file:
domains = file
start = time.time()
# In case the script is executed on a Windows machine, this is needed.
try:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
except AttributeError:
pass
asyncio.run(main())
end = time.time()
# Just for fun, so we know how long it took to query the domains
print(f'It took {end - start} seconds to query {COUNT} domains.')
# Close the file(s) we opened previously (might not be needed)
file.close()