Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use boto3 session in S3Upload and S3Download tasks #3981

Merged
merged 10 commits into from
Feb 1, 2021
Merged

Use boto3 session in S3Upload and S3Download tasks #3981

merged 10 commits into from
Feb 1, 2021

Conversation

gregoryroche
Copy link

@gregoryroche gregoryroche commented Jan 19, 2021

Summary

Create a boto3 session during tasks.aws.S3Upload and tasks.aws.S3Download to ensure the task is executed in a thread-safe way. Provides a workaround for #3925.

Changes

The only change is the addition of one argument to the run() method for each of the two tasks, with the value of this argument being passed to get_boto_client(). The default value is False, which makes the task use the exact same behaviour from before this PR to avoid backwards compatibility issues. If True, a boto3 session is created before the boto3 client is created, requiring slightly more overhead but ensuring thread-safe execution.

After discussion with code owners, updated the logic so that the only change is to always pass use_session = True to get_boto_client().

Importance

This PR makes the S3Upload and S3Download tasks more robust when used in flows with parallel execution.

Checklist

This PR:

  • [✔] adds new tests (if appropriate)
  • [✔] adds a change file in the changes/ directory (if appropriate)
  • [✔] updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)

(note: this is my first ever pull request, and my first ever commit to a public repo, apologies in advance for any mistakes or weirdness)

Fixes #3925 by allowing boto3 client for S3Upload and S3Download tasks
to be created using a session. Default set to False for backwards
compatibility.
Fixes #3925 by allowing boto3 client for S3Upload and S3Download tasks
to be created using a session. Default set to False for backwards
compatibility.
Adds tests to verify fix for #3925
@marvin-robot
Copy link
Member

Here I am, brain the size of a planet and they ask me to welcome you to Prefect.

So, welcome to the community @gregoryroche! 🎉 🎉

@codecov
Copy link

codecov bot commented Jan 19, 2021

Codecov Report

Merging #3981 (0c7777d) into master (cd6c30f) will not change coverage.
The diff coverage is n/a.

Copy link

@jcrist jcrist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @gregoryroche. In general this seems fine to me.

I wonder if we should make use_session the default though here - Prefect tasks are commonly ran in parallel, and a quick benchmark shows negligible difference between using sessions and not. I might drop the use_session kwarg from the tasks (users shouldn't need it, and extra options can be confusing) and just pass use_session=True to get_boto_client everywhere. Thoughts?

@zanieb
Copy link
Contributor

zanieb commented Jan 20, 2021

I'd also vote in favor of at least defaulting to True

@gregoryroche
Copy link
Author

Thanks for your comments @jcrist and @madkinsz, I think just passing use_session=True to get_boto_client sounds good to me too, especially for the sake of avoiding having extra task options that most users won't use and might find confusing. I was probably too overly cautious with my original approach :) I will push an update to my fork with this change.

@gregoryroche
Copy link
Author

Unfortunately, after updating locally so that the only change from master is passing use_session=True, I'm getting 3 test failures:

FAILED tests/tasks/aws/test_s3.py::TestS3Download::test_gzip_compression - AssertionError: assert '' == 'col1,col2,co...alse,data,2\n'
FAILED tests/tasks/aws/test_s3.py::TestS3Upload::test_generated_key_is_str - TypeError: 'NoneType' object is not subscriptable
FAILED tests/tasks/aws/test_s3.py::TestS3Upload::test_gzip_compression - IndexError: list index out of range

I'm certain that this is an issue with the tests: for one, in my organisation we've been using this exact fix for our production flows the last 10 days and it's worked perfectly (including specifying gzip compression). Also, if this change really had broken the functionality which these tests are checking, then I would have expected different errors from these tests. For example, in TestS3Download::test_gzip_compression if this change had actually broken the gzip functionality, the error would look something like AssertionError: assert b'\x1f\x8b\x08\x00\xe8\x7f\x08(etc)' == 'col1,col2,co...alse,data,2\n', but the mocked task run just returns an empty string. When I debug the failing tests, the mocked client is completely untouched, suggesting that the testing logic didn't take into account the creation of a session.

I'm now kinda stuck to be honest, as I don't have much experience with test mocking like this and my attempts to fix the errors so far haven't lead anywhere. I don't want to just remove the failing tests, is there anything you can suggest to unblock this?

@zanieb
Copy link
Contributor

zanieb commented Jan 20, 2021

@gregoryroche

So the issue here is as follows

The mock is setup on aws.boto3.client

# From test_gzip_compression
        client = MagicMock()
        boto3 = MagicMock(client=MagicMock(return_value=client))
        monkeypatch.setattr("prefect.utilities.aws.boto3", boto3)

However, since we are using a session, we are accessing aws.boto3.session.Session.client

# From get_boto_client
    if use_session:
        # see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?#multithreading-multiprocessing  # noqa
        region_name = kwargs.pop("region_name", None)
        botocore_session = kwargs.pop("botocore_session", None)
        session = boto3.session.Session(
            profile_name=profile_name,
            region_name=region_name,
            botocore_session=botocore_session,
        )
        return session.client(
            resource,
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            **kwargs
        )
    else:
        return boto3.client(
            resource,
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            **kwargs
        )

We'll want to introduce a pytest fixture that mocks both of these routes and returns the client.

Something like

@pytest.fixture
def mocked_boto_client(monkeypatch):
        client = MagicMock()
        session = MagicMock(client=MagicMock(return_value=client))
        boto3 = MagicMock(client=MagicMock(return_value=client), session=MagicMock(return_value=session))
        monkeypatch.setattr("prefect.utilities.aws.boto3", boto3)
        return client

def test_using_mocked_boto_client(mocked_boto_client):
        mocked_boto_client.download_fileobj.side_effect = ....

Per discussion in #3981, drop use_session kwarg from task
and instead pass use_session = True to get_boto_client
Work-in-progress changes to tests for S3Upload and S3Download
to try to mock the clients correctly now a boto3 session is created.
Three tests still fail as of this commit.
@gregoryroche
Copy link
Author

Thanks @madkinsz for your help and suggestions. I'm afraid I haven't been able to fix these tests using the mocked_boto_client boilerplate, I must still be doing something wrong with this functionality, with which I have regrettably little experience. I've pushed my WIP changes to my fork for reference, if either you or @jcrist could find some time to take a look I'd be very grateful.

In short, the new mocked client (returned from mocked_boto_client(monkeypatch)) isn't being touched at all by the tests, even after adding the @pytest.mark.usefixtures("mocked_boto_client") decorator to the test classes. A breakpoint on the modify_stream() function inside test_gzip_compression() is never hit when the test is executed, so it seems like the mocked client is created and then just never accessed.

My suspicion is that it might have something to do with monkeypatch.setattr("prefect.utilities.aws.boto3", boto3). If I understand correctly this line is supposed to replace functionality that would normally be imported from prefect.utilities.aws.boto3 with the mock boto3 client, but I feel like somehow this logic just isn't being applied at the time of the test, possibly because the tests are class methods instead of standalone functions, or because of something weird I've done with the implementation.

Any pointers you can give will be gratefully received :)

@zanieb
Copy link
Contributor

zanieb commented Jan 26, 2021

I'll take a look

@zanieb
Copy link
Contributor

zanieb commented Jan 26, 2021

I'm sorry, I gave you some bad pseudocode for that fixture. A better way to do it is:

@pytest.fixture
def mocked_boto_client(monkeypatch):
    boto3 = MagicMock()
    client = boto3.session.Session().client()
    boto3.client = MagicMock(return_value=client)
    monkeypatch.setattr("prefect.utilities.aws.boto3", boto3)
    return client

To debug this in the future:

  • Put a breakpoint at the top of your test
  • Put a breakpoint in the S3Download run after it gets the s3_client
  • Compare the magic mock id numbers -- they should match

You don't need to add the use-fixture decorator to the class.

Thanks for spending your time on this @gregoryroche ! Let me know if you have any questions.

Adds a fix for the mocked_boto_client logic from @madkinsz, and adds two
tests to ensure the boto3 client is created with a session.
Fixes #3925
@gregoryroche
Copy link
Author

Thanks very much for your help @madkinsz , your updated fixture code works great, I was able to mend the existing broken tests and add a couple for the new session logic. I used mocked_boto_client.return_value._extract_mock_name() to determine whether a session was created but if there's a more elegant way of checking this I'd be happy to update that. Thanks to you both for your time and patience :)

@gregoryroche gregoryroche changed the title Add use_session argument to S3 tasks Use boto3 session in S3Upload and S3Download tasks Jan 26, 2021
@gregoryroche
Copy link
Author

Hi @jcrist and @madkinsz, just wanted to check if there are any blockers on your side from merging this PR, please let me know if I should take any further action on this :)

Copy link

@jcrist jcrist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@jcrist jcrist merged commit 974c4a2 into PrefectHQ:master Feb 1, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants