
Transient S3Upload failure: KeyError('endpoint_resolver') #3925

Closed
gregoryroche opened this issue Jan 6, 2021 · 3 comments
Labels: bug, integrations

Comments

gregoryroche commented Jan 6, 2021

Description

Occasionally, a single mapped instance of an S3Upload task will fail with an unexpected KeyError('endpoint_resolver'), causing the flow to fail. Every time I have encountered this error, the flow succeeded when restarted, without any changes to the code or to any aspect of the Prefect setup.

Full stacktrace:

Unexpected error: KeyError('endpoint_resolver')
Traceback (most recent call last):
  File "c:\my-project\.venv\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "c:\my-project\.venv\lib\site-packages\prefect\engine\task_runner.py", line 856, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "c:\my-project\.venv\lib\site-packages\prefect\utilities\executors.py", line 298, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "c:\my-project\.venv\lib\site-packages\prefect\utilities\tasks.py", line 449, in method
    return run_method(self, *args, **kwargs)
  File "c:\my-project\.venv\lib\site-packages\prefect\tasks\aws\s3.py", line 147, in run
    s3_client = get_boto_client("s3", credentials=credentials, **self.boto_kwargs)
  File "c:\my-project\.venv\lib\site-packages\prefect\utilities\aws.py", line 55, in get_boto_client
    return boto3.client(
  File "c:\my-project\.venv\lib\site-packages\boto3\__init__.py", line 91, in client
    return _get_default_session().client(*args, **kwargs)
  File "c:\my-project\.venv\lib\site-packages\boto3\session.py", line 258, in client
    return self._session.create_client(
  File "c:\my-project\.venv\lib\site-packages\botocore\session.py", line 824, in create_client
    endpoint_resolver = self._get_internal_component('endpoint_resolver')
  File "c:\my-project\.venv\lib\site-packages\botocore\session.py", line 697, in _get_internal_component
    return self._internal_components.get_component(name)
  File "c:\my-project\.venv\lib\site-packages\botocore\session.py", line 923, in get_component
    del self._deferred[name]
KeyError: 'endpoint_resolver'

My personal experience with this error is with a flow which creates around 25 mapped instances of the S3Upload task, each of which uploads one file to the same S3 bucket using the same credentials. This flow runs every day and seems to encounter an error every four or five runs, meaning the S3Upload task fails around 1% of the time. I think I can exclude transient connection issues as the root cause, because the S3Upload tasks all run within a minute or so of each other and only one task ever fails. Curiously, every time I have seen this failure, it has occurred on the mapped task with index 1.

Expected Behavior

That all of the S3Upload tasks execute reliably without encountering this error. Or, failing that, I would expect that all of the S3Upload tasks would fail for this reason if there really is some logical or structural problem with the flow.

Reproduction

Unfortunately I am unable to reliably replicate this error. Included below are details of my flow and setup for reference. My best suggestion for replicating it reliably would be to create a flow which maps over ~25 items, causing that many instances of the S3Upload task to execute, and then run that flow repeatedly (a self-contained sketch follows the flow logic below).

Flow logic:

from prefect import Flow, unmapped
from prefect.tasks.aws.s3 import S3Upload

upload_to_s3 = S3Upload(bucket=Config.S3_BUCKET)

with Flow("daily-etl") as flow:  # flow name here is a placeholder
    # ETL logic here which maps over 25 CSV files, producing `csv_bytes`
    # (one bytes payload per file) and `s3_keys` (one key name per file)

    s3_objects = upload_to_s3.map(
        data=csv_bytes,
        key=s3_keys,
        compression=unmapped("gzip"),
    )

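For reference, here is a self-contained sketch of such a repro flow; the bucket name, dummy CSV payloads, key names and flow name are placeholders rather than values from our real flow:

from prefect import Flow, unmapped
from prefect.tasks.aws.s3 import S3Upload

# Placeholder bucket; 25 small in-memory CSV payloads stand in for the real ETL output.
upload_to_s3 = S3Upload(bucket="my-test-bucket")

csv_bytes = [f"id,value\n{i},{i * 10}\n".encode() for i in range(25)]
s3_keys = [f"repro/file_{i}.csv.gz" for i in range(25)]

with Flow("s3-upload-repro") as flow:
    s3_objects = upload_to_s3.map(
        data=csv_bytes,
        key=s3_keys,
        compression=unmapped("gzip"),
    )

# Running this flow repeatedly, ideally with a threaded executor so the mapped
# uploads execute concurrently, is the suggested way to try to trigger the
# intermittent KeyError.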
The following lines are in my config.toml:

[cloud]
use_local_secrets = true

[AWS_CREDENTIALS]
ACCESS_KEY = "foo"
SECRET_ACCESS_KEY = "bar"

Environment

Output from running prefect diagnostics:

{
  "config_overrides": {
    "AWS_CREDENTIALS": {
      "ACCESS_KEY": true,
      "SECRET_ACCESS_KEY": true
    },
    "backend": true,
    "context": {
      "secrets": false
    },
    "debug": true,
    "server": {
      "database": {
        "host": true
      },
      "graphql": {
        "host": true
      },
      "hasura": {
        "host": true
      },
      "host": true,
      "ui": {
        "graphql_url": true,
        "host": true
      }
    }
  },
  "env_vars": [],
  "system_information": {
    "platform": "Windows-10-10.0.14393-SP0",
    "prefect_backend": "server",
    "prefect_version": "0.13.15",
    "python_version": "3.8.1"
  }
}

(In the next couple of weeks our team plans to upgrade these flows to Prefect >=0.14; if the upgrade changes anything, I'll update this issue.)

@joshmeek added the bug and integrations labels on Jan 6, 2021
zanieb (Contributor) commented Jan 6, 2021

This appears to be a boto3 issue -- client creation is not thread safe. See boto/boto3#801

We could consider using resource instead.
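For illustration, a minimal sketch of the per-session pattern discussed in that boto3 issue (the credential arguments here are placeholders):

import boto3

def make_s3_client(access_key: str, secret_key: str):
    # boto3.client(...) reuses a shared module-level default Session whose
    # internal component registry is not thread safe; creating a dedicated
    # Session per caller avoids the race that surfaces as
    # KeyError('endpoint_resolver').
    session = boto3.session.Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    return session.client("s3")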

gregoryroche (Author) commented

Created a PR to fix this issue: #3981

For anyone affected by this issue in the meantime, I wrote a workaround for our project which has worked well for the past week: a new task that inherits from S3Upload / S3Download and overrides the run() method to pass use_session=True to the call to get_boto_client().

Sample code:

import gzip
import io
import uuid

from prefect.tasks.aws.s3 import S3Upload
from prefect.utilities.aws import get_boto_client
from prefect.utilities.tasks import defaults_from_attrs


class S3UploadThreadSafe(S3Upload):
    """Re-implementation of Prefect's S3Upload task to use thread-safe logic.

    It differs from the parent task by passing `use_session=True` to
    `get_boto_client()`.
    """

    @defaults_from_attrs("bucket")
    def run(
        self,
        data: str,
        key: str = None,
        credentials: dict = None,
        bucket: str = None,
        compression: str = None,
    ):
        """
        Task run method.

        Args:
            - data (str): the data payload to upload
            - key (str, optional): the Key to upload the data under; if not
                provided, a random `uuid` will be created
            - credentials (dict, optional): your AWS credentials passed from an upstream
                Secret task; this Secret must be a JSON string
                with two keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY` which will be
                passed directly to `boto3`. If not provided here or in context, `boto3`
                will fall back on standard AWS rules for authentication.
            - bucket (str, optional): the name of the S3 Bucket to upload to
            - compression (str, optional): specifies a file format for compression,
                compressing data before upload. Currently supports `'gzip'`.

        Returns:
            - str: the name of the Key the data payload was uploaded to
        """
        if bucket is None:
            raise ValueError("A bucket name must be provided.")

        # use_session=True makes get_boto_client build a fresh boto3 Session
        # instead of reusing the shared default session, avoiding the
        # thread-safety race described above
        s3_client = get_boto_client(
            "s3",
            credentials=credentials,
            use_session=True,
            **self.boto_kwargs,
        )

        # compress data if compression is specified
        if compression:
            if compression == "gzip":
                data = gzip.compress(data)
            else:
                raise ValueError(f"Unrecognized compression method '{compression}'.")

        # prepare data
        try:
            stream = io.BytesIO(data)
        except TypeError:
            stream = io.BytesIO(data.encode())

        # create key if not provided
        if key is None:
            key = str(uuid.uuid4())

        # upload
        s3_client.upload_fileobj(stream, Bucket=bucket, Key=key)
        return key
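Hypothetical usage, mirroring the flow snippet from the issue description (the bucket name and payloads below are placeholders):

from prefect import Flow, unmapped

upload_to_s3 = S3UploadThreadSafe(bucket="my-bucket")

with Flow("s3-upload-threadsafe") as flow:
    s3_objects = upload_to_s3.map(
        data=[b"id,value\n1,10\n", b"id,value\n2,20\n"],
        compression=unmapped("gzip"),
    )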

@jcrist closed this as completed in 9c34e32 on Feb 1, 2021
nialloriordanroo (Contributor) commented

FYI, the PR is here: #3981. There's no need to pass in use_session=True anymore, as this is now hardcoded into the boto3 client creation.
