Skip to content

Commit

Permalink
[AIRFLOW-5825] SageMakerEndpointOperator is not idempotent (apache#7891)
Browse files Browse the repository at this point in the history
  • Loading branch information
OmairK authored Mar 28, 2020
1 parent 3f26427 commit 438da72
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
25 changes: 19 additions & 6 deletions airflow/providers/amazon/aws/operators/sagemaker_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# specific language governing permissions and limitations
# under the License.

from botocore.exceptions import ClientError

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.operators.sagemaker_base import SageMakerBaseOperator
Expand Down Expand Up @@ -129,13 +131,24 @@ def execute(self, context):
raise ValueError('Invalid value! Argument operation has to be one of "create" and "update"')

self.log.info('%s SageMaker endpoint %s.', log_str, endpoint_info['EndpointName'])
try:
response = sagemaker_operation(
endpoint_info,
wait_for_completion=self.wait_for_completion,
check_interval=self.check_interval,
max_ingestion_time=self.max_ingestion_time
)
except ClientError: # Botocore throws a ClientError if the endpoint is already created
self.operation = 'update'
sagemaker_operation = self.hook.update_endpoint
log_str = 'Updating'
response = sagemaker_operation(
endpoint_info,
wait_for_completion=self.wait_for_completion,
check_interval=self.check_interval,
max_ingestion_time=self.max_ingestion_time
)

response = sagemaker_operation(
endpoint_info,
wait_for_completion=self.wait_for_completion,
check_interval=self.check_interval,
max_ingestion_time=self.max_ingestion_time
)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
raise AirflowException(
'Sagemaker endpoint creation failed: %s' % response)
Expand Down
17 changes: 17 additions & 0 deletions tests/providers/amazon/aws/operators/test_sagemaker_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import unittest

import mock
from botocore.exceptions import ClientError

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.sagemaker import SageMakerHook
Expand Down Expand Up @@ -112,6 +113,22 @@ def test_execute_with_failure(self, mock_endpoint, mock_endpoint_config,
{'HTTPStatusCode': 404}}
self.assertRaises(AirflowException, self.sagemaker.execute, None)

@mock.patch.object(SageMakerHook, 'get_conn')
@mock.patch.object(SageMakerHook, 'create_model')
@mock.patch.object(SageMakerHook, 'create_endpoint_config')
@mock.patch.object(SageMakerHook, 'create_endpoint')
@mock.patch.object(SageMakerHook, 'update_endpoint')
def test_execute_with_duplicate_endpoint_creation(self, mock_endpoint_update,
mock_endpoint, mock_endpoint_config,
mock_model, mock_client):
response = {"Error": {"Code": "ValidationException",
"Message": "Cannot create already existing endpoint."}}
mock_endpoint.side_effect = ClientError(error_response=response, operation_name="CreateEndpoint")
mock_endpoint_update.return_value = {'EndpointArn': 'testarn',
'ResponseMetadata':
{'HTTPStatusCode': 200}}
self.sagemaker.execute(None)


if __name__ == '__main__':
unittest.main()

0 comments on commit 438da72

Please sign in to comment.