Tabla de contenido
Ingestar fácilmente datos de numerosas fuentes y tomar decisiones oportunas se está convirtiendo en una capacidad crítica y central para muchas empresas. En este laboratorio, brindamos experiencia práctica en el uso de Kinesis Data Stream para capurar datos en tiempo real, Kinesis Data Firehose para cargar flujos de datos en Amazon S3 y realizar análisis en tiempo real en el flujo con Kinesis Data Analytics.
Para este laboratorio, necesitaremos tener una cuenta de AWS con los permisos le lectura y escritura sobre los siguientes servicios
- AWS Kinesis Data Streams
- AWS Kinesis Data Firehose
- AWS Kinesis Data Analytics
- AWS Glue Catalog
- AWS Lambda
- Amazon S3
- Cognito user Pools
En este paso utilizaremos Amazon Kinesis Data Generator (KDG) que nos facilitará el envío de datos a Kinesis Streams o Kinesis Firehose.
Antes de poder enviar datos a Kinesis, primero debe crear un usuario de Amazon Cognito en su cuenta de AWS con permisos para acceder a Amazon Kinesis. Para simplificar este proceso, se proporcionan una función de Amazon Lambda y una plantilla de Amazon CloudFormation para crear el usuario y asignar los permisos necesarios para usar el KDG.
NOTA: La configuración de Kinesis Data Generator (KDG) en una cuenta de AWS creará un conjunto de credenciales de Cognito. Los usuarios que puedan autenticarse con esas credenciales podrán publicar en todos los Kinesis Data Streams y Kinesis Data Firehoses de la cuenta. Después de ejecutar la configuración a continuación, puede cambiar los roles de IAM que se crean para restringir los permisos para publicar en flujos especificos.
- Inicie Sesión en la consola de AWS
- Ingresa al siguiente enlace: Cloudformation deploy Donde se te abrirá un template de cloudformation ya listo para desplegar en tu cuenta de AWS
Tambien puedes cargar el template de cero en cloudformation utilizando el template cloudformation, descargandolo desde el siguiente enlace
cognito-setup.yaml
3. Clic en el botón Next
- Debes crear un usuario y una contraseña para tu usuario de cognito rellenando los campos
username
ypassword
y le damos clic en el botónNext
- Para este ejercicio vamos a dejar los demas campos con los valores por defecto y le damos clic en el botón
Next
- Revisamos la Configuración, y Aceptamos los terminos para que se incie la creación de los recursos en nuestra cuent de AWS y hacemos clic en el botón
Submit
Inmediatamente iniciaría el proceso de creación de los recursos necesarios para autorizar al Kinesis Data Generator (KDG) a escribir datos en nuestros recursos de Kinesis.
Una vez se haya completado la creación de los recursos, nos debe mostrar el estado CREATE_COMPLETE
Podemos comprobar que recusos se crearon, haciendo clic en el stack y llendo a la sección Resources
Una vez hayamos comprobado que los recursos desplegados en el stack se crearon correctamente, vamos a la pestaña Output
y accedemos a la URL que nos generó
Cuando nos cargue por completo la página del Kinesis Data Generator, Iniciamos sesión con el username
y la password
que creamos en uno de los pasos anteriores y nos debe mostrar la siguiente pantalla:
Nuestro Kinesis Data stream reibirá los datos que enviemos desde el Kinesis Data Generator en tiempo real para posteriormente enviarlos a un destino en S3 utilizando Kinesis Data Firehose y Realizando análisis en tiempo real con Kinesis Data Analytics
- En el búscador de la consola de AWS escribimos
Kinesis
y seleccionamosKinesis Data Stream
-
Hacemos clic en
Create Data Streams
-
Ingresamos la siguiente configuración:
- Data stream name:
kds-workshop-ug
- Capacity mode :
Provisioned
- Provisioned shards:
1
- Data stream name:
-
Clic en el botón
Create Data Stream
Nuestro Kinesis se empezará a crear y una vez finalicé se debe ver de la siguiente manera:
Crearemos una secuencia de entrega de datos de Kinesis Data Firehose, para recibir los datos que se ingestan en Kinesis Data Streams y llevarlos a un bucket de S3 de manera particionada
-
Estando en el servicio de Kinesis en la consola de AWS, hacemos clic en la opción
Delivery Stream
y hacemos clic en el botónCreate Delivery Stream
-
Ingresamos la siguiente configuración
- Source:
Amazon Kinesis Data Stream
- Destination:
Amazon S3
- Source:
-
En la sección
Source settings
, hacemos clic en el botónBrowse
y seleccionamos nuestro Kinesis Data Stream creado anteriormente -
Diligenciamos el campo Delivery stream name:
kdf-workshop-ug
-
En la sección
Destination settings
crearemos un nuevo bucket de S3, a donde nos llegaran nuestros datos particionadosHacemos clic en la opción
Create
y llenamos la siguiente configuración:- Bucket name:
workshop-ug-med-bucket
- dejamos la demas configuración por defecto
- Hacemos clic en el Botón
Crear Bucket
- Bucket name:
-
Volvemos a la pantalla donde estamos creando nuestro Kinesis Data Firehose y en el campo
S3 bucket
ingresamos el nombre el bucket que acabamos de crear -
dejamos los demas campos con la configuración por defecto y hacemos clic en el botón
Create Delivery Stream
Una vez se cree nuestro delivery steam, se verá de la siguiente manera y estará listo para recibir los datos y almacenarlos en S3
- Estando en el servicio de Kinesis en la consola de AWS, ingresamos a
Streaming Applications
, nos vamos a la secciónStudio
y hacemos clic en en botónCreate Studio Notebook
- Ingresas la siguiente configuración:
- Creation method:
Quick create with sample code
- Studio notebook name:
kda-workshop-ug
- AWS Glue database:
Default
- Clic botón
Create Studio Notebook
- Creation method:
Se iniciará el proceso de creación de nuestra aplicación de Kinesis Data Analytics, y en unos segundos estará lista para usar
En el paso anterior, por defecto se crea un rol, el cual debemos modificarlo para darle acceso al nuestro notebook a los servicios de S3, Kinesis y Glue
- Hacemos clic en el nombre del rol y nos llevará directamente a IAM
- Seleccionamos nuestro rol y hacemos clic en el boton
Attach Policies
- Buscamos y agregamos
AmazonS3FullAccess
,AmazonKinesisFullAccess
yAWSGlueServiceRole
La configuración de nuestra aplicación ya está completa, cuando esté lista pasamos a a siguiente sesión.
Ahora que nuestra aplicación ya está lista, lo que debemos hacer es iniciarla, y empezar a explorar nuestros datos.
- hacemos clic en el botón Run y esperamos unos minutos a que se inicie nuestra aplicación
-
Cuando la apliación esté en estado
Running
, clic enOpen in Apache Zeppelin
-
Creamos nuestro primer Notebook: Estando en la interfaz de Zeppelin, hacemos clic en la opción
Create new note
y ya tendremos nuestra interfaz lista para empezar a analizar datos de manera interactiva
Ya está todo listo. Ahora solo nos falta, empezar a recibir datos para iniciar nuestro análisis en tiempo real, asi que volvemos al Kinesis Data Generator y llenamos la siguiente configuración:
- Region:
us-east-1
- Stream/delivery stream:
kds-workshop-ug
- Records per second:
1
- Record template: Pegamos el siguiente script, el cual nos generará data de manera dinamica:
{
"fecha_compra": "{{date.now("YYYY-MM-DD HH:mm:ss")}}",
"nombre_cliente": "{{name.firstName}} {{name.lastName}}",
"numero_documento":{{random.number(1000000000)}},
"cantidad_items": {{random.number(
{
"min":1,
"max":150
}
)}},
"estado_transaccion": "{{random.arrayElement(
["OK","SIN FONDOS","ERROR PLATAFORMA"]
)}}",
"valor_total":{{random.number(
{
"min":1000,
"max":10000000
}
)}}
}
Hacemos Clic en el botón `Send Data` y se empezarán a ingestar los datos en nuestro Kinesis Data Streams
Volvemos a nuestro Notebook de Zeppelin, y lo primero que haremos es crear una tabla con nuestra estructura de datos, asi que pegamos el siguiente fragmeto y lo ejecutamos en el notebook:
%flink.ssql
CREATE TABLE ventasug (
`fecha_compra` TIMESTAMP(3),
`nombre_cliente` STRING,
`numero_documento` STRING,
`cantidad_items` INT,
`estado_transaccion` STRING,
`valor_total` INT
)
PARTITIONED BY (estado_transaccion)
WITH (
'connector' = 'kinesis',
'stream' = 'kds-workshop-ug',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json'
);
Nos debe salir el siguiente mensaje:
con esto se ha creado una tabla llamada ventasug
que estará disponible en nuestro catálogo de GLUE en la base de datos Default
.
Ya podremos empezar a realizar consultas sobre nuestros datos, ya sea SQL o utilizando Python y Escala.
Ahora, ejecutamos el siguiete Query y podemos ver como va llegando la data en tiempo real y podremos empezar a realizar análisis mas profundos
%flink.ssql(type=update)
select *
from ventasug;
- Parar el Kinesis Data Generator:
- Eliminar Kinesis Data Streams
- Eliminar/Detener Aplicación de Kinesis Data Analytics
- Eliminar Kinesis Data Firehose
- Eliminar Bucket S3
- Eliminar catálogo de GLUE