-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclean_all.py
428 lines (358 loc) · 14.7 KB
/
clean_all.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
import datetime
import warnings
from typing import Any, Dict, List
import google
import google.auth
from google.auth.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from loguru import logger
SKIP_LABEL = "please-do-not-kill-me"
DiscoveryEndpoint = Any
class BaseDiscoveryClient:
endpoint = None
version = "v1"
def __init__(self, project_id: str, credentials: Credentials):
self.credentials = credentials
self.project_id = project_id
if self.endpoint is None:
raise Exception(
"Client class has to have `endpoint` attribute set to discovery endpoint name"
)
self.client = build(self.endpoint, self.version, credentials=self.credentials)
@staticmethod
def is_stale(date: str) -> bool:
try:
today = datetime.datetime.today()
time = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
today = datetime.datetime.now(tz=datetime.timezone.utc)
time = datetime.datetime.fromisoformat(date)
return time < today - datetime.timedelta(days=1)
@staticmethod
def _iterate(
endpoint: DiscoveryEndpoint,
payload: Dict,
key: str = "items",
) -> List[Dict]:
"""
Iterates through endpoint.list(...).execute() discovery API endpoint
:param endpoint: An discovery API object for example ``self.client.instances()``
:param key: key used to get objects from list response
:param payload: keyword arguments passed to API list request
"""
request = endpoint.list(**payload)
instance_list = []
while request is not None:
response = request.execute()
instance_list.extend(response.get(key, []))
try:
request = endpoint.list_next( # pylint: disable=no-member
previous_request=request, previous_response=response
)
except AttributeError:
# In some cases API may return all resources in list request
break
return instance_list
@staticmethod
def _singular_name(name: str) -> str:
return name[:-1] if name.endswith("s") else name
def _delete(
self,
resource_name: str,
resource_id: str,
endpoint: DiscoveryEndpoint,
payload: Dict,
):
"""
Calls endpoint.delete(...).execute() to execute discovery API.
:param resource_name: name of object to delete, used for logging, for example ``clusters``
:param resource_id: id fo the resource to delete, used for logging
:param endpoint: An discovery API object for example ``self.client.instances()``
:param payload: keyword arguments passed to API delete request
"""
singular_name = self._singular_name(resource_name)
logger.info(f"Deleting {singular_name}: {resource_id}")
try:
endpoint.delete(**payload).execute()
except Exception as err: # pylint: disable=broad-except
logger.warning(f"Failed to delete {singular_name}: {err}")
def _delete_all_in_location(self, location):
raise NotImplementedError
def _delete_in_all_locations(self, locations, object_name: str):
for location in locations:
logger.debug(f"Deleting {object_name} in: {location}")
try:
self._delete_all_in_location(location=location)
except HttpError as err:
if int(err.resp["status"]) >= 500:
# Ignore server errors
pass
elif "Unexpected location" in str(err):
pass
else:
raise
class ComputeClient(BaseDiscoveryClient):
endpoint = "compute"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._locations = []
self._zones = []
def _delete_all_in_location(self, location):
raise NotImplementedError(
"This function is not implemented for compute endpoint as there is more than one resource"
)
def _refresh_locations_and_zones(self, force: bool = False):
if self._locations and not force:
return
locations_objects = self._iterate(
endpoint=self.client.regions(), payload={"project": self.project_id}
)
locations, zones = [], []
for loc in locations_objects:
locations.append(loc["name"])
zones.extend([z.split("/")[-1] for z in loc.get("zones", [])])
self._zones = zones
self._locations = locations
@property
def locations(self):
if not self._locations:
self._refresh_locations_and_zones(force=True)
return self._locations
@property
def zones(self) -> List[str]:
if not self._zones:
self._refresh_locations_and_zones(force=True)
return self._zones
def _delete_all(self, endpoint_name: str) -> None:
"""
Iterates through all zones, lists object under given ``endpoint_name``
and then deletes those objects.
"""
for zone in self.zones:
logger.debug(f"Deleting compute {endpoint_name} in {zone}")
endpoint = getattr(self.client, endpoint_name)()
for obj in self._iterate(
endpoint=endpoint, payload={"project": self.project_id, "zone": zone}
):
is_not_labeled = SKIP_LABEL not in obj.get("labels", {})
is_stale = self.is_stale(obj["creationTimestamp"])
# In case of disk we check if it is used by anything else
# in case of other resources we return True
has_no_users = not bool(obj.get("users"))
if is_not_labeled and is_stale and has_no_users:
self._delete(
resource_name=endpoint_name,
resource_id=obj["id"],
endpoint=endpoint,
payload={
"project": self.project_id,
"zone": obj["zone"].split("/")[-1],
self._singular_name(endpoint_name): obj["id"],
},
)
def delete_all_disks(self) -> None:
self._delete_all("disks")
def delete_all_instances(self) -> None:
self._delete_all("instances")
class GKEClient(BaseDiscoveryClient):
endpoint = "container"
def _delete_all_in_location(self, location: str):
raise NotImplementedError(
"GKEClient is able to list all clusters in one request."
)
def delete_all_clusters(self):
logger.debug("Deleting GKE clusters in ALL locations")
endpoint = self.client.projects().locations().clusters()
list_response = endpoint.list(
parent=f"projects/{self.project_id}/locations/-"
).execute()
if "clusters" not in list_response:
logger.error("No `clusters` in in GKE api.")
return
for cluster in list_response["clusters"]:
if SKIP_LABEL not in cluster.get("resourceLabels", {}) and self.is_stale(
cluster["createTime"]
):
cluster_name = cluster["name"]
zone = cluster["zone"]
self._delete(
resource_name="cluster",
resource_id=cluster_name,
endpoint=self.client.projects().locations().clusters(),
payload={
"name": f"projects/{self.project_id}/locations/{zone}/clusters/{cluster_name}"
},
)
class DataprocClient(BaseDiscoveryClient):
endpoint = "dataproc"
def _delete_all_in_location(self, location: str):
clusters = self._iterate(
endpoint=self.client.projects().regions().clusters(),
key="clusters",
payload={
"projectId": self.project_id,
"region": location,
},
)
for cluster in clusters:
last_state_date = cluster["status"]["stateStartTime"]
if SKIP_LABEL not in cluster.get("labels", []) and self.is_stale(
last_state_date
):
self._delete(
resource_name="cluster",
resource_id=cluster["clusterName"],
endpoint=self.client.projects().regions().clusters(),
payload={
"projectId": self.project_id,
"region": location,
"clusterName": cluster["clusterName"],
},
)
def delete_all_clusters(self, locations: List[str]):
self._delete_in_all_locations(
locations=locations, object_name="dataproc clusters"
)
class ComposerClient(BaseDiscoveryClient):
endpoint = "composer"
def _delete_all_in_location(self, location: str) -> None:
conn = self.client.projects().locations().environments()
environments = self._iterate(
endpoint=conn,
key="environments",
payload={"parent": f"projects/{self.project_id}/locations/{location}"},
)
for env in environments:
if SKIP_LABEL not in env.get("labels", {}) and self.is_stale(
env["updateTime"]
):
self._delete(
resource_name="composer",
resource_id=env["name"].split("/")[-1],
endpoint=conn,
payload={"name": env["name"]},
)
def delete_all_environments(self, locations: List[str]) -> None:
self._delete_in_all_locations(locations=locations, object_name="composers")
class MemorystoreRedisClient(BaseDiscoveryClient):
endpoint = "redis"
def _delete_all_in_location(self, location: str):
instances = self._iterate(
endpoint=self.client.projects().locations().instances(),
key="instances",
payload={"parent": f"projects/{self.project_id}/locations/{location}"},
)
for instance in instances:
create_date = instance["createTime"]
if SKIP_LABEL not in instance.get("labels", []) and self.is_stale(
create_date
):
self._delete(
resource_name="instance",
resource_id=instance["name"],
endpoint=self.client.projects().locations().instances(),
payload={
"name": instance["name"],
},
)
def delete_all_instances(self, locations: List[str]):
self._delete_in_all_locations(
locations=locations, object_name="memorystore redis instances"
)
class SpannerClient(BaseDiscoveryClient):
endpoint = "spanner"
def _delete_all_in_location(self, location: str):
raise NotImplementedError(
"Spanner is able to list all instances in one request."
)
def delete_all_instances(self):
logger.debug("Deleting Spanner instances in ALL locations")
instances = self._iterate(
endpoint=self.client.projects().instances(),
key="instances",
payload={"parent": f"projects/{self.project_id}"},
)
for instance in instances:
if SKIP_LABEL not in instance.get("labels", []):
self._delete(
resource_name="instance",
resource_id=instance["name"],
endpoint=self.client.projects().instances(),
payload={
"name": instance["name"],
},
)
class BigTableClient(BaseDiscoveryClient):
endpoint = "bigtableadmin"
version = "v2"
def _delete_all_in_location(self, location: str):
raise NotImplementedError(
"BigTable is able to list all instances in one request."
)
def delete_all_instances(self):
logger.debug("Deleting BigTable instances in ALL locations")
instances = self._iterate(
endpoint=self.client.projects().instances(),
key="instances",
payload={"parent": f"projects/{self.project_id}"},
)
for instance in instances:
if SKIP_LABEL not in instance.get("labels", []):
self._delete(
resource_name="instance",
resource_id=instance["name"],
endpoint=self.client.projects().instances(),
payload={
"name": instance["name"],
},
)
def run_cleaning(name, func, **kwargs):
logger.warning(f"Attempting to clean {name}")
try:
func(**kwargs)
logger.info(f"Cleaning of {name} done")
except Exception: # pylint: disable=broad-except
logger.exception(f"Failed to clean {name}")
def delete_resources():
warnings.filterwarnings(
"ignore",
message=".*Your application has authenticated using end user credentials.*",
)
logger.info("Starting the cleanup 🐈")
# Discovery API
credentials, project_id = google.auth.default()
composer = ComposerClient(project_id=project_id, credentials=credentials)
gke = GKEClient(project_id=project_id, credentials=credentials)
dataproc = DataprocClient(project_id=project_id, credentials=credentials)
compute = ComputeClient(project_id=project_id, credentials=credentials)
memorystore_redis = MemorystoreRedisClient(
project_id=project_id, credentials=credentials
)
spanner = SpannerClient(project_id=project_id, credentials=credentials)
big_table = BigTableClient(project_id=project_id, credentials=credentials)
# Get locations and zones
locations = compute.locations
_ = compute.zones
# Clean everything we can, mind that the order may have impact
run_cleaning(
"composer instances", composer.delete_all_environments, locations=locations
)
run_cleaning("GKE clusters", gke.delete_all_clusters)
run_cleaning("dataproc clusters", dataproc.delete_all_clusters, locations=locations)
run_cleaning("compute instances", compute.delete_all_instances)
run_cleaning("compute disks", compute.delete_all_disks)
run_cleaning(
"memorystore redis instances",
memorystore_redis.delete_all_instances,
locations=locations,
)
run_cleaning(
"spanner instances",
spanner.delete_all_instances,
)
run_cleaning(
"big table instances",
big_table.delete_all_instances,
)
logger.info("Done")