#!/usr/bin/env python3

import argparse, queue, logging, random, sys
import multiprocessing, signal, re
from multiprocessing.pool import ThreadPool

version = "0.2"

# Make sure we have a semi-recent version of boto installed
try:
    import boto.s3.connection
    if tuple(map(int, boto.__version__.split("."))) < (2, 5, 2):
        raise Exception
except:
    print("ERROR: s3wipe requires boto v2.5.2 or later!")
    sys.exit(1)

# Argparse type for S3 paths; returns a (bucket, path) tuple
def s3Path(path):
    match = re.match('s3://([^/]+)(?:/([^/]+(?:/[^/]+)*))?$', path)
    if match:
        return match.groups()
    else:
        raise argparse.ArgumentTypeError(
            "must be in the 's3://bucket[/path]' format")

# Fetch our command line arguments
def getArgs():
    parser = argparse.ArgumentParser(
        prog="s3wipe",
        description="Recursively delete all keys in an S3 path",
        formatter_class=lambda prog:
            argparse.HelpFormatter(prog, max_help_position=27))

    parser.add_argument("--path", type=s3Path,
        help="S3 path to delete (e.g. s3://bucket/path)", required=True)
    parser.add_argument("--id",
        help="Your AWS access key ID", required=False)
    parser.add_argument("--key",
        help="Your AWS secret access key", required=False)
    parser.add_argument("--dryrun",
        help="Don't delete. Print what we would have deleted",
        action='store_true')
    parser.add_argument("--quiet",
        help="Suppress all non-error output", action='store_true')
    parser.add_argument("--batchsize",
        help="# of keys to batch delete (default 100)",
        type=int, default=100)
    parser.add_argument("--maxqueue",
        help="Max size of deletion queue (default 10k)",
        type=int, default=10000)
    parser.add_argument("--maxthreads",
        help="Max number of threads (default 100)",
        type=int, default=100)
    parser.add_argument("--delbucket",
        help="If S3 path is a bucket path, delete the bucket also",
        action='store_true')

    return parser.parse_args()

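# Example invocation (illustrative; the bucket name and credentials are
# placeholders only):
#   ./s3wipe --path s3://mybucket/some/prefix --id AKIA... --key SECRET --dryrun
# Drop --dryrun to perform the actual deletion.
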
# Set up our logging object
def loggerSetup(args):

    # Set our maximum severity level to log (i.e. debug or not)
    if args.quiet:
        logLevel = logging.ERROR
    else:
        logLevel = logging.DEBUG

    # Log configuration
    logging.basicConfig(
        level=logLevel,
        format="%(asctime)s %(levelname)s: %(message)s",
        datefmt="[%Y-%m-%d@%H:%M:%S]"
    )

    # Create our named logger
    global logger
    logger = logging.getLogger("s3wipe")

    # Make the logger emit all unhandled exceptions
    sys.excepthook = lambda t, v, x: logger.error(
        "Uncaught exception", exc_info=(t, v, x))

    # Suppress boto debug logging, since it is very chatty
    logging.getLogger("boto").setLevel(logging.CRITICAL)

# Our deletion worker, called by ThreadPool
def deleter(args, rmQueue, numThreads):

    # Set up per-thread boto objects
    myconn = boto.s3.connection.S3Connection(
        aws_access_key_id=args.id,
        aws_secret_access_key=args.key)
    bucket, path = args.path
    mybucket = myconn.get_bucket(bucket)

    rmKeys = []

    while True:
        # Snatch a key off our deletion queue and add it
        # to our local deletion list
        rmKey = rmQueue.get()
        rmKeys.append(rmKey)

        # Poll our deletion queue until it is empty or
        # until we have accumulated enough keys in this
        # thread's delete list to justify a batch delete
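        # (For example, with the default --batchsize of 100, each thread
        # issues at most one delete_keys() call per 100 accumulated keys,
        # or fewer when the queue momentarily drains.)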
        if len(rmKeys) >= args.batchsize or rmQueue.empty():
            try:
                if args.dryrun:
                    for key in rmKeys:
                        logger.info("Would have deleted '%s'" % key.name)
                else:
                    mybucket.delete_keys(rmKeys)
            except:
                continue

            with keysDeleted.get_lock():
                keysDeleted.value += len(rmKeys)

            rmKeys = []

            # Print some progress info
            if random.randint(0, numThreads) == numThreads and not args.dryrun:
                logger.info("Deleted %s out of %s keys found thus far.",
                    keysDeleted.value, keysFound.value)

        rmQueue.task_done()

# Set the global vars for our listing threads
def listInit(arg1, arg2):
    global args, rmQueue
    args = arg1
    rmQueue = arg2

# Our listing worker, which will poll the s3 bucket mercilessly and
# insert all objects found into the deletion queue.
def lister(subDir):

    # Set up our per-thread boto connection
    myconn = boto.s3.connection.S3Connection(
        aws_access_key_id=args.id,
        aws_secret_access_key=args.key)
    bucket, path = args.path
    mybucket = myconn.get_bucket(bucket)

    # Iterate through the bucket and add every key found to
    # our deletion queue
    for key in mybucket.list_versions(prefix=subDir.name):
        rmQueue.put(key)
        with keysFound.get_lock():
            keysFound.value += 1

# Our main function
def main():

    # Parse arguments
    args = getArgs()

    # Set up the logging object
    loggerSetup(args)

    rmQueue = queue.Queue(maxsize=args.maxqueue)

    # Catch ctrl-c to exit cleanly
    signal.signal(signal.SIGINT, lambda x, y: sys.exit(0))

    # Our thread-safe variables, used for progress tracking
    global keysFound, keysDeleted
    keysFound = multiprocessing.Value("i", 0)
    keysDeleted = multiprocessing.Value("i", 0)

    bucket, path = args.path
    logger.info("Deleting from bucket: %s, path: %s" % (bucket, path))
    logger.info("Getting subdirs to feed to list threads")

    # Our main boto object. Really only used to start the
    # lister threads on a per-subdir basis
    conn = boto.s3.connection.S3Connection(
        aws_access_key_id=args.id,
        aws_secret_access_key=args.key)

    try:
        mybucket = conn.get_bucket(bucket)
    except boto.s3.connection.S3ResponseError as e:
        shortErr = str(e).split('\n')[0]
        logger.error(shortErr)
        sys.exit(1)

    mybucket.configure_versioning(True)

    # Poll the root-level directories in the s3 bucket, and
    # start a lister thread for each one of them
    subDirs = list(mybucket.list_versions(prefix=path, delimiter="/"))
    listThreads = len(subDirs)
    deleteThreads = listThreads * 2

    # Now start all of our delete & list threads
    if listThreads > 0:
        # Limit the total number of threads to the requested maximum
        # (integer division, since ThreadPool needs a whole number)
        if (listThreads + deleteThreads) > args.maxthreads:
            listThreads = args.maxthreads // 3
            deleteThreads = args.maxthreads - listThreads

        logger.info("Starting %s delete threads..." % deleteThreads)
        deleterPool = ThreadPool(processes=deleteThreads,
            initializer=deleter, initargs=(args, rmQueue, deleteThreads))

        logger.info("Starting %s list threads..." % listThreads)
        listerPool = ThreadPool(processes=listThreads,
            initializer=listInit, initargs=(args, rmQueue))

        # Feed the root-level subdirs to our listing threads, which
        # will in turn populate the deletion queue, which feeds the
        # deletion threads
        listerPool.map(lister, subDirs)
        rmQueue.join()

    logger.info("Done deleting keys")

    if args.delbucket and path is None:
        if list(mybucket.list_versions(delimiter='/')):
            logger.info("Bucket not empty. Not removing it (this can happen " +
                "when deleting large numbers of keys; it sometimes takes " +
                "the S3 service a while, minutes to days, to catch up).")
        else:
            if args.dryrun:
                logger.info("Bucket is empty. Would have removed bucket")
            else:
                logger.info("Bucket is empty. Attempting to remove bucket")
                conn.delete_bucket(mybucket)

if __name__ == "__main__":
    main()