Skip to content

Commit

Permalink
Merge pull request #3946 from nateprewitt/crt_transfer
Browse files Browse the repository at this point in the history
Add CRT Transfer option to Boto3 on select instance types
  • Loading branch information
nateprewitt authored Nov 27, 2023
2 parents 196a2da + 2118dc9 commit 675e9b3
Show file tree
Hide file tree
Showing 11 changed files with 666 additions and 13 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/run-crt-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# CI workflow that installs the package with its CRT extra and runs the
# CRT test script across a Python-version x operating-system matrix.
name: Run CRT tests

on:
push:
pull_request:
branches-ignore: [master]

# The job only needs read access to the repository contents.
permissions:
contents: read

jobs:
build:
runs-on: '${{ matrix.os }}'
strategy:
# Let every matrix combination finish even if one of them fails.
fail-fast: false
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12']
os: [ubuntu-latest, macOS-latest, windows-latest]

steps:
# Actions are referenced by full commit SHA rather than by tag.
- uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608
- name: 'Set up Python ${{ matrix.python-version }}'
uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1
with:
python-version: '${{ matrix.python-version }}'
- name: Install dependencies and CRT
run: |
python scripts/ci/install --extras crt
- name: Run tests
run: |
python scripts/ci/run-crt-tests --with-cov --with-xdist
167 changes: 167 additions & 0 deletions boto3/crt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""
This file contains private functionality for interacting with the AWS
Common Runtime library (awscrt) in boto3.
All code contained within this file is for internal usage within this
project and is not intended for external consumption. All interfaces
contained within are subject to abrupt breaking changes.
"""

import threading

import botocore.exceptions
from botocore.session import Session
from s3transfer.crt import (
BotocoreCRTCredentialsWrapper,
BotocoreCRTRequestSerializer,
CRTTransferManager,
acquire_crt_s3_process_lock,
create_s3_crt_client,
)

# Singletons for CRT-backed transfers
# Both start as None and are populated by get_crt_s3_client().
CRT_S3_CLIENT = None
BOTOCORE_CRT_SERIALIZER = None

# Held while get_crt_s3_client() checks and initializes the singletons.
CLIENT_CREATION_LOCK = threading.Lock()
# Name handed to acquire_crt_s3_process_lock() when claiming the CRT
# client for this process.
PROCESS_LOCK_NAME = 'boto3'


def _create_crt_client(session, config, region_name, cred_provider):
    """Build the underlying CRT S3 client used for file transfers.

    Only ``region_name`` and ``cred_provider`` are consumed here;
    ``session`` and ``config`` are accepted but not used by this function.

    Instantiating many of these may lead to degraded performance or
    system resource exhaustion.
    """
    return create_s3_crt_client(
        region=region_name,
        use_ssl=True,
        crt_credentials_provider=cred_provider,
    )


def _create_crt_request_serializer(session, region_name):
    """Return a BotocoreCRTRequestSerializer for ``region_name``.

    The serializer is given ``session`` and client kwargs carrying the
    region and an ``endpoint_url`` of ``None``.
    """
    client_kwargs = {'region_name': region_name, 'endpoint_url': None}
    return BotocoreCRTRequestSerializer(session, client_kwargs)


def _create_crt_s3_client(
    session, config, region_name, credentials, lock, **kwargs
):
    """Wrap a new CRT S3 client, its process lock and region in a CRTS3Client.

    The botocore ``credentials`` are wrapped once; the wrapper supplies the
    credentials provider given to the CRT client and is itself stored on the
    returned CRTS3Client. Extra ``**kwargs`` are accepted and ignored.
    """
    wrapped_credentials = BotocoreCRTCredentialsWrapper(credentials)
    crt_client = _create_crt_client(
        session,
        config,
        region_name,
        wrapped_credentials.to_crt_credentials_provider(),
    )
    return CRTS3Client(crt_client, lock, region_name, wrapped_credentials)


def _initialize_crt_transfer_primatives(client, config):
    """Create the request serializer and CRT S3 client for this process.

    :returns: A ``(serializer, s3_client)`` tuple, or ``(None, None)`` when
        the CRT process lock cannot be acquired.
    """
    process_lock = acquire_crt_s3_process_lock(PROCESS_LOCK_NAME)
    if process_lock is None:
        # Without the lock the CRT cannot be used in this process, so the
        # caller should default to the classic s3transfer manager.
        return None, None

    botocore_session = Session()
    region = client.meta.region_name
    client_credentials = client._get_credentials()

    request_serializer = _create_crt_request_serializer(
        botocore_session, region
    )
    crt_s3_client = _create_crt_s3_client(
        botocore_session, config, region, client_credentials, process_lock
    )
    return request_serializer, crt_s3_client


def get_crt_s3_client(client, config):
    """Return the module-wide CRT S3 client, initializing it on first use.

    Initialization happens while holding ``CLIENT_CREATION_LOCK`` and also
    populates ``BOTOCORE_CRT_SERIALIZER``. The returned value is whatever
    ``CRT_S3_CLIENT`` holds afterwards, which may still be ``None``.
    """
    global CRT_S3_CLIENT, BOTOCORE_CRT_SERIALIZER

    with CLIENT_CREATION_LOCK:
        if CRT_S3_CLIENT is None:
            (
                BOTOCORE_CRT_SERIALIZER,
                CRT_S3_CLIENT,
            ) = _initialize_crt_transfer_primatives(client, config)

    return CRT_S3_CLIENT


class CRTS3Client:
    """
    Holder for the underlying CRT client together with the process lock
    used to acquire it, the region it was instantiated for, and the
    credential provider associated with it.

    Due to limitations in the existing CRT interfaces, calls can only be
    made in a single region and redirects are not supported. The region is
    tracked so the CRT client is not used when a successful request cannot
    be made.
    """

    def __init__(self, crt_client, process_lock, region, cred_provider):
        # Plain attribute storage; no validation is performed.
        self.crt_client, self.process_lock = crt_client, process_lock
        self.region, self.cred_provider = region, cred_provider


def is_crt_compatible_request(client, crt_s3_client):
    """
    Report whether ``client`` may be served by ``crt_s3_client``.

    The boto3 client must use the same signing region and credentials
    as the CRT_S3_CLIENT singleton. Otherwise fallback to classic.
    Returns ``False`` when there is no CRT client or the boto3 client has
    no credentials.
    """
    if crt_s3_client is None:
        return False

    client_credentials = client._get_credentials()
    if client_credentials is None:
        return False

    frozen_credentials = client_credentials.get_frozen_credentials()
    identity_matches = compare_identity(
        frozen_credentials, crt_s3_client.cred_provider
    )
    region_matches = client.meta.region_name == crt_s3_client.region
    return region_matches and identity_matches


def compare_identity(boto3_creds, crt_s3_creds):
    """Check that boto3 credentials match those produced by the CRT provider.

    ``crt_s3_creds`` is called to obtain the CRT-side credentials; if that
    call raises ``NoCredentialsError`` the identities are treated as
    different. Access key, secret key and session token must all be equal.
    """
    try:
        resolved = crt_s3_creds()
    except botocore.exceptions.NoCredentialsError:
        return False

    # (boto3 attribute, CRT attribute) pairs, compared in order and
    # short-circuiting on the first mismatch.
    attribute_pairs = (
        ('access_key', 'access_key_id'),
        ('secret_key', 'secret_access_key'),
        ('token', 'session_token'),
    )
    return all(
        getattr(boto3_creds, boto3_attr) == getattr(resolved, crt_attr)
        for boto3_attr, crt_attr in attribute_pairs
    )


def create_crt_transfer_manager(client, config):
    """Create a CRTTransferManager for optimized data transfer.

    Returns ``None`` when the request is not compatible with the shared
    CRT S3 client, so the caller can fall back to another manager.
    """
    crt_s3_client = get_crt_s3_client(client, config)
    if not is_crt_compatible_request(client, crt_s3_client):
        return None
    return CRTTransferManager(
        crt_s3_client.crt_client, BOTOCORE_CRT_SERIALIZER
    )
17 changes: 17 additions & 0 deletions boto3/s3/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.


# TransferConfig preferred_transfer_client settings
# "classic": only use the original s3transfer manager; no CRT upgrade.
CLASSIC_TRANSFER_CLIENT = "classic"
# "auto": allow the CRT transfer manager to be chosen when supported.
AUTO_RESOLVE_TRANSFER_CLIENT = "auto"
8 changes: 7 additions & 1 deletion boto3/s3/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy as python_copy

from botocore.exceptions import ClientError

from boto3 import utils
Expand Down Expand Up @@ -432,7 +434,11 @@ def copy(
if config is None:
config = TransferConfig()

with create_transfer_manager(self, config) as manager:
# copy is not supported in the CRT
new_config = python_copy.copy(config)
new_config.preferred_transfer_client = "classic"

with create_transfer_manager(self, new_config) as manager:
future = manager.copy(
copy_source=CopySource,
bucket=Bucket,
Expand Down
68 changes: 67 additions & 1 deletion boto3/s3/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,11 @@ def __call__(self, bytes_amount):
"""
from os import PathLike, fspath
import logging
import threading
from os import PathLike, fspath, getpid

from botocore.compat import HAS_CRT
from botocore.exceptions import ClientError
from s3transfer.exceptions import (
RetriesExceededError as S3TransferRetriesExceededError,
Expand All @@ -134,11 +137,19 @@ def __call__(self, bytes_amount):
from s3transfer.subscribers import BaseSubscriber
from s3transfer.utils import OSUtils

import boto3.s3.constants as constants
from boto3.exceptions import RetriesExceededError, S3UploadFailedError

if HAS_CRT:
import awscrt.s3

from boto3.crt import create_crt_transfer_manager

KB = 1024
MB = KB * KB

logger = logging.getLogger(__name__)


def create_transfer_manager(client, config, osutil=None):
"""Creates a transfer manager based on configuration
Expand All @@ -155,6 +166,50 @@ def create_transfer_manager(client, config, osutil=None):
:rtype: s3transfer.manager.TransferManager
:returns: A transfer manager based on parameters provided
"""
if _should_use_crt(config):
crt_transfer_manager = create_crt_transfer_manager(client, config)
if crt_transfer_manager is not None:
logger.debug(
f"Using CRT client. pid: {getpid()}, thread: {threading.get_ident()}"
)
return crt_transfer_manager

# If we don't resolve something above, fallback to the default.
logger.debug(
f"Using default client. pid: {getpid()}, thread: {threading.get_ident()}"
)
return _create_default_transfer_manager(client, config, osutil)


def _should_use_crt(config):
    """Decide whether a transfer should attempt the CRT transfer manager.

    The CRT path is chosen only when the CRT is importable, awscrt reports
    the current system as optimized, and the config's
    ``preferred_transfer_client`` (compared case-insensitively) is "auto".

    :param config: Transfer configuration exposing a string
        ``preferred_transfer_client`` attribute.
    :rtype: bool
    :returns: True to attempt the CRT transfer manager, False otherwise.
    """
    if HAS_CRT:
        is_optimized_instance = awscrt.s3.is_optimized_for_system()
    else:
        is_optimized_instance = False
    pref_transfer_client = config.preferred_transfer_client.lower()

    if (
        is_optimized_instance
        and pref_transfer_client == constants.AUTO_RESOLVE_TRANSFER_CLIENT
    ):
        logger.debug(
            "Attempting to use CRTTransferManager. Config settings may be ignored."
        )
        return True

    # logging interpolates with ``msg % args``; the placeholders must be
    # %-style or the record fails to format when debug logging is enabled.
    logger.debug(
        "Opting out of CRT Transfer Manager. Preferred client: %s, "
        "CRT available: %s, Instance Optimized: %s.",
        pref_transfer_client,
        HAS_CRT,
        is_optimized_instance,
    )
    return False


def _create_default_transfer_manager(client, config, osutil):
"""Create the default TransferManager implementation for s3transfer."""
executor_cls = None
if not config.use_threads:
executor_cls = NonThreadedExecutor
Expand All @@ -177,6 +232,7 @@ def __init__(
io_chunksize=256 * KB,
use_threads=True,
max_bandwidth=None,
preferred_transfer_client=constants.AUTO_RESOLVE_TRANSFER_CLIENT,
):
"""Configuration object for managed S3 transfers
Expand Down Expand Up @@ -217,6 +273,15 @@ def __init__(
:param max_bandwidth: The maximum bandwidth that will be consumed
in uploading and downloading file content. The value is an integer
in terms of bytes per second.
:param preferred_transfer_client: String specifying preferred transfer
client for transfer operations.
Current supported settings are:
* auto (default) - Use the CRTTransferManager when calls
are made with supported environment and settings.
* classic - Only use the origin S3TransferManager with
requests. Disables possible CRT upgrade on requests.
"""
super().__init__(
multipart_threshold=multipart_threshold,
Expand All @@ -233,6 +298,7 @@ def __init__(
for alias in self.ALIAS:
setattr(self, alias, getattr(self, self.ALIAS[alias]))
self.use_threads = use_threads
self.preferred_transfer_client = preferred_transfer_client

def __setattr__(self, name, value):
# If the alias name is used, make sure we set the name that it points
Expand Down
14 changes: 13 additions & 1 deletion scripts/ci/install
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python
import argparse
import os
import shutil
from contextlib import contextmanager
Expand All @@ -25,11 +26,22 @@ def run(command):


if __name__ == "__main__":
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group()
group.add_argument(
'-e',
'--extras',
help='Install extras_require along with normal install',
)
args = parser.parse_args()
with cd(REPO_ROOT):
run("pip install -r requirements.txt")
run("python scripts/ci/install-dev-deps")
if os.path.isdir("dist") and os.listdir("dist"):
shutil.rmtree("dist")
run("python setup.py bdist_wheel")
wheel_dist = os.listdir("dist")[0]
run("pip install %s" % (os.path.join("dist", wheel_dist)))
package = os.path.join('dist', wheel_dist)
if args.extras:
package = f"\"{package}[{args.extras}]\""
run(f"pip install {package}")
3 changes: 2 additions & 1 deletion scripts/ci/run-crt-tests
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ except ImportError:

if __name__ == "__main__":
with cd(os.path.join(REPO_ROOT, "tests")):
run(f"{REPO_ROOT}/scripts/ci/run-tests unit/ functional/")
test_script = os.sep.join([REPO_ROOT, 'scripts', 'ci', 'run-tests'])
run(f"python {test_script} unit functional")
Loading

0 comments on commit 675e9b3

Please sign in to comment.