Skip to content

Commit

Permalink
feat(s3): notifications to existing buckets (#15158)
Browse files Browse the repository at this point in the history
An update custom resource changes to support adding notifications to an existing S3 bucket

Resolves #2004

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
Michael Brewer committed Jun 16, 2021
1 parent abc9b37 commit 7d218c2
Show file tree
Hide file tree
Showing 16 changed files with 695 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@
"FServiceRole3AC82EE1"
]
},
"B08E7C7AF": {
"Type": "AWS::S3::Bucket",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"BNotificationsEB8DA980": {
"Type": "Custom::S3BucketNotifications",
"Properties": {
Expand Down Expand Up @@ -91,12 +86,18 @@
}
}
]
}
},
"Managed": true
},
"DependsOn": [
"BAllowBucketNotificationsTolambdaeventsources3F741608059EF9F709"
]
},
"B08E7C7AF": {
"Type": "AWS::S3::Bucket",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"BAllowBucketNotificationsTolambdaeventsources3F741608059EF9F709": {
"Type": "AWS::Lambda::Permission",
"Properties": {
Expand Down Expand Up @@ -176,7 +177,7 @@
"Properties": {
"Description": "AWS CloudFormation handler for \"Custom::S3BucketNotifications\" resources (@aws-cdk/aws-s3)",
"Code": {
"ZipFile": "exports.handler = (event, context) => {\n // eslint-disable-next-line @typescript-eslint/no-require-imports, import/no-extraneous-dependencies\n const s3 = new (require('aws-sdk').S3)();\n // eslint-disable-next-line @typescript-eslint/no-require-imports\n const https = require('https');\n // eslint-disable-next-line @typescript-eslint/no-require-imports\n const url = require('url');\n log(JSON.stringify(event, undefined, 2));\n const props = event.ResourceProperties;\n if (event.RequestType === 'Delete') {\n props.NotificationConfiguration = {}; // this is how you clean out notifications\n }\n const req = {\n Bucket: props.BucketName,\n NotificationConfiguration: props.NotificationConfiguration,\n };\n return s3.putBucketNotificationConfiguration(req, (err, data) => {\n log({ err, data });\n if (err) {\n return submitResponse('FAILED', err.message + `\\nMore information in CloudWatch Log Stream: ${context.logStreamName}`);\n }\n else {\n return submitResponse('SUCCESS');\n }\n });\n function log(obj) {\n console.error(event.RequestId, event.StackId, event.LogicalResourceId, obj);\n }\n // eslint-disable-next-line max-len\n // adapted from https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-function-code.html#cfn-lambda-function-code-cfnresponsemodule\n // to allow sending an error message as a reason.\n function submitResponse(responseStatus, reason) {\n const responseBody = JSON.stringify({\n Status: responseStatus,\n Reason: reason || 'See the details in CloudWatch Log Stream: ' + context.logStreamName,\n PhysicalResourceId: event.PhysicalResourceId || event.LogicalResourceId,\n StackId: event.StackId,\n RequestId: event.RequestId,\n LogicalResourceId: event.LogicalResourceId,\n NoEcho: false,\n });\n log({ responseBody });\n const parsedUrl = url.parse(event.ResponseURL);\n const options = {\n hostname: parsedUrl.hostname,\n port: 443,\n path: parsedUrl.path,\n method: 'PUT',\n headers: {\n 'content-type': '',\n 'content-length': responseBody.length,\n },\n };\n const request = https.request(options, (r) => {\n log({ statusCode: r.statusCode, statusMessage: r.statusMessage });\n context.done();\n });\n request.on('error', (error) => {\n log({ sendError: error });\n context.done();\n });\n request.write(responseBody);\n request.end();\n }\n};"
"ZipFile": "import boto3 # type: ignore\nimport json\nimport logging\nimport urllib.request\n\ns3 = boto3.client(\"s3\")\n\nCONFIGURATION_TYPES = [\"TopicConfigurations\", \"QueueConfigurations\", \"LambdaFunctionConfigurations\"]\n\ndef handler(event: dict, context):\n response_status = \"SUCCESS\"\n error_message = \"\"\n try:\n props = event[\"ResourceProperties\"]\n bucket = props[\"BucketName\"]\n notification_configuration = props[\"NotificationConfiguration\"]\n request_type = event[\"RequestType\"]\n managed = props.get('Managed', 'true').lower() == 'true'\n stack_id = event['StackId']\n\n if managed:\n config = handle_managed(request_type, notification_configuration)\n else:\n config = handle_unmanaged(bucket, stack_id, request_type, notification_configuration)\n\n put_bucket_notification_configuration(bucket, config)\n except Exception as e:\n logging.exception(\"Failed to put bucket notification configuration\")\n response_status = \"FAILED\"\n error_message = f\"Error: {str(e)}. \"\n finally:\n submit_response(event, context, response_status, error_message)\n\n\ndef handle_managed(request_type, notification_configuration):\n if request_type == 'Delete':\n return {}\n return notification_configuration\n\n\ndef handle_unmanaged(bucket, stack_id, request_type, notification_configuration):\n\n # find external notifications\n external_notifications = find_external_notifications(bucket, stack_id)\n\n # if delete, that's all we need\n if request_type == 'Delete':\n return external_notifications\n\n def with_id(notification):\n notification['Id'] = f\"{stack_id}-{hash(json.dumps(notification, sort_keys=True))}\"\n return notification\n\n # otherwise, merge external with incoming config and augment with id\n notifications = {}\n for t in CONFIGURATION_TYPES:\n external = external_notifications.get(t, [])\n incoming = [with_id(n) for n in notification_configuration.get(t, [])]\n notifications[t] = external + incoming\n return notifications\n\n\ndef find_external_notifications(bucket, stack_id):\n existing_notifications = get_bucket_notification_configuration(bucket)\n external_notifications = {}\n for t in CONFIGURATION_TYPES:\n # if the notification was created by us, we know what id to expect\n # so we can filter by it.\n external_notifications[t] = [n for n in existing_notifications.get(t, []) if not n['Id'].startswith(f\"{stack_id}-\")]\n\n return external_notifications\n\n\ndef get_bucket_notification_configuration(bucket):\n return s3.get_bucket_notification_configuration(Bucket=bucket)\n\n\ndef put_bucket_notification_configuration(bucket, notification_configuration):\n s3.put_bucket_notification_configuration(Bucket=bucket, NotificationConfiguration=notification_configuration)\n\n\ndef submit_response(event: dict, context, response_status: str, error_message: str):\n response_body = json.dumps(\n {\n \"Status\": response_status,\n \"Reason\": f\"{error_message}See the details in CloudWatch Log Stream: {context.log_stream_name}\",\n \"PhysicalResourceId\": event.get(\"PhysicalResourceId\") or event[\"LogicalResourceId\"],\n \"StackId\": event[\"StackId\"],\n \"RequestId\": event[\"RequestId\"],\n \"LogicalResourceId\": event[\"LogicalResourceId\"],\n \"NoEcho\": False,\n }\n ).encode(\"utf-8\")\n headers = {\"content-type\": \"\", \"content-length\": str(len(response_body))}\n try:\n req = urllib.request.Request(url=event[\"ResponseURL\"], headers=headers, data=response_body, method=\"PUT\")\n with urllib.request.urlopen(req) as response:\n print(response.read().decode(\"utf-8\"))\n print(\"Status code: \" + response.reason)\n except Exception as e:\n print(\"send(..) failed executing request.urlopen(..): \" + str(e))\n"
},
"Handler": "index.handler",
"Role": {
Expand All @@ -185,7 +186,7 @@
"Arn"
]
},
"Runtime": "nodejs12.x",
"Runtime": "python3.8",
"Timeout": 300
},
"DependsOn": [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
{
"Resources": {
"Bucket83908E77": {
"Type": "AWS::S3::Bucket",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"BucketNotifications8F2E257D": {
"Type": "Custom::S3BucketNotifications",
"Properties": {
Expand Down Expand Up @@ -46,7 +41,8 @@
}
}
]
}
},
"Managed": true
},
"DependsOn": [
"TopicPolicyA1747468",
Expand All @@ -55,6 +51,11 @@
"Topic3DEAE47A7"
]
},
"Bucket83908E77": {
"Type": "AWS::S3::Bucket",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"TopicBFC7AF6E": {
"Type": "AWS::SNS::Topic"
},
Expand Down Expand Up @@ -143,6 +144,36 @@
"Ref": "Topic3DEAE47A7"
},
"Sid": "1"
},
{
"Action": "sns:Publish",
"Condition": {
"ArnLike": {
"aws:SourceArn": {
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":s3:::",
{
"Ref": "Bucket25524B414"
}
]
]
}
}
},
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Resource": {
"Ref": "Topic3DEAE47A7"
},
"Sid": "2"
}
],
"Version": "2012-10-17"
Expand Down Expand Up @@ -194,6 +225,11 @@
"Action": "s3:PutBucketNotification",
"Effect": "Allow",
"Resource": "*"
},
{
"Action": "s3:GetBucketNotification",
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
Expand All @@ -211,7 +247,7 @@
"Properties": {
"Description": "AWS CloudFormation handler for \"Custom::S3BucketNotifications\" resources (@aws-cdk/aws-s3)",
"Code": {
"ZipFile": "exports.handler = (event, context) => {\n // eslint-disable-next-line @typescript-eslint/no-require-imports, import/no-extraneous-dependencies\n const s3 = new (require('aws-sdk').S3)();\n // eslint-disable-next-line @typescript-eslint/no-require-imports\n const https = require('https');\n // eslint-disable-next-line @typescript-eslint/no-require-imports\n const url = require('url');\n log(JSON.stringify(event, undefined, 2));\n const props = event.ResourceProperties;\n if (event.RequestType === 'Delete') {\n props.NotificationConfiguration = {}; // this is how you clean out notifications\n }\n const req = {\n Bucket: props.BucketName,\n NotificationConfiguration: props.NotificationConfiguration,\n };\n return s3.putBucketNotificationConfiguration(req, (err, data) => {\n log({ err, data });\n if (err) {\n return submitResponse('FAILED', err.message + `\\nMore information in CloudWatch Log Stream: ${context.logStreamName}`);\n }\n else {\n return submitResponse('SUCCESS');\n }\n });\n function log(obj) {\n console.error(event.RequestId, event.StackId, event.LogicalResourceId, obj);\n }\n // eslint-disable-next-line max-len\n // adapted from https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-function-code.html#cfn-lambda-function-code-cfnresponsemodule\n // to allow sending an error message as a reason.\n function submitResponse(responseStatus, reason) {\n const responseBody = JSON.stringify({\n Status: responseStatus,\n Reason: reason || 'See the details in CloudWatch Log Stream: ' + context.logStreamName,\n PhysicalResourceId: event.PhysicalResourceId || event.LogicalResourceId,\n StackId: event.StackId,\n RequestId: event.RequestId,\n LogicalResourceId: event.LogicalResourceId,\n NoEcho: false,\n });\n log({ responseBody });\n const parsedUrl = url.parse(event.ResponseURL);\n const options = {\n hostname: parsedUrl.hostname,\n port: 443,\n path: parsedUrl.path,\n method: 'PUT',\n headers: {\n 'content-type': '',\n 'content-length': responseBody.length,\n },\n };\n const request = https.request(options, (r) => {\n log({ statusCode: r.statusCode, statusMessage: r.statusMessage });\n context.done();\n });\n request.on('error', (error) => {\n log({ sendError: error });\n context.done();\n });\n request.write(responseBody);\n request.end();\n }\n};"
"ZipFile": "import boto3 # type: ignore\nimport json\nimport logging\nimport urllib.request\n\ns3 = boto3.client(\"s3\")\n\nCONFIGURATION_TYPES = [\"TopicConfigurations\", \"QueueConfigurations\", \"LambdaFunctionConfigurations\"]\n\ndef handler(event: dict, context):\n response_status = \"SUCCESS\"\n error_message = \"\"\n try:\n props = event[\"ResourceProperties\"]\n bucket = props[\"BucketName\"]\n notification_configuration = props[\"NotificationConfiguration\"]\n request_type = event[\"RequestType\"]\n managed = props.get('Managed', 'true').lower() == 'true'\n stack_id = event['StackId']\n\n if managed:\n config = handle_managed(request_type, notification_configuration)\n else:\n config = handle_unmanaged(bucket, stack_id, request_type, notification_configuration)\n\n put_bucket_notification_configuration(bucket, config)\n except Exception as e:\n logging.exception(\"Failed to put bucket notification configuration\")\n response_status = \"FAILED\"\n error_message = f\"Error: {str(e)}. \"\n finally:\n submit_response(event, context, response_status, error_message)\n\n\ndef handle_managed(request_type, notification_configuration):\n if request_type == 'Delete':\n return {}\n return notification_configuration\n\n\ndef handle_unmanaged(bucket, stack_id, request_type, notification_configuration):\n\n # find external notifications\n external_notifications = find_external_notifications(bucket, stack_id)\n\n # if delete, that's all we need\n if request_type == 'Delete':\n return external_notifications\n\n def with_id(notification):\n notification['Id'] = f\"{stack_id}-{hash(json.dumps(notification, sort_keys=True))}\"\n return notification\n\n # otherwise, merge external with incoming config and augment with id\n notifications = {}\n for t in CONFIGURATION_TYPES:\n external = external_notifications.get(t, [])\n incoming = [with_id(n) for n in notification_configuration.get(t, [])]\n notifications[t] = external + incoming\n return notifications\n\n\ndef find_external_notifications(bucket, stack_id):\n existing_notifications = get_bucket_notification_configuration(bucket)\n external_notifications = {}\n for t in CONFIGURATION_TYPES:\n # if the notification was created by us, we know what id to expect\n # so we can filter by it.\n external_notifications[t] = [n for n in existing_notifications.get(t, []) if not n['Id'].startswith(f\"{stack_id}-\")]\n\n return external_notifications\n\n\ndef get_bucket_notification_configuration(bucket):\n return s3.get_bucket_notification_configuration(Bucket=bucket)\n\n\ndef put_bucket_notification_configuration(bucket, notification_configuration):\n s3.put_bucket_notification_configuration(Bucket=bucket, NotificationConfiguration=notification_configuration)\n\n\ndef submit_response(event: dict, context, response_status: str, error_message: str):\n response_body = json.dumps(\n {\n \"Status\": response_status,\n \"Reason\": f\"{error_message}See the details in CloudWatch Log Stream: {context.log_stream_name}\",\n \"PhysicalResourceId\": event.get(\"PhysicalResourceId\") or event[\"LogicalResourceId\"],\n \"StackId\": event[\"StackId\"],\n \"RequestId\": event[\"RequestId\"],\n \"LogicalResourceId\": event[\"LogicalResourceId\"],\n \"NoEcho\": False,\n }\n ).encode(\"utf-8\")\n headers = {\"content-type\": \"\", \"content-length\": str(len(response_body))}\n try:\n req = urllib.request.Request(url=event[\"ResponseURL\"], headers=headers, data=response_body, method=\"PUT\")\n with urllib.request.urlopen(req) as response:\n print(response.read().decode(\"utf-8\"))\n print(\"Status code: \" + response.reason)\n except Exception as e:\n print(\"send(..) failed executing request.urlopen(..): \" + str(e))\n"
},
"Handler": "index.handler",
"Role": {
Expand All @@ -220,19 +256,14 @@
"Arn"
]
},
"Runtime": "nodejs12.x",
"Runtime": "python3.8",
"Timeout": 300
},
"DependsOn": [
"BucketNotificationsHandler050a0587b7544547bf325f094a3db834RoleDefaultPolicy2CF63D36",
"BucketNotificationsHandler050a0587b7544547bf325f094a3db834RoleB6FB88EC"
]
},
"Bucket25524B414": {
"Type": "AWS::S3::Bucket",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"Bucket2NotificationsD9BA2A77": {
"Type": "Custom::S3BucketNotifications",
"Properties": {
Expand Down Expand Up @@ -270,7 +301,44 @@
}
}
]
}
},
"Managed": true
},
"DependsOn": [
"Topic3Policy49BDDFBD",
"Topic3DEAE47A7"
]
},
"Bucket25524B414": {
"Type": "AWS::S3::Bucket",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"Bucket3NotificationsAFEFF359": {
"Type": "Custom::S3BucketNotifications",
"Properties": {
"ServiceToken": {
"Fn::GetAtt": [
"BucketNotificationsHandler050a0587b7544547bf325f094a3db8347ECC3691",
"Arn"
]
},
"BucketName": {
"Ref": "Bucket25524B414"
},
"NotificationConfiguration": {
"TopicConfigurations": [
{
"Events": [
"s3:ObjectCreated:Copy"
],
"TopicArn": {
"Ref": "Topic3DEAE47A7"
}
}
]
},
"Managed": false
},
"DependsOn": [
"Topic3Policy49BDDFBD",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ const bucket2 = new s3.Bucket(stack, 'Bucket2', {
});
bucket2.addObjectRemovedNotification(new s3n.SnsDestination(topic3), { prefix: 'foo' }, { suffix: 'foo/bar' });

const bucket3 = s3.Bucket.fromBucketName(stack, 'Bucket3', bucket2.bucketName);
bucket3.addEventNotification(s3.EventType.OBJECT_CREATED_COPY, new s3n.SnsDestination(topic3));

app.synth();
Loading

0 comments on commit 7d218c2

Please sign in to comment.