diff --git a/docs/src/content/docs/triggers/s3-event-trigger.md b/docs/src/content/docs/triggers/s3-event-trigger.md index 88353f92..916c5310 100644 --- a/docs/src/content/docs/triggers/s3-event-trigger.md +++ b/docs/src/content/docs/triggers/s3-event-trigger.md @@ -53,7 +53,7 @@ class Stack extends cdk.Stack { } ``` -You can also specify multiple buckets to be monitored by the S3 trigger. +You can also specify multiple buckets to be monitored by the S3 trigger by passing an array of S3 buckets to the `.withBuckets` method. ```typescript const trigger = new S3EventTrigger.Builder() @@ -64,7 +64,7 @@ const trigger = new S3EventTrigger.Builder() .build(); ``` -
+
--- @@ -85,12 +85,32 @@ const trigger = new S3EventTrigger.Builder() .build(); ``` -
+
--- ### 🗂️ Metadata +The S3 event trigger middleware can optionally fetch the [metadata](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html) associated with the S3 object and enrich the created cloud event with that metadata. + +> 💁 Metadata retrieval is disabled by default, and can be enabled by using the `.withFetchMetadata` API. + +```typescript +const trigger = new S3EventTrigger.Builder() + .withScope(this) + .withIdentifier('Trigger') + .withCacheStorage(cache) + .withBucket(bucket) + .withFetchMetadata(true) + .build(); +``` + +
+ +--- + +### 👨‍💻 Algorithm + The S3 event trigger middleware converts S3 native events into the [CloudEvents](/project-lakechain/general/events) specification and enriches the document description with required metadata, such as the mime-type, the size, and the Etag associated with the document. All those information cannot be inferred from the S3 event alone, and to efficiently compile those metadata, this middleware uses the following algorithm. @@ -101,8 +121,9 @@ All those information cannot be inferred from the S3 event alone, and to efficie 4. If the mime-type cannot be inferred from the extension, we try to infer it from the S3 reported content type. 5. If the mime-type cannot be inferred from the S3 reported content type, we try to infer it from the first bytes of the document using a chunked request. 6. If the mime-type cannot be inferred at all, we set the mime-type to `application/octet-stream`. +7. If [S3 object metadata retrieval](#%EF%B8%8F-metadata) is enabled, the middleware will issue a request to S3 and enrich the Cloud Event with the object metadata. -
+
--- @@ -184,7 +205,7 @@ This middleware emits [Cloud Events](/project-lakechain/general/events) whenever -
+
--- @@ -194,13 +215,13 @@ The S3 trigger receives S3 events from subscribed buckets on its SQS input queue ![Architecture](../../../assets/s3-event-trigger-architecture.png) -
+
--- ### 🏷️ Properties -
+
##### Supported Inputs @@ -218,7 +239,7 @@ The S3 trigger receives S3 events from subscribed buckets on its SQS input queue | ----- | ----------- | | `CPU` | This middleware is based on a Lambda architecture. | -
+
--- diff --git a/packages/middlewares/triggers/s3-event-trigger/src/definitions/opts.ts b/packages/middlewares/triggers/s3-event-trigger/src/definitions/opts.ts index 33b0434a..8ebd6947 100644 --- a/packages/middlewares/triggers/s3-event-trigger/src/definitions/opts.ts +++ b/packages/middlewares/triggers/s3-event-trigger/src/definitions/opts.ts @@ -30,7 +30,17 @@ export const S3EventTriggerPropsSchema = MiddlewarePropsSchema.extend({ .array(z.custom( (data) => data instanceof Object )) - .nonempty() + .nonempty(), + + /** + * Whether to fetch the metadata of the S3 objects to + * enrich the document metadata. + * @default false + */ + fetchMetadata: z + .boolean() + .optional() + .default(false) }); // Export the `S3EventTriggerProps` type. diff --git a/packages/middlewares/triggers/s3-event-trigger/src/index.ts b/packages/middlewares/triggers/s3-event-trigger/src/index.ts index 4ec5f0c7..ab8ef22b 100644 --- a/packages/middlewares/triggers/s3-event-trigger/src/index.ts +++ b/packages/middlewares/triggers/s3-event-trigger/src/index.ts @@ -96,6 +96,17 @@ class S3EventTriggerBuilder extends MiddlewareBuilder { return (this); } + /** + * Sets whether to fetch the metadata of the S3 objects to + * enrich the document metadata. + * @param value whether to fetch the metadata. + * @default false + */ + public withFetchMetadata(value: boolean) { + this.triggerProps.fetchMetadata = value; + return (this); + } + /** * @returns a new instance of the `S3EventTrigger` * service constructed with the given parameters. @@ -157,7 +168,8 @@ export class S3EventTrigger extends Middleware { environment: { POWERTOOLS_SERVICE_NAME: description.name, POWERTOOLS_METRICS_NAMESPACE: NAMESPACE, - SNS_TARGET_TOPIC: this.eventBus.topicArn + SNS_TARGET_TOPIC: this.eventBus.topicArn, + FETCH_METADATA: this.props.fetchMetadata ? 
'true' : 'false' }, bundling: { minify: true, diff --git a/packages/middlewares/triggers/s3-event-trigger/src/lambdas/event-handler/get-document.ts b/packages/middlewares/triggers/s3-event-trigger/src/lambdas/event-handler/get-document.ts index e69b5421..189be9f5 100644 --- a/packages/middlewares/triggers/s3-event-trigger/src/lambdas/event-handler/get-document.ts +++ b/packages/middlewares/triggers/s3-event-trigger/src/lambdas/event-handler/get-document.ts @@ -19,14 +19,15 @@ import mimeTypes from './mime-types.json'; import { Readable } from 'stream'; import { S3DocumentDescriptor } from '@project-lakechain/sdk/helpers'; import { S3Bucket, S3Object } from './definitions/s3'; -import { Document, EventType } from '@project-lakechain/sdk/models'; +import { Document, DocumentMetadata, EventType } from '@project-lakechain/sdk/models'; import { tracer } from '@project-lakechain/sdk/powertools'; import { S3Client, GetObjectCommand, + HeadObjectCommand, NotFound, - NoSuchKey, + NoSuchKey } from '@aws-sdk/client-s3'; import { ObjectNotFoundException, @@ -189,6 +190,39 @@ const onDeleted = (bucket: S3Bucket, obj: S3Object): Document => { .build()); }; +/** + * @param bucket the bucket information. + * @param obj the object information. + * @param eventType whether the event is an object created or removed event. + * @returns the metadata associated with the S3 object. + */ +export const getMetadata = async (bucket: S3Bucket, obj: S3Object, eventType: EventType): Promise => { + const metadata: DocumentMetadata = {}; + + if (eventType === EventType.DOCUMENT_DELETED) { + // When a document is deleted, it is not possible + // to fetch its metadata. + return ({}); + } + + try { + // Fetch the metadata of the S3 object. + const res = await client.send(new HeadObjectCommand({ + Bucket: bucket.name, + Key: obj.key + })); + + // Add the metadata to the document. 
+ if (res.Metadata && Object.keys(res.Metadata).length > 0) { + metadata.custom = res.Metadata; + } + } catch (err) { + return (metadata); + } + + return (metadata); +}; + /** * @param bucket the bucket information. * @param obj the object information. diff --git a/packages/middlewares/triggers/s3-event-trigger/src/lambdas/event-handler/index.ts b/packages/middlewares/triggers/s3-event-trigger/src/lambdas/event-handler/index.ts index 21b66cf0..1b5db15a 100644 --- a/packages/middlewares/triggers/s3-event-trigger/src/lambdas/event-handler/index.ts +++ b/packages/middlewares/triggers/s3-event-trigger/src/lambdas/event-handler/index.ts @@ -15,7 +15,7 @@ */ import { randomUUID } from 'crypto'; -import { getDocument } from './get-document'; +import { getDocument, getMetadata } from './get-document'; import { ObjectNotFoundException, InvalidDocumentObjectException } from './exceptions'; import { LambdaInterface } from '@aws-lambda-powertools/commons/types'; import { logger, tracer } from '@project-lakechain/sdk/powertools'; @@ -32,7 +32,8 @@ import { import { CloudEvent, EventType as DocumentEvent, - DataEnvelope + DataEnvelope, + DocumentMetadata } from '@project-lakechain/sdk/models'; import { BatchProcessor, @@ -40,6 +41,11 @@ import { processPartialResponse } from '@aws-lambda-powertools/batch'; +/** + * Environment variables. + */ +const FETCH_METADATA = process.env.FETCH_METADATA === 'true'; + /** * The async batch processor processes the received * events from SQS in parallel. @@ -83,7 +89,7 @@ const unquote = (event: S3EventRecord): S3EventRecord => { class Lambda implements LambdaInterface { /** - * @param event the S3 event record. + * @param s3Event the S3 event record. 
* @note the `next` decorator will automatically forward the * returned cloud event to the next middlewares */ @@ -91,6 +97,7 @@ class Lambda implements LambdaInterface { async s3RecordHandler(s3Event: S3EventRecord): Promise { const event = unquote(s3Event); const eventType = getEventType(event.eventName); + let metadata: DocumentMetadata = {}; // Construct a document from the S3 object. const document = await getDocument( @@ -99,6 +106,15 @@ class Lambda implements LambdaInterface { eventType ); + // Optionally fetch the object metadata. + if (FETCH_METADATA) { + metadata = await getMetadata( + event.s3.bucket, + event.s3.object, + eventType + ); + } + // Construct the initial event that will be consumed // by the next middlewares. return (new CloudEvent.Builder() @@ -107,7 +123,7 @@ class Lambda implements LambdaInterface { .withChainId(randomUUID()) .withSourceDocument(document) .withDocument(document) - .withMetadata({}) + .withMetadata(metadata) .build()) .build()); }