Add retries to S3 delete_bucket #31192

Merged
19 changes: 16 additions & 3 deletions airflow/providers/amazon/aws/hooks/s3.py
@@ -33,6 +33,7 @@
 from io import BytesIO
 from pathlib import Path
 from tempfile import NamedTemporaryFile, gettempdir
+from time import sleep
 from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
 from urllib.parse import urlsplit
 from uuid import uuid4
@@ -891,7 +892,7 @@ def copy_object(
         return response

     @provide_bucket_name
-    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
+    def delete_bucket(self, bucket_name: str, force_delete: bool = False, max_retries: int = 5) -> None:
         """
         To delete s3 bucket, delete all s3 bucket objects and then delete the bucket.

@@ -900,12 +901,24 @@ def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
         :param bucket_name: Bucket name
         :param force_delete: Enable this to delete bucket even if not empty
+        :param max_retries: A bucket must be empty to be deleted. If force_delete is true,
+            then retries may help prevent a race condition between deleting objects in the
+            bucket and trying to delete the bucket.
         :return: None
         """
+        tries_remaining = max_retries + 1
         if force_delete:
-            bucket_keys = self.list_keys(bucket_name=bucket_name)
-            if bucket_keys:
+            while tries_remaining:
+                bucket_keys = self.list_keys(bucket_name=bucket_name)
+                if not bucket_keys:
+                    break
+                if tries_remaining <= max_retries:
+                    # Avoid first loop
+                    sleep(500)
+
                 self.delete_objects(bucket=bucket_name, keys=bucket_keys)
+                tries_remaining -= 1

         self.conn.delete_bucket(Bucket=bucket_name)

     def delete_objects(self, bucket: str, keys: str | list) -> None:
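
For reference, a minimal usage sketch of the changed method; the connection id and bucket name below are hypothetical and not part of this PR:

    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    # Hypothetical connection id and bucket name, for illustration only.
    hook = S3Hook(aws_conn_id="aws_default")

    # With force_delete=True the hook repeatedly lists any remaining keys and
    # deletes them, retrying up to max_retries times if the listing still
    # returns objects, before deleting the bucket itself.
    hook.delete_bucket(bucket_name="my-temp-bucket", force_delete=True, max_retries=5)

Note on the retry delay: time.sleep takes seconds, so the sleep(500) between passes pauses for 500 seconds whenever a listing still returns keys after the first attempt.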