Processing .json files to build a parquet table with Spark.
Since our platform has grown exponentially in the past few weeks, we are no longer able to store the songplays table and its sources in a data warehouse. Our engineering team was therefore asked to develop a pipeline that generates and populates the songplays table and its sources in a data lake stored in an S3 bucket.
The data we will be working with is stored in two S3 buckets:
- Log data: contains user events on the platform and has the following columns:
artist, auth, firstName, gender, itemInSession, lastName, length, level, location, method, page, registration, sessionId, song, status, ts, userAgent
- Song data: JSON files with the structure presented in the example below:
{
    "num_songs": 1,
    "artist_id": "ARJIE2Y1187B994AB7",
    "artist_latitude": null,
    "artist_longitude": null,
    "artist_location": "",
    "artist_name": "Line Renaud",
    "song_id": "SOUPIRU12A6D4FA1E1",
    "title": "Der Kleine Dompfaff",
    "duration": 152.92036,
    "year": 0
}
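Both datasets can be loaded directly with Spark's JSON reader. The sketch below is illustrative only: the bucket name and wildcard depths are placeholders, not the actual paths used by etl.py.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparkify-read-sketch").getOrCreate()

# Placeholder paths -- replace with the real bucket/prefixes (or the local test
# directories) and adjust the wildcard depth to the actual folder layout.
song_df = spark.read.json("s3a://<input-bucket>/song_data/*/*/*/*.json")
log_df = spark.read.json("s3a://<input-bucket>/log_data/*/*/*.json")

# Inspect the inferred schemas of both datasets.
song_df.printSchema()
log_df.printSchema()
```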
To execute this project you'll need a dl.cfg file containing the AWS access key ID and secret access key of an IAM user with read and write permissions on the S3 bucket. The content of this file must be in the following format:
[AWS]
AWS_ACCESS_KEY_ID=A******************5
AWS_SECRET_ACCESS_KEY=E**************************************S
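A common way to consume such a file is Python's configparser, exporting both keys as environment variables so that Spark's s3a connector can authenticate. This is only a sketch of that pattern; etl.py may wire the credentials up differently.

```python
import configparser
import os

# Parse dl.cfg and read the [AWS] section.
config = configparser.ConfigParser()
config.read("dl.cfg")

# Export the credentials so that the s3a connector can pick them up.
os.environ["AWS_ACCESS_KEY_ID"] = config["AWS"]["AWS_ACCESS_KEY_ID"]
os.environ["AWS_SECRET_ACCESS_KEY"] = config["AWS"]["AWS_SECRET_ACCESS_KEY"]
```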
You will need Python 3.6.3 with PySpark 2.4.3. After installing Python, you can install PySpark by running the following command:
pip install pyspark==2.4.3
- In order to run PySpark 2.4.3, you will need to install Java JDK 8.
- We strongly recommend using a dedicated Python virtual environment to execute this project. Click here to learn more about virtual environments in Python, or here to learn more about virtual environments in Anaconda.
To execute this project, you only need to download this repository, activate the virtual environment with Python 3.6.3 and PySpark 2.4.3 in it, and run the following command in the root of this repository:
python ./src/etl.py
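Behind that command, the script needs a SparkSession that can read from and write to S3 through the s3a connector. Below is a minimal sketch of such a session; the hadoop-aws version is an assumption (it should match the Hadoop build bundled with PySpark 2.4.3), and the actual etl.py may configure this differently.

```python
from pyspark.sql import SparkSession

def create_spark_session():
    """Return a SparkSession with the s3a connector available."""
    return (
        SparkSession.builder
        .appName("sparkify-datalake")
        # Assumed package version; match it to the Hadoop build shipped with PySpark 2.4.3.
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3")
        .getOrCreate()
    )

if __name__ == "__main__":
    spark = create_spark_session()
    print(spark.version)
```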
- You can run this code in test mode without accessing the S3 bucket. To do this, just uncomment lines 280 and 281 and comment out lines 277 and 278 in etl.py (see the sketch after this list).
- The test files are in the input and output subdirectories of this repository.
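Conceptually, the lines mentioned above are just two alternative sets of input/output path definitions. The toggle below is a hypothetical illustration, not the actual code in etl.py; the variable names and S3 paths are made up, while the local paths correspond to the input and output test directories.

```python
# Production paths (S3) -- comment these out to run in test mode:
input_data = "s3a://<input-bucket>/"
output_data = "s3a://<output-bucket>/"

# Test paths (local files shipped with the repository) -- uncomment to run in test mode:
# input_data = "./input/"
# output_data = "./output/"
```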
After a few seconds you should see the following lines in your command line:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
YY/MM/DD HH:MM:SS WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 2:===============================> (116 + 8) / 200]YY/MM/DD HH:MM:SS WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
If the run finished successfully, you should see a sample of the final table:
############
Songplays table:
+------------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplays_id| start_time|user_id|level| song_id| artist_id|session_id| location| user_agent|
+------------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
| 0|2018-11-21 19:56:...| 15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4| 818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+------------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
############
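For context on how that table is assembled: songplays rows come from joining the log events with the song metadata on song title and artist name. The sketch below is a plausible reconstruction, not the actual code in etl.py; it assumes song plays are the log rows where page equals "NextSong", that the log events carry a userId column, and that the paths point at the test data.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("songplays-sketch").getOrCreate()

# Placeholder paths; adjust the wildcards to the actual directory layout.
song_df = spark.read.json("./input/song_data/*/*/*/*.json")
log_df = spark.read.json("./input/log_data/*.json")

# Keep only song-play events and join them with the song metadata.
plays = log_df.filter(F.col("page") == "NextSong")
songplays = (
    plays.join(
        song_df,
        (plays.song == song_df.title) & (plays.artist == song_df.artist_name),
        "left",
    )
    .select(
        F.monotonically_increasing_id().alias("songplays_id"),
        (F.col("ts") / 1000).cast("timestamp").alias("start_time"),  # ts is in milliseconds
        F.col("userId").alias("user_id"),  # column name assumed; not in the list above
        "level",
        "song_id",
        "artist_id",
        F.col("sessionId").alias("session_id"),
        "location",
        F.col("userAgent").alias("user_agent"),
    )
)

# Persist the result as a parquet table in the output location.
songplays.write.mode("overwrite").parquet("./output/songplays/")
```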
- PySpark - Cluster computing system.
- @kellermann92 - Idea & Initial work