Skip to content

Commit

Permalink
feat: merge the s3 object metadata retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
HQarroum committed Jul 18, 2024
1 parent e13c855 commit 0d9e198
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 16 deletions.
37 changes: 29 additions & 8 deletions docs/src/content/docs/triggers/s3-event-trigger.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Stack extends cdk.Stack {
}
```

You can also specify multiple buckets to be monitored by the S3 trigger.
You can also specify multiple buckets to be monitored by the S3 trigger by passing an array of S3 buckets to the `.withBuckets` method.

```typescript
const trigger = new S3EventTrigger.Builder()
Expand All @@ -64,7 +64,7 @@ const trigger = new S3EventTrigger.Builder()
.build();
```

<br>
<br />

---

Expand All @@ -85,12 +85,32 @@ const trigger = new S3EventTrigger.Builder()
.build();
```

<br>
<br />

---

### 🗂️ Metadata

The S3 event trigger middleware makes it optionally possible to fetch the [metadata](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html) associated with the S3 object, and enrich the created cloud event with the object metadata.

> 💁 Metadata retrieval is disabled by default, and can be enabled by using the `.withFetchMetadata` API.
```typescript
const trigger = new S3EventTrigger.Builder()
.withScope(this)
.withIdentifier('Trigger')
.withCacheStorage(cache)
.withBucket(bucket)
.withFetchMetadata(true)
.build();
```

<br />

---

### 👨‍💻 Algorithm

The S3 event trigger middleware converts S3 native events into the [CloudEvents](/project-lakechain/general/events) specification and enriches the document description with required metadata, such as the mime-type, the size, and the Etag associated with the document.

All those information cannot be inferred from the S3 event alone, and to efficiently compile those metadata, this middleware uses the following algorithm.
Expand All @@ -101,8 +121,9 @@ All those information cannot be inferred from the S3 event alone, and to efficie
4. If the mime-type cannot be inferred from the extension, we try to infer it from the S3 reported content type.
5. If the mime-type cannot be inferred from the S3 reported content type, we try to infer it from the first bytes of the document using a chunked request.
6. If the mime-type cannot be inferred at all, we set the mime-type to `application/octet-stream`.
7. If [S3 object metadata retrieval](#%EF%B8%8F-metadata) is enabled, the middleware will issue a request to S3 and enrich the Cloud Event with the object metadata.

<br>
<br />

---

Expand Down Expand Up @@ -184,7 +205,7 @@ This middleware emits [Cloud Events](/project-lakechain/general/events) whenever
</table>
</details>

<br>
<br />

---

Expand All @@ -194,13 +215,13 @@ The S3 trigger receives S3 events from subscribed buckets on its SQS input queue

![Architecture](../../../assets/s3-event-trigger-architecture.png)

<br>
<br />

---

### 🏷️ Properties

<br>
<br />

##### Supported Inputs

Expand All @@ -218,7 +239,7 @@ The S3 trigger receives S3 events from subscribed buckets on its SQS input queue
| ----- | ----------- |
| `CPU` | This middleware is based on a Lambda architecture. |

<br>
<br />

---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,17 @@ export const S3EventTriggerPropsSchema = MiddlewarePropsSchema.extend({
.array(z.custom<SourceDescriptor>(
(data) => data instanceof Object
))
.nonempty()
.nonempty(),

/**
* Whether to fetch the metadata of the S3 objects to
* enrich the document metadata.
* @default false
*/
fetchMetadata: z
.boolean()
.optional()
.default(false)
});

// Export the `S3EventTriggerProps` type.
Expand Down
14 changes: 13 additions & 1 deletion packages/middlewares/triggers/s3-event-trigger/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ class S3EventTriggerBuilder extends MiddlewareBuilder {
return (this);
}

/**
* Sets whether to fetch the metadata of the S3 objects to
* enrich the document metadata.
* @param value whether to fetch the metadata.
* @default false
*/
public withFetchMetadata(value: boolean) {
this.triggerProps.fetchMetadata = value;
return (this);
}

/**
* @returns a new instance of the `S3EventTrigger`
* service constructed with the given parameters.
Expand Down Expand Up @@ -157,7 +168,8 @@ export class S3EventTrigger extends Middleware {
environment: {
POWERTOOLS_SERVICE_NAME: description.name,
POWERTOOLS_METRICS_NAMESPACE: NAMESPACE,
SNS_TARGET_TOPIC: this.eventBus.topicArn
SNS_TARGET_TOPIC: this.eventBus.topicArn,
FETCH_METADATA: this.props.fetchMetadata ? 'true' : 'false'
},
bundling: {
minify: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ import mimeTypes from './mime-types.json';
import { Readable } from 'stream';
import { S3DocumentDescriptor } from '@project-lakechain/sdk/helpers';
import { S3Bucket, S3Object } from './definitions/s3';
import { Document, EventType } from '@project-lakechain/sdk/models';
import { Document, DocumentMetadata, EventType } from '@project-lakechain/sdk/models';
import { tracer } from '@project-lakechain/sdk/powertools';

import {
S3Client,
GetObjectCommand,
HeadObjectCommand,
NotFound,
NoSuchKey,
NoSuchKey
} from '@aws-sdk/client-s3';
import {
ObjectNotFoundException,
Expand Down Expand Up @@ -189,6 +190,39 @@ const onDeleted = (bucket: S3Bucket, obj: S3Object): Document => {
.build());
};

/**
* @param bucket the bucket information.
* @param obj the object information.
* @param eventType whether the event is an object created or removed event.
* @returns the metadata associated with the S3 object.
*/
export const getMetadata = async (bucket: S3Bucket, obj: S3Object, eventType: EventType): Promise<DocumentMetadata> => {
const metadata: DocumentMetadata = {};

if (eventType === EventType.DOCUMENT_DELETED) {
// When a document is deleted, it is not possible
// to fetch its metadata.
return ({});
}

try {
// Fetch the metadata of the S3 object.
const res = await client.send(new HeadObjectCommand({
Bucket: bucket.name,
Key: obj.key
}));

// Add the metadata to the document.
if (res.Metadata && Object.keys(res.Metadata).length > 0) {
metadata.custom = res.Metadata;
}
} catch (err) {
return (metadata);
}

return (metadata);
};

/**
* @param bucket the bucket information.
* @param obj the object information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { randomUUID } from 'crypto';
import { getDocument } from './get-document';
import { getDocument, getMetadata } from './get-document';
import { ObjectNotFoundException, InvalidDocumentObjectException } from './exceptions';
import { LambdaInterface } from '@aws-lambda-powertools/commons/types';
import { logger, tracer } from '@project-lakechain/sdk/powertools';
Expand All @@ -32,14 +32,20 @@ import {
import {
CloudEvent,
EventType as DocumentEvent,
DataEnvelope
DataEnvelope,
DocumentMetadata
} from '@project-lakechain/sdk/models';
import {
BatchProcessor,
EventType,
processPartialResponse
} from '@aws-lambda-powertools/batch';

/**
* Environment variables.
*/
const FETCH_METADATA = process.env.FETCH_METADATA === 'true';

/**
* The async batch processor processes the received
* events from SQS in parallel.
Expand Down Expand Up @@ -83,14 +89,15 @@ const unquote = (event: S3EventRecord): S3EventRecord => {
class Lambda implements LambdaInterface {

/**
* @param event the S3 event record.
* @param s3Event the S3 event record.
* @note the `next` decorator will automatically forward the
* returned cloud event to the next middlewares
*/
@next()
async s3RecordHandler(s3Event: S3EventRecord): Promise<CloudEvent> {
const event = unquote(s3Event);
const eventType = getEventType(event.eventName);
let metadata: DocumentMetadata = {};

// Construct a document from the S3 object.
const document = await getDocument(
Expand All @@ -99,6 +106,15 @@ class Lambda implements LambdaInterface {
eventType
);

// Optionally fetch the object metadata.
if (FETCH_METADATA) {
metadata = await getMetadata(
event.s3.bucket,
event.s3.object,
eventType
);
}

// Construct the initial event that will be consumed
// by the next middlewares.
return (new CloudEvent.Builder()
Expand All @@ -107,7 +123,7 @@ class Lambda implements LambdaInterface {
.withChainId(randomUUID())
.withSourceDocument(document)
.withDocument(document)
.withMetadata({})
.withMetadata(metadata)
.build())
.build());
}
Expand Down

0 comments on commit 0d9e198

Please sign in to comment.