The purpose of this project is to build an end-to-end AWS-hosted data engineering pipeline that supports both batch and stream processing, modelled on Pinterest's experiment processing pipeline.

- Extract the experimental data from an `RDS` database containing the 3 Pinterest tables (`pinterest_data`, `geolocation_data` and `user_data`).
- Set up an `EC2` instance with `Kafka` and `Kafka REST Proxy` installed to produce messages to an `AWS MSK` cluster.
- Create an `S3` bucket to store data.
- Create an `AWS MSK` cluster with an `S3` endpoint to distribute data from a `REST API` to an `S3` Data Lake.
- Create 3 `Kafka` topics corresponding to the 3 `RDS` tables.
- Create a `REST API` using `API Gateway` to send data via `POST` requests to the 3 `Kafka` topics.
- Extract the batch data from `S3`, then clean and transform it in Databricks using Spark queries.
- Create a DAG using `Airflow` and upload it to an `AWS MWAA` environment to periodically trigger the Databricks notebook for batch processing.
- Create 3 `Kinesis` data streams corresponding to the 3 Pinterest tables to collect, process, and analyze data streams in real time.
- Configure the previous `REST API` to allow it to invoke `Kinesis` actions.
- Send data to the `Kinesis` streams using the `REST API`.
- Read the data from the `Kinesis` streams within a Databricks notebook, create streaming dataframes for each of the 3 `Kinesis` data streams, and transform the data using Spark queries.
- Write each stream to a `Delta Table`.
- Step 1: Create an `EC2` instance.
  - On the AWS console, create an `EC2` instance.
  - Create a `key-pair` with a `.pem` file format and save it locally.
  - `SSH` into the `EC2` instance.
- Step 2: Install `Kafka` on the `EC2` instance.
  - If `java` is not installed, install it:
    ```bash
    sudo yum install java-1.8.0
    ```
  - Download and install `Kafka`:
    ```bash
    wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
    tar -xzf kafka_2.12-2.8.1.tgz
    ```
  - The `kafka_installation_folder` should now be visible in the `/home` directory.
- Step 3: Set up `IAM Authentication` on the `EC2` instance to produce messages to the `AWS MSK` cluster. This package is necessary to connect to `AWS MSK` clusters that require `IAM Authentication`.
  - Navigate to the `kafka_installation_folder/libs` directory and install the `IAM Authentication` package:
    ```bash
    wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.5/aws-msk-iam-auth-1.1.5-all.jar
    ```
- Step 4: Configure the `EC2` Kafka client to use AWS IAM.
  - Navigate to the `kafka_installation_folder/bin` directory and edit the `client.properties` file to include the `awsRoleArn`, which can be found in the AWS console for the `IAM` role that was created for this `EC2` instance:
    ```
    # Sets up TLS for encryption and SASL for authN.
    security.protocol = SASL_SSL

    # Identifies the SASL mechanism to use.
    sasl.mechanism = AWS_MSK_IAM

    # Binds SASL client implementation.
    sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="Your Access Role";

    # Encapsulates constructing a SigV4 signature based on extracted credentials.
    # The SASL client bound by "sasl.jaas.config" invokes this class.
    sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
    ```
- Step 1: Create an `AWS MSK` cluster (if not already created) and navigate to the `AWS MSK` dashboard on the AWS console to retrieve the following from the cluster:
  - `Bootstrap servers string`
  - `Plaintext Apache Zookeeper connection string`
- Step 2: On the `EC2` Kafka client, create 3 Kafka topics (be consistent with the naming schema):
  - A topic for `pinterest_data`
  - A topic for `geolocation_data`
  - A topic for `user_data`
- Step 3: Create an `S3` bucket. All data passing through the Kafka topics will be exported to this designated bucket.
- Step 4: Create a custom plugin.
  - This plugin will contain the code that defines the logic of our connector:
    ```bash
    # assume admin user privileges
    sudo -u ec2-user -i

    # create directory where we will save our connector
    mkdir kafka-connect-s3 && cd kafka-connect-s3

    # download connector from Confluent
    wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.0.3/confluentinc-kafka-connect-s3-10.0.3.zip

    # copy connector to our S3 bucket
    aws s3 cp ./confluentinc-kafka-connect-s3-10.0.3.zip s3://<BUCKET_NAME>/kafka-connect-s3/
    ```
  - You should find the uploaded file within the `S3` bucket.
- Step 5: Create a connector to write to the bucket.
  - Select the previously created plugin and configure the `Connector configuration settings` template below accordingly:
    ```
    connector.class=io.confluent.connect.s3.S3SinkConnector
    # same region as our bucket and cluster
    s3.region=us-east-1
    flush.size=1
    schema.compatibility=NONE
    tasks.max=3
    # follows the topic naming schema; the regex below reads data from all topic names starting with <YOUR_UUID>
    topics.regex=<YOUR_UUID>.*
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    value.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    storage.class=io.confluent.connect.s3.storage.S3Storage
    key.converter=org.apache.kafka.connect.storage.StringConverter
    s3.bucket.name=<BUCKET_NAME>
    ```
- Step 1: Create a REST API via `API Gateway` with a proxy resource. By creating a proxy resource with the `{proxy+}` parameter and the `ANY` method, you can provide your integration with access to all available resources.
- Step 2: For the proxy resource, create an HTTP `ANY` method. The Endpoint URL should include the public IPv4 DNS of the `EC2` Kafka client, in the following format: `http://KafkaClientEC2InstancePublicDNS:8082/{proxy}`
- Step 3: Deploy the API and make note of the `Invoke URL`.
- Step 1: Install the Confluent REST Proxy:
  ```bash
  sudo wget https://packages.confluent.io/archive/7.2/confluent-7.2.0.tar.gz
  tar -xvzf confluent-7.2.0.tar.gz
  ```
- Step 2: Modify the `kafka-rest.properties` file.
  - Navigate to `confluent-7.2.0/etc/kafka-rest` and modify the `kafka-rest.properties` file similarly to the `client.properties` file, but this time add "client." before each of the 4 variables. To allow communication between the REST proxy and the cluster brokers, all configurations should be prefixed with "client.".
  - Modify the `zookeeper.connect` and `bootstrap.servers` variables in the file; these were obtained during the AWS MSK Configuration section.
- Step 3: Messages can now be produced to the Kafka cluster by navigating to the `confluent-7.2.0/bin` directory and running:
  ```bash
  ./kafka-rest-start /home/ec2-user/confluent-7.2.0/etc/kafka-rest/kafka-rest.properties
  ```
- Step 4: Run the Batch Data Script. This script sends data from the 3 tables to their corresponding Kafka topics. Data passing through the `AWS MSK` cluster should now be landing in the `S3` bucket.
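  - As a rough illustration, the script's core logic looks something like this minimal sketch; the invoke URL, topic name, and record fields below are placeholders, not this project's actual values:
    ```python
    # Hypothetical sketch: send one record to a Kafka topic through the REST proxy.
    import json
    import requests

    invoke_url = "https://<API_ID>.execute-api.us-east-1.amazonaws.com/<STAGE>"
    topic_name = "<YOUR_UUID>.pin"  # one of the 3 topics created earlier

    record = {"index": 1, "unique_id": "example-uuid", "title": "example title"}

    # The Kafka REST Proxy v2 API expects records wrapped in a "records" list.
    payload = json.dumps({"records": [{"value": record}]})
    headers = {"Content-Type": "application/vnd.kafka.json.v2+json"}

    response = requests.post(f"{invoke_url}/topics/{topic_name}", data=payload, headers=headers)
    print(response.status_code)  # 200 means the record was accepted
    ```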
Using Databricks, we can provision a cluster running Apache Spark to run Spark queries for data transformations. See the Batch Processing Notebook and Stream Processing Notebook for a comprehensive guide.
- Step 1: Create a Databricks notebook for batch processing, read the `AWS Credentials` that have been uploaded to a Databricks `Delta Table`, and mount the `S3` bucket to Databricks:
  ```python
  aws_keys_df = spark.read.format("delta").load(delta_table_path)

  # Get the AWS access key and secret key from the spark dataframe
  ACCESS_KEY = aws_keys_df.select('Access key ID').collect()[0]['Access key ID']
  SECRET_KEY = aws_keys_df.select('Secret access key').collect()[0]['Secret access key']

  # Mount the drive (RUN ONLY ONCE!)
  # dbutils commands are supported only within the Databricks ecosystem.
  dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)
  ```
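  - Once the bucket is mounted, the topic data can be read into dataframes. A minimal sketch, assuming a hypothetical mount name and the default object path written by the S3 sink connector (both placeholders):
    ```python
    # Sketch only: the mount name and topic path below are placeholders.
    df_pin = spark.read.format("json") \
        .option("inferSchema", "true") \
        .load("/mnt/<MOUNT_NAME>/topics/<YOUR_UUID>.pin/partition=0/*.json")
    ```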
- Step 2: Data cleaning. Perform the necessary transformations on the dataframes. Example using the `pinterest_data` dataframe:
  ```python
  from pyspark.sql.functions import col, when, regexp_replace

  # Replace error/placeholder entries with None
  df_pin = df_pin.withColumn("description", when(col("description").contains("No description available"), "None").otherwise(col("description")))
  df_pin = df_pin.withColumn("image_src", when(col("image_src").contains("Image src error"), "None").otherwise(col("image_src")))
  df_pin = df_pin.withColumn("follower_count", when(col("follower_count").contains("User Info Error"), "None").otherwise(col("follower_count")))

  # Convert follower_count abbreviations (e.g. 2M, 25k) into plain numbers
  df_pin = df_pin.withColumn("follower_count", regexp_replace(df_pin["follower_count"], "M", "000000"))
  df_pin = df_pin.withColumn("follower_count", regexp_replace(df_pin["follower_count"], "k", "000"))

  # Cast numeric columns to int
  df_pin = df_pin.withColumn("downloaded", col("downloaded").cast("int"))
  df_pin = df_pin.withColumn("follower_count", col("follower_count").cast("int"))
  df_pin = df_pin.withColumn("index", col("index").cast("int"))

  # Rename and reorder columns
  df_pin = df_pin.withColumnRenamed("index", "ind")
  df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")
  ```
- Most popular annual category:
  ```python
  result = spark.sql("""
      SELECT post_year, category, category_count
      FROM (
          SELECT year(timestamp) AS post_year,
                 category,
                 count(*) AS category_count,
                 DENSE_RANK() OVER (PARTITION BY year(timestamp) ORDER BY count(*) DESC) AS ranking
          FROM geo
          JOIN pin ON geo.ind = pin.ind
          GROUP BY year(timestamp), category
      )
      WHERE ranking = 1
  """)
  display(result)
  ```
- Most popular category based on age group:
  ```python
  result = spark.sql("""
      WITH CTE AS (
          SELECT age_group, category, category_count,
                 DENSE_RANK() OVER (PARTITION BY age_group ORDER BY category_count DESC) AS ranking
          FROM (
              SELECT CASE
                         WHEN age BETWEEN 18 AND 24 THEN "18-24"
                         WHEN age BETWEEN 25 AND 35 THEN "25-35"
                         WHEN age BETWEEN 36 AND 50 THEN "36-50"
                         WHEN age > 50 THEN "50+"
                         --ELSE "NONE"
                     END AS age_group,
                     category,
                     COUNT(*) AS category_count
              FROM user
              JOIN pin ON pin.ind = user.ind
              GROUP BY age_group, category
          )
      )
      SELECT age_group, category, category_count
      FROM CTE
      WHERE ranking = 1
  """)
  display(result)
  ```
- Step 3: Create a DAG using `Airflow` and upload it to an `AWS MWAA` environment to periodically trigger the Databricks notebook for batch processing. See Airflow DAG.
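  - A minimal sketch of such a DAG, assuming the `apache-airflow-providers-databricks` package is available; the DAG ID, connection ID, cluster ID, notebook path, and schedule below are placeholders, not this project's actual values:
    ```python
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

    with DAG(
        dag_id="pinterest_batch_dag",
        start_date=datetime(2023, 1, 1),
        schedule_interval="@daily",  # trigger the notebook once a day
        catchup=False,
        default_args={"retries": 1, "retry_delay": timedelta(minutes=2)},
    ) as dag:
        # Run the batch processing notebook on an existing Databricks cluster.
        run_notebook = DatabricksSubmitRunOperator(
            task_id="run_batch_processing_notebook",
            databricks_conn_id="databricks_default",
            existing_cluster_id="<CLUSTER_ID>",
            notebook_task={"notebook_path": "/Users/<USER>/batch_processing"},
        )
    ```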
- Step 1: Navigate to the AWS `Kinesis` dashboard on the AWS console and select `Kinesis Data Streams`. Create 3 data streams, one for each table.
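  - For reference, the streams could also be created programmatically. A minimal `boto3` sketch, assuming credentials with Kinesis permissions; the stream naming schema and region below are placeholders:
    ```python
    import boto3

    # Sketch only: stream names and region are placeholders.
    kinesis = boto3.client("kinesis", region_name="us-east-1")

    for table in ("pin", "geo", "user"):
        kinesis.create_stream(
            StreamName=f"streaming-<YOUR_UUID>-{table}",
            ShardCount=1,  # one shard is enough for this low-volume project
        )
    ```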
- Step 2: Configure the REST API created in the previous steps to allow for Kinesis Proxy integration. Under the `streams` resource, create a new child resource with the Resource name `{stream-name}`.
- Step 3: Run the Streaming Data Script. This script sends `PUT` requests to the API, which add one record at a time to the 3 `Kinesis` streams, one for each Pinterest table.
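  - As a rough illustration, the script's core logic looks something like this minimal sketch; the invoke URL, resource path, stream name, and record fields below are placeholders, not this project's actual values:
    ```python
    # Hypothetical sketch: send one record to a Kinesis stream through the REST API.
    import json
    import requests

    invoke_url = "https://<API_ID>.execute-api.us-east-1.amazonaws.com/<STAGE>"
    stream_name = "streaming-<YOUR_UUID>-pin"

    payload = json.dumps({
        "StreamName": stream_name,
        "Data": {"index": 1, "unique_id": "example-uuid", "title": "example title"},
        "PartitionKey": "partition-1",
    })
    headers = {"Content-Type": "application/json"}

    response = requests.put(f"{invoke_url}/streams/{stream_name}/record", data=payload, headers=headers)
    print(response.status_code)  # 200 means the record was accepted
    ```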
- Step 4: Create a Databricks notebook for stream processing and read the `AWS Credentials` that have been uploaded to a Databricks `Delta Table`.
- Step 5: Create a `streaming dataframe` for structured streaming events by reading in the data from the `Kinesis` streams.
  - Example: Use the `readStream` method to create a dataframe for the Pinterest data stream:
    ```python
    df_pin = spark \
        .readStream \
        .format('kinesis') \
        .option('streamName', 'streaming-12f6b2c1ae4f-pin') \
        .option('initialPosition', 'earliest') \
        .option('region', 'us-east-1') \
        .option('awsAccessKey', ACCESS_KEY) \
        .option('awsSecretKey', SECRET_KEY) \
        .load()
    ```
- Step 6: Define the streaming schema for the 3 data streams.
  - A schema can be defined using the `StructType` and `StructField` classes, as well as related classes from the `pyspark.sql.types` module:
    ```python
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

    streaming_schema = StructType([
        StructField("field1", StringType(), True),
        StructField("field2", IntegerType(), True),
        StructField("field3", TimestampType(), True),
        # Add more fields as needed
    ])
    ```
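  - The schema can then be applied to the raw `Kinesis` payload. A minimal sketch, assuming the `data` column produced by the Kinesis source and the `streaming_schema` defined above:
    ```python
    from pyspark.sql.functions import col, from_json

    # The Kinesis source delivers each record as binary in the "data" column;
    # cast it to a string and parse it into columns using the schema.
    df_pin = df_pin \
        .selectExpr("CAST(data AS STRING) AS json_data") \
        .select(from_json(col("json_data"), streaming_schema).alias("data")) \
        .select("data.*")
    ```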
- Step 7: Data cleaning. Perform the necessary transformations on the dataframes, as done in the Batch Processing: Databricks section.
- Step 8: Write the streams to a Databricks `Delta Table`.
  - Example: Use the `writeStream` method to store the Pinterest data stream in a `Delta Table`:
    ```python
    df.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
        .table("<TABLE_NAME>")
    ```
- You should be able to see the created table, its schema, and the streaming data that has been stored inside it.
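- For instance, a quick way to inspect the stored data from the notebook (using the placeholder table name from above):
  ```python
  # Read the Delta Table back as a static dataframe and preview its contents.
  display(spark.table("<TABLE_NAME>"))
  ```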
- Sample data from the Pinterest post stream.