Comunicación entre lambda producer y lambda consumer utilizando el servicio SQS de AWS con colas FIFO implementado con Systems Manager Parameter Store, Api-Gateway, Serverless-Framework, Lambda, NodeJs, aws sdk-v3, ElasticMQ, entre otros.
Ver
- 1.0) Descripción del Proyecto.
- 1.1) Ejecución del Proyecto.
- 1.2) Configurar el proyecto serverless desde cero
- 1.3) Tecnologías.
1.0) Descripción 🔝
Ver
Comunicación entre lambda producer y lambda consumer utilizando el servicio SQS de AWS con colas FIFO implementado con Systems Manager Parameter Store, Api-Gateway, Serverless-Framework, Lambda, NodeJs, Docker, ElasticMQ, entre otros.
- Playlist proyecto
Importante
: Para el uso de colas de tipo FIFO, según la opción de uso de elasticmq como server, es necesario que se tenga la versión 0.15.4 del .jar en adelante para la correcta ejecución de las mismas.
- La imagen de la arquitectura de aws empleada describe el flujo de funcionamiento del sistema de envío de mensajes a través de SQS de forma general. Cualquier petición hacia el mismo, parte desde un cliente (Postman, navegador, etc).
Paso 1
: Dicha solicitud es recibida por el api-gateway y solamente se validará si es que dentro de los encabezados de dicha solicitud se encuentra la x-api-key correcta. Existe la excepción de encolar mensajes desde una URI de referencia (http://localhost:9324/000000000000/queue-one.fifo?Action=SendMessage&MessageBody=HELLO&MessageGroupId=XXXX), pero sin pasar por la lambda senderPaso 2
: El api gateway valida la petición y la reenvía hacia la lambda sender. El único punto de acceso es este. (Para la arquitectura planteada)Paso 3
: La lambda sender realiza las validaciones de las ssm correspondientes con el System Manager Paramater Store.. validan token, valores de sqs definidos (host, puerto, nombres de colas, etc).Pasos 4
: La lambda sender encola el mensaje en la cola de tipo fifo explicitada para luego ser consumida por la lambda receiver.Pasos 5
: La lambda receiver imprime el mensaje (objeto de tipo Record) en consola.Aclaraciones
: Se emula dicho funcionamiento dentro de la misma red y en entorno local con los plugins de serverless correspondientes.
1.1) Ejecución del Proyecto 🔝
Ver
- Creamos un entorno de trabajo a través de algún ide, podemos o no crear una carpeta raíz para el proyecto, nos posicionamos sobre la misma
cd 'projectRootName'
- Una vez creado un entorno de trabajo a través de algún ide, clonamos el proyecto
git clone https://github.com/andresWeitzel/Producer_Consumer_SQS_FIFO_AWS
- Nos posicionamos sobre el proyecto
cd 'projectName'
- Instalamos la última versión LTS de Nodejs(v18)
- Instalamos Serverless Framework de forma global si es que aún no lo hemos realizado
npm install -g serverless
- Verificamos la versión de Serverless instalada
sls -v
- Instalamos todos los paquetes necesarios
npm i
- Creamos un archivo para almacenar las variables ssm utilizadas en el proyecto (Más allá que sea un proyecto con fines no comerciales es una buena práctica utilizar variables de entorno).
- Click der sobre la raíz del proyecto
- New file
- Creamos el archivo con el name
serverless.ssm.yml
. Este deberá estar a la misma altura que el serverless.yml - Añadimos las ssm necesarias dentro del archivo.
# Keys
X_API_KEY : 'f98d8cd98h73s204e3456998ecl9427j'
BEARER_TOKEN : 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c'
#GRAL CONFIG
AWS_REGION : 'us-east-1'
AWS_ACCESS_KEY_RANDOM_VALUE: 'xxxx'
AWS_SECRET_KEY_RANDOM_VALUE: 'xxxx'
#SQS CONFIG
SQS_HOST: 127.0.0.1
SQS_PORT: 9324
SQS_API_VERSION: "latest"
SQS_URL: 'http://127.0.0.1:9324'
#QUEUE CONFIG
QUEUE_FIFO_ONE_NAME : 'queue-one.fifo'
QUEUE_FIFO_ONE_URL: 'http://127.0.0.1:9324/queue/queue-one.fifo'
# SERVERLESS CONFIG
SERVERLESS_HTTP_PORT : 4000
SERVERLESS_LAMBDA_PORT : 4002
- El siguiente script configurado en el package.json del proyecto es el encargado de
- Levantar serverless-offline (serverless-offline)
"scripts": {
"serverless-offline": "sls offline start",
"start": "npm run serverless-offline"
},
- Ejecutamos la app desde terminal.
npm start
- Si se presenta algún mensaje indicando qué el puerto 9324 ya está en uso, podemos terminar todos los procesos dependientes y volver a ejecutar la app
npx kill-port 9324
npm start
1.2) Configurar el proyecto serverless desde cero 🔝
Ver
- Creamos un entorno de trabajo a través de algún ide, podemos o no crear una carpeta raíz para el proyecto, nos posicionamos sobre la misma
cd 'projectRootName'
- Una vez creado un entorno de trabajo a través de algún ide, clonamos el proyecto
git clone https://github.com/andresWeitzel/Producer_Consumer_SQS_FIFO_AWS
- Nos posicionamos sobre el proyecto
cd 'projectName'
- Instalamos la última versión LTS de Nodejs(v18)
- Instalamos Serverless Framework de forma global si es que aún no lo hemos realizado
npm install -g serverless
- Verificamos la versión de Serverless instalada
sls -v
- Inicializamos un template de serverles
serverless create --template aws-nodejs
- Inicializamos un proyecto npm
npm init -y
- Instalamos serverless offline y agregamos el plugin al .yml
npm i serverless-offline --save-dev
- Instalamos serverless ssm y agregamos el plugin al .yml
npm i serverless-offline-ssm --save-dev
- Instalamos el plugin @aws-sdk/client-sqs para el uso de sqs..
npm i @aws-sdk/client-sqs
- Seteamos todas las variables de entorno del proyecto
# Keys
X_API_KEY : 'f98d8cd98h73s204e3456998ecl9427j'
BEARER_TOKEN : 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c'
#GRAL CONFIG
AWS_REGION : 'us-east-1'
AWS_ACCESS_KEY_RANDOM_VALUE: 'xxxx'
AWS_SECRET_KEY_RANDOM_VALUE: 'xxxx'
#SQS CONFIG
SQS_HOST: 127.0.0.1
SQS_PORT: 9324
SQS_API_VERSION: "latest"
SQS_URL: 'http://127.0.0.1:9324'
#QUEUE CONFIG
QUEUE_FIFO_ONE_NAME : 'queue-one.fifo'
QUEUE_FIFO_ONE_URL: 'http://127.0.0.1:9324/queue/queue-one.fifo'
# SERVERLESS CONFIG
SERVERLESS_HTTP_PORT : 4000
SERVERLESS_LAMBDA_PORT : 4002
- Instalamos serverless SQS y agregamos el plugin al .yml
npm i serverless-offline-sqs --save-dev
- Descargamos el .jar para la ejecución de elasticmq en local. Click en la parte donde dice download (runs stand-alone (download)).
- Creamos un directorio en la raíz del proyecto para almacenar el servidor elasticmq.
mkdir .elasticmq
- Incluimos el .jar ahi dentro y creamos un archivo de configuración necesario.
cd .elasticmq
mkdir elasticmq.config
- Por temas de simplificación partimos de un archivo presetado. Esto es configurable en base a nombres de colas, region, puertos, etc
include classpath("application.conf")
node-address {
protocol = http
host = localhost
port = 9324
context-path = ""
}
rest-sqs {
enabled = true
bind-port = 9324
bind-hostname = "127.0.0.1"
sqs-limits = strict
}
generate-node-address = false
queues {
"queue-one.fifo" {
defaultVisibilityTimeout = 10 seconds
delay = 0 seconds
receiveMessageWait = 0 seconds
deadLettersQueue {
name = "queue-one.fifo-deadletter-queue"
maxReceiveCount = 3
}
fifo = true
contentBasedDeduplication = true
}
queue-one.fifo-deadletter-queue {
fifo = true
}
}
aws {
region = us-east-1
accountId = 000000000000
}
- En base a esta config, declaramos la misma en el .yml para que por cada ejecución de serverless, se creen los recursos, la config anterior del archivo elasticmq.config es para que la tome el server de elastic.mq
- Seteamos los recursos de cola en el .yml
resources:
Resources:
myFirstQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: myFirstQueue
MessageRetentionPeriod: 1209600
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- myFirstQueue
- Arn
maxReceiveCount: 3
VisibilityTimeout: 10
- Luego seteamos serverless-offline-sqs
serverless-offline-sqs:
sqsHost: 127.0.0.1
sqsPort: 9324
autoCreate: false
apiVersion: "latest"
endpoint: http://127.0.0.1:9324
region: us-east-1
accessKeyId: local
secretAccessKey: local
skipCacheInvalidation: false
- Seteamos la lambda en el .yml...resumiendo...nos quedaria el serverless.yml de la sig manera
service: aws-sqs-offline
frameworkVersion: "3"
provider:
name: aws
runtime: nodejs18.x
stage: dev
apiGateway:
apiKeys:
- name : xApiKey
value : 'f98d8cd98h73s204e3456998ecl9427j'
plugins:
- serverless-offline-sqs
- serverless-offline
functions:
hello:
handler: handler.hello
QueueSendMessage:
handler: handler.sendMessage
name: Queue-SendMessage-Lambda
description: to send sqs message
events:
- http:
method: POST
path: sender-queue
private: true
QueueReceiveMessage:
handler: handler.receiveMessage
name: Queue-ReceiveMessage-Lambda
description: to receive sqs message
events:
- sqs:
arn:
Fn::GetAtt:
- myFirstQueue
- Arn
batchSize: 10
custom :
serverless-offline:
httpPort: 4000
lambdaPort: 4002
useChildProcesses: false
serverless-offline-sqs:
sqsHost: 127.0.0.1
sqsPort: 9324
autoCreate: false
apiVersion: "latest"
endpoint: http://127.0.0.1:9324
region: us-east-1
accessKeyId: local
secretAccessKey: local
skipCacheInvalidation: false
resources:
Resources:
myFirstQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: myFirstQueue
MessageRetentionPeriod: 1209600
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- myFirstQueue
- Arn
maxReceiveCount: 3
VisibilityTimeout: 10
- Creamos el archivo handler que sera una lambda donde emule el envío y recibimiento de mensajes
odule.exports.sendMessage = async (event) => {
const AWS = require("aws-sdk");
const SQS = new AWS.SQS({
accessKeyId: "local",
secretAccessKey: "local",
endpoint: "127.0.0.1:9324"
});
try {
const queueParams = {
Entries: [
{
Id: "1",
MessageBody: "this is a message body",
}
],
QueueUrl: 'http://127.0.0.1:9324/queue/myFirstQueue'
}
const result = await SQS.sendMessageBatch(queueParams).promise();
console.log(JSON.stringify(result, null, 2));
} catch (e) {
console.error(e);
}
};
module.exports.receiveMessage = async (event) => {
console.log(JSON.stringify(event.Records, null, 2));
};
- Instalamos la dependencia para la ejecución de scripts en paralelo
npm i concurrently
- El siguiente script configurado en el package.json del proyecto es el encargado de
- Levantar el server de elasticmq
- Levantar serverless-offline
"scripts": {
"serverless-offline": "sls offline start",
"queue-start": "java -Dconfig.file=.elasticmq/elasticmq.config -jar .elasticmq/elasticmq-server-0.15.4.jar",
"start": "concurrently --kill-others \"npm run queue-start\" \"npm run serverless-offline\""
},
- Ejecutamos la app desde terminal.
npm start
- Si se presenta algún mensaje indicando qué el puerto 9324 ya está en uso, podemos terminar todos los procesos dependientes y volver a ejecutar la app
npx kill-port 9324
npm start
Importante:
El ejemplo base descrito podemos visualizarlo en otro repositorio. Dirigirse a SQS-offline-example-aws
1.3) Tecnologías 🔝
Ver
Tecnologías | Versión | Finalidad |
---|---|---|
SDK | 4.3.2 | Inyección Automática de Módulos para Lambdas |
Serverless Framework Core v3 | 3.23.0 | Core Servicios AWS |
Serverless Plugin | 6.2.2 | Librerías para la Definición Modular |
Systems Manager Parameter Store (SSM) | 3.0 | Manejo de Variables de Entorno |
Amazon Simple Queue Service (SQS) | 7.0 | Servicio de colas de mensajes distribuidos |
Elastic MQ | 1.3 | Interfaz compatible con SQS (msg memory) |
Amazon Api Gateway | 2.0 | Gestor, Autenticación, Control y Procesamiento de la Api |
NodeJS | 14.18.1 | Librería JS |
VSC | 1.72.2 | IDE |
Postman | 10.11 | Cliente Http |
CMD | 10 | Símbolo del Sistema para linea de comandos |
Git | 2.29.1 | Control de Versiones |
Plugin | Descarga |
---|---|
serverless-offline | https://www.serverless.com/plugins/serverless-offline |
serverless-offline-ssm | https://www.npmjs.com/package/serverless-offline-ssm |
serverless-offline-sqs | https://www.npmjs.com/package/serverless-offline-sqs |
Extensión |
---|
Prettier - Code formatter |
YAML - Autoformatter .yml (alt+shift+f) |
DotENV |
2.0) Endpoints y recursos 🔝
Ver
Variable | Initial value | Current value |
---|---|---|
base_url | http://localhost:4000/dev | http://localhost:4000/dev |
x-api-key | f98d8cd98h73s204e3456998ecl9427j | f98d8cd98h73s204e3456998ecl9427j |
bearer_token | Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c | Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c |
curl --location --request GET 'http://localhost:9324/?Action=ListQueues'
<ListQueuesResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
<ListQueuesResult>
<QueueUrl>http://localhost:9324/queue/queue-one</QueueUrl><QueueUrl>http://localhost:9324/queue/queue-one.fifo</QueueUrl>
</ListQueuesResult>
<ResponseMetadata>
<RequestId>00000000-0000-0000-0000-000000000000</RequestId>
</ResponseMetadata>
</ListQueuesResponse>
curl --location --request GET 'http://localhost:9324/000000000000/queue-one.fifo?Action=SendMessage&MessageBody=HELLO&MessageGroupId=XXXX'
<SendMessageResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
<SendMessageResult>
<MD5OfMessageBody>eb61eead90e3b899c6bcbe27ac581660</MD5OfMessageBody>
<MessageId>ead221b3-5ec5-4e00-b69a-fabd46f003fd</MessageId>
</SendMessageResult>
<ResponseMetadata>
<RequestId>00000000-0000-0000-0000-000000000000</RequestId>
</ResponseMetadata>
</SendMessageResponse>
curl --location 'http://localhost:4000/dev/sender-queue/' \
--header 'x-api-key: f98d8cd98h73s204e3456998ecl9427j' \
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c' \
--header 'Content-Type: application/json' \
--data '{
"JsonObject": {
"DataType": "String",
"StringValue": "Example for sender an object inside de MessageAttributes"
}
}'
{
"message": {
"middlewareStack": {},
"input": {
"QueueUrl": "http://127.0.0.1:9324/queue/queue-one.fifo",
"DelaySeconds": 0,
"MessageDeduplicationId": "33fbc227-08c7-4bf3-90b4-c705f51f7e4e",
"MessageGroupId": "33fbc227-08c7-4bf3-90b4-c705f51f7e4e",
"MessageBody": "information about sending the message",
"MessageAttributes": {
"JsonObject": {
"DataType": "String",
"StringValue": "Example for sender an object inside de MessageAttributes"
}
}
}
}
}