Example of a PyFlink application reading from a Kinesis Data Stream and writing to Kinesis Data Firehose
- Flink version: 1.15.2
- Flink API: Table API/SQL
- Language: Python
Simple application reading records from a Kinesis Data Stream and publishing them to Kinesis Data Firehose.
🚨 This example also shows how to package a PyFlink application with multiple jar dependencies (e.g. multiple connectors).
If you need to use multiple connectors in your streaming application, you must create a fat jar that bundles them, package it with your application, and reference it in your application configuration as described here. This sample application shows how to bundle multiple connectors into a single fat jar.
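As a minimal sketch of how a bundled fat jar can be referenced from the Python script (the jar name `target/pyflink-dependencies.jar` is a placeholder assumption; use the artifact name your Maven build actually produces):

```python
from pathlib import Path

def jar_pipeline_url(jar_path: str) -> str:
    # Flink expects jar dependencies in 'pipeline.jars' as file:// URLs
    return Path(jar_path).absolute().as_uri()

if __name__ == "__main__":
    # pyflink imported lazily so the helper above works without Flink installed
    from pyflink.table import EnvironmentSettings, TableEnvironment

    table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # Hypothetical jar path: adjust to what 'mvn clean package' produces
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars", jar_pipeline_url("target/pyflink-dependencies.jar")
    )
```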
Pre-requisites:
- Apache Maven
- A Kinesis Data Stream and Kinesis Data Firehose Stream
- (If running locally) Apache Flink installed and appropriate AWS credentials to access Kinesis and Firehose streams
To get this sample application working locally:
- Run `mvn clean package` in the `FirehoseSink` folder
- Ensure the resulting jar is referenced correctly in the Python script
- Ensure the `application_properties.json` parameters are configured correctly
- Set the environment variable `IS_LOCAL=True`
- Run the Python script: `python streaming-firehose-sink.py`
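The core of such a pipeline can be sketched with Table API/SQL connectors. Stream names, region, and the stock record schema below are illustrative assumptions, not the exact values used by the sample:

```python
# Sketch of a Kinesis-to-Firehose pipeline using Flink SQL DDL.
SOURCE_DDL = """
    CREATE TABLE input_stream (
        ticker VARCHAR(6),
        price DOUBLE,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'ExampleInputStream',
        'aws.region' = 'us-east-1',
        'scan.stream.initpos' = 'LATEST',
        'format' = 'json'
    )
"""

SINK_DDL = """
    CREATE TABLE output_stream (
        ticker VARCHAR(6),
        price DOUBLE,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'firehose',
        'delivery-stream' = 'ExampleDeliveryStream',
        'aws.region' = 'us-east-1',
        'format' = 'json'
    )
"""

if __name__ == "__main__":
    # pyflink imported lazily so the DDL can be inspected without Flink installed
    from pyflink.table import EnvironmentSettings, TableEnvironment

    table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    table_env.execute_sql(SOURCE_DDL)
    table_env.execute_sql(SINK_DDL)
    table_env.execute_sql(
        "INSERT INTO output_stream SELECT * FROM input_stream"
    ).wait()
```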
To get this sample application working in Amazon Managed Service for Apache Flink:
- Run `mvn clean package` in the `FirehoseSink` folder
- Zip the Python script and the fat jar generated in the previous step
- Upload the zip to an S3 bucket in the same region
- Create the Amazon Managed Service for Apache Flink application
- Configure the application to use the zip uploaded to the S3 bucket, and configure the application IAM role with access to both the Kinesis and Firehose streams
- Run the application
A sample script to produce appropriate Kinesis records, as well as detailed configuration instructions for Amazon Managed Service for Apache Flink, can be found here.
- Local development: reads `application_properties.json`
- Deployed on Amazon Managed Service for Apache Flink: set up Runtime Properties, using Group ID and property names based on the content of `application_properties.json`
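A minimal sketch of how the script can bridge the two modes. On the managed service the Runtime Properties are exposed to the application at `/etc/flink/application_properties.json`; the group id `consumer.config.0` below is a hypothetical example:

```python
import json
import os

# Path the managed runtime provides vs. the local project file
DEPLOYED_PROPERTIES_PATH = "/etc/flink/application_properties.json"
LOCAL_PROPERTIES_PATH = "application_properties.json"

def load_property_groups(is_local: bool) -> list:
    # Read the property groups from the location matching the environment
    path = LOCAL_PROPERTIES_PATH if is_local else DEPLOYED_PROPERTIES_PATH
    with open(path) as f:
        return json.load(f)

def property_map(groups: list, group_id: str) -> dict:
    # Return the key/value map of the group with the given Group ID
    for group in groups:
        if group.get("PropertyGroupId") == group_id:
            return group.get("PropertyMap", {})
    return {}

if __name__ == "__main__":
    groups = load_property_groups(os.environ.get("IS_LOCAL") == "True")
    # Hypothetical group id: use the ids present in application_properties.json
    consumer_config = property_map(groups, "consumer.config.0")
    print(consumer_config)
```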
Use the Python script to generate sample stock data and send it to the Kinesis Data Stream.
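A sketch of such a generator. The stream name, region, and record schema are assumptions for illustration; sending records requires `boto3` and AWS credentials:

```python
import datetime
import json
import random

def generate_stock_record() -> dict:
    # Random stock tick in an assumed schema (ticker, price, event_time)
    return {
        "ticker": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
        "price": round(random.uniform(10, 100), 2),
        "event_time": datetime.datetime.utcnow().isoformat(),
    }

def send_records(stream_name: str, region: str, count: int = 10) -> None:
    # boto3 imported lazily so generate_stock_record() works without the AWS SDK
    import boto3

    kinesis = boto3.client("kinesis", region_name=region)
    for _ in range(count):
        record = generate_stock_record()
        kinesis.put_record(
            StreamName=stream_name,
            Data=json.dumps(record),
            PartitionKey=record["ticker"],
        )

if __name__ == "__main__":
    # Placeholder stream name and region
    send_records("ExampleInputStream", "us-east-1")
```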