Sr. No | Contents |
---|---|
1.0 | Problem Statement Overview |
1.1 | Description of the Input dataset |
2.0 | Solution Description |
3.0 | System Design |
5.0 | Implementation |
5.1 | Prerequisite Environment |
5.2 | System Modules |
5.3 | GROUPBY Logic (MapReduce & Spark) |
5.4 | JOIN Logic (MapReduce & Spark) |
6.0 | User Manual |
The aim of the project is to develop a REST-based service for the given dataset. This service will accept a query in the form of one of the two templates provided below.
The two standard templates are:
Join Template
SELECT * FROM <TABLE1>
INNER JOIN <TABLE2>
ON <CONDITION1>
WHERE <CONDITION2>
Group By Template
SELECT <COLUMNS>, FUNC(COLUMN1)
FROM <TABLE>
GROUP BY <COLUMNS>
HAVING FUNC(COLUMN1) > X
Here FUNC can be COUNT, MAX, MIN, or SUM.
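The Parser turns a query that follows the Group By template above into a query object. A minimal regex-based sketch of that step, assuming the object is a plain dict with hypothetical keys (`table`, `columns`, `group_by`, `func`, `agg_column`, `threshold`). It accepts both `GROUP BY` and `GROUPBY` and is not the project's actual parser:

```python
import re

# Hypothetical pattern for the Group By template; the real Parser may differ.
GROUPBY_RE = re.compile(
    r"^\s*SELECT\s+(?P<columns>.+?)\s*,\s*(?P<func>\w+)\s*\(\s*(?P<agg>\w+)\s*\)\s+"
    r"FROM\s+(?P<table>\w+)\s+"
    r"GROUP\s*BY\s+(?P<group>.+?)\s+"
    r"HAVING\s+(?P<hfunc>\w+)\s*\(\s*(?P<hagg>\w+)\s*\)\s*>\s*(?P<x>-?\d+(?:\.\d+)?)\s*$",
    re.IGNORECASE,
)
ALLOWED_FUNCS = {"COUNT", "MAX", "MIN", "SUM"}


def _split_columns(text: str) -> list:
    """Turn 'a, b' into ['a', 'b']."""
    return [c.strip() for c in text.split(",")]


def parse_groupby(query: str) -> dict:
    """Split a Group By query into an (assumed) query-object dict."""
    m = GROUPBY_RE.match(query)
    if m is None:
        raise ValueError("query does not match the Group By template")
    func = m["func"].upper()
    if func not in ALLOWED_FUNCS:
        raise ValueError(f"unsupported FUNC: {func}")
    # HAVING must repeat the same FUNC(COLUMN1) that appears in SELECT.
    if m["hfunc"].upper() != func or m["hagg"] != m["agg"]:
        raise ValueError("HAVING must use the same FUNC(COLUMN1) as SELECT")
    return {
        "table": m["table"],
        "columns": _split_columns(m["columns"]),
        "group_by": _split_columns(m["group"]),
        "func": func,
        "agg_column": m["agg"],
        "threshold": float(m["x"]),
    }


print(parse_groupby(
    "SELECT occupation, COUNT(userid) FROM users GROUP BY occupation HAVING COUNT(userid) > 10"
))
```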
The service translates the query into MapReduce jobs and into a Spark job. It should run these two jobs separately and return the following in a JSON object:
- Time taken for Hadoop MapReduce execution.
- Time taken for Spark execution.
- Input and output of the map and reduce tasks, in the chain in which they are applied to the data.
- Spark transformations and actions, in the order they are applied.
- Result of the query.
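To illustrate the required response, here is a minimal sketch that times two job runners and packs the listed fields into one JSON object. The runner signature and every JSON key name are assumptions, and the stub jobs below return placeholder data:

```python
import json
import time
from typing import Callable, List, Tuple

# Hypothetical runner signature: returns (steps applied in order, result rows).
JobRunner = Callable[[], Tuple[List[str], list]]


def timed(run: JobRunner) -> Tuple[float, List[str], list]:
    """Run one job and measure its wall-clock time in seconds."""
    start = time.perf_counter()
    steps, rows = run()
    return time.perf_counter() - start, steps, rows


def build_response(run_mapreduce: JobRunner, run_spark: JobRunner) -> str:
    """Run both jobs one after the other and package the required fields as JSON."""
    mr_time, mr_chain, mr_rows = timed(run_mapreduce)
    spark_time, spark_ops, spark_rows = timed(run_spark)
    # Key names are illustrative, not the service's documented response format.
    return json.dumps({
        "mapreduce_time_sec": mr_time,
        "spark_time_sec": spark_time,
        "mapreduce_chain": mr_chain,
        "spark_operations": spark_ops,
        "mapreduce_result": mr_rows,
        "spark_result": spark_rows,
    })


# Stub jobs with placeholder output, standing in for the real Hadoop and Spark runs.
response = json.loads(build_response(
    lambda: (["map: csv line -> (occupation, userid)", "reduce: count per key, HAVING > 10"],
             [["writer", 45]]),
    lambda: (["read.csv", "groupBy", "agg", "filter", "collect"], [["writer", 45]]),
))
print(response["spark_operations"])
```

Running the jobs sequentially keeps each timing free of interference from the other job.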
The “users” file stores demographic information about the users. The “zipcodes” file contains the city and state for each zipcode; the users file references it through its zipcode column. The “movies” file stores information about movies. The last 19 fields of the movies table are genre flags: 1 means the movie belongs to that genre and 0 means it does not, so a movie can belong to several genres. The “rating” file contains the ratings that users give to movies; its userid and movieid columns refer to the users and movies files, respectively.
Users: userid | age | gender | occupation | zipcode
Zipcodes: zipcode | zipcodetype | city | state
Movies: movieid | title | releasedate | unknown | Action | Adventure | Animation | Children | Comedy | Crime | Documentary | Drama | Fantasy | Film_Noir | Horror | Musical | Mystery | Romance | Sci_Fi | Thriller | War | Western
Rating: userid | movieid | rating | timestamp
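Because the genre flags sit at fixed positions at the end of each Movies row, decoding them is a matter of slicing. A minimal sketch, assuming one comma-separated line per movie with any title that contains commas quoted as in standard CSV; `movie_genres` is a hypothetical helper, not part of the project:

```python
import csv

# Genre columns in the order listed for the Movies table above (19 flags).
GENRES = [
    "unknown", "Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
    "Documentary", "Drama", "Fantasy", "Film_Noir", "Horror", "Musical", "Mystery",
    "Romance", "Sci_Fi", "Thriller", "War", "Western",
]


def movie_genres(line: str) -> list:
    """Return the names of the genres flagged with 1 in one Movies row."""
    fields = next(csv.reader([line]))  # csv handles quoted titles with commas
    flags = fields[-len(GENRES):]      # the last 19 fields are the genre flags
    return [genre for genre, flag in zip(GENRES, flags) if flag.strip() == "1"]


row = "1,Toy Story (1995),01-Jan-1995," + ",".join(["0", "0", "0", "1", "1", "1"] + ["0"] * 13)
print(movie_genres(row))
```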
Refer to section 3.0 System Design along with this description.
- The user submits a query.
- Core sends the query string to the Parser.
- The Parser returns a query object to Core.
- Core sends the parsed query object to the Driver.
- The Driver sends the parsed query object to the MR Mapper and receives a TSV output.
- The Driver also sends the parsed query object to the Spark Mapper and receives an RDD.
- The Driver passes the TSV to MR Result and the RDD to Spark Result.
- MR Result and Spark Result each return a JSON string to the Driver.
- The Driver passes the JSON string to Core.
- Core sends the response to the client as a JSON string.
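The request flow described in these steps can be traced with stub functions. This is a sketch only: every function below is a hypothetical stand-in for the module it is named after, and the TSV, RDD, and JSON payloads are placeholders:

```python
import json

trace = []  # records the order in which the stub modules are called


def parser(query: str) -> dict:
    trace.append("Parser")
    return {"raw": query}  # placeholder query object


def mr_mapper(q: dict) -> str:
    trace.append("MR Mapper")
    return "writer\t45\n"  # placeholder TSV produced by the MapReduce job


def spark_mapper(q: dict) -> list:
    trace.append("Spark Mapper")
    return [("writer", 45)]  # placeholder for the collected RDD contents


def mr_result(tsv: str) -> str:
    trace.append("MR Result")
    return json.dumps([line.split("\t") for line in tsv.splitlines()])


def spark_result(rows: list) -> str:
    trace.append("Spark Result")
    return json.dumps([list(row) for row in rows])


def driver(q: dict) -> str:
    trace.append("Driver")
    tsv = mr_mapper(q)        # MapReduce path
    rows = spark_mapper(q)    # Spark path
    mr_json, spark_json = mr_result(tsv), spark_result(rows)
    return json.dumps({"mapreduce": json.loads(mr_json), "spark": json.loads(spark_json)})


def core(query: str) -> str:
    trace.append("Core")
    return driver(parser(query))  # Core is the only caller of the Parser


response = core("SELECT occupation, COUNT(userid) FROM users GROUP BY occupation HAVING COUNT(userid) > 10")
print(trace)
```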
- Linux machine
- Java 1.8.0_221
- Hadoop 3.1.2 (HDFS and MapReduce)
- Spark 2.4.3
- Python 3.6 or later
- OpenSSH
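A quick way to check that the prerequisites above are available is sketched below. It only tests that the commands are on the PATH (the command names are assumptions about a typical install) and does not verify the exact versions listed:

```python
import shutil
import sys

# Command names assumed to be on PATH once the prerequisites are installed.
REQUIRED_COMMANDS = ["java", "hadoop", "spark-submit", "ssh"]


def check_prerequisites() -> dict:
    """Report which prerequisites appear to be present on this machine."""
    report = {cmd: shutil.which(cmd) is not None for cmd in REQUIRED_COMMANDS}
    report["python>=3.6"] = sys.version_info >= (3, 6)
    report["linux"] = sys.platform.startswith("linux")
    return report


if __name__ == "__main__":
    for name, ok in check_prerequisites().items():
        print(f"{name:14} {'OK' if ok else 'MISSING'}")
```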
- Core: Receives the user's REST API request on the server. It is the only module that talks to the Parser. After receiving the parsed query from the Parser, it delegates further processing to the Driver, and it returns the final output to the client as a JSON string.
- Parser: Parses the input query and returns it as a JSON object.
- Driver: Takes control of the process after receiving the JSON object and manages the MR Mapper, the Spark Mapper, and their results.
- MR Mapper: Reads each input row from the file through stdin (Hadoop Streaming), identifies the key-value pair, and pipes it to the reducer. The reducer performs the JOIN or GROUPBY operation, filters the rows according to the query condition (WHERE for JOIN, HAVING for GROUPBY), and outputs the result.
- Spark Mapper: Reads the file from HDFS and converts it into a DataFrame, then applies transformations such as JOIN, FILTER, and GROUPBY to produce the query result.
- MR Result: Reads the output file from HDFS, converts it to a JSON string, and returns it to the Driver.
- Spark Result: Converts the transformed DataFrame, record by record, into a JSON string.
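The MR Result step amounts to turning tab-separated reducer output into JSON. A minimal sketch, assuming the column names are known from the parsed query (the `columns` parameter and the `tsv_to_json` name are hypothetical):

```python
import json
from typing import Iterable, List


def tsv_to_json(lines: Iterable[str], columns: List[str]) -> str:
    """Convert tab-separated reducer output into a JSON array of objects."""
    records = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # skip blank lines such as a trailing newline
        values = line.split("\t")
        records.append(dict(zip(columns, values)))
    return json.dumps(records)


print(tsv_to_json(["writer\t45\n", "student\t196\n"], ["occupation", "count"]))
```

In practice the lines could come from the job's output directory on HDFS, for example by capturing the output of `hdfs dfs -cat <output_dir>/part-*` with `subprocess`; values stay strings here because TSV carries no type information.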
The GROUPBY mapper reads the comma-separated values from the input file line by line through stdin and receives the parsed query object from the Driver. Based on the query, it produces key-value pairs: the key is made of the SELECT columns (one or more) and the value is the aggregate column. It pipes each key-value pair to the reducer.
The reducer takes the key-value pairs generated by the mapper, groups the pairs that share a key, applies the aggregate function to each group, and filters the groups according to the HAVING condition in the query. It outputs the data as tab-separated values.
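The GROUPBY mapper and reducer can be sketched as two Python generators in the Hadoop Streaming style, where every record is a `key<TAB>value` text line. This assumes the parsed query has already been resolved into column indexes (the `EXAMPLE_QUERY` dict and its keys are hypothetical) and that input fields contain no quoted commas; a local `sorted()` stands in for Hadoop's shuffle and sort:

```python
from itertools import groupby
from operator import itemgetter
from typing import Iterable, Iterator

# Hypothetical query object with column indexes already resolved from the schema.
EXAMPLE_QUERY = {
    "group_by": [3],    # indexes of the SELECT / GROUP BY columns (occupation)
    "agg_column": 0,    # index of COLUMN1 (userid)
    "func": "COUNT",    # COUNT, MAX, MIN or SUM
    "threshold": 10,    # X in HAVING FUNC(COLUMN1) > X
}
NUMERIC_AGGREGATES = {"MAX": max, "MIN": min, "SUM": sum}


def to_number(text: str):
    """Parse a field as int, or as float if it contains a decimal point."""
    return float(text) if "." in text else int(text)


def mapper(lines: Iterable[str], q: dict) -> Iterator[str]:
    """Emit 'key<TAB>value' for every CSV input line."""
    for line in lines:
        fields = line.rstrip("\n").split(",")
        key = ",".join(fields[i] for i in q["group_by"])
        yield f"{key}\t{fields[q['agg_column']]}"


def reducer(lines: Iterable[str], q: dict) -> Iterator[str]:
    """Aggregate each key's values (input sorted by key) and apply HAVING."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    for key, group in groupby(pairs, key=itemgetter(0)):
        values = [value for _, value in group]
        if q["func"] == "COUNT":
            result = len(values)
        else:
            result = NUMERIC_AGGREGATES[q["func"]](to_number(v) for v in values)
        if result > q["threshold"]:
            yield f"{key}\t{result}"


users = [
    "1,24,M,technician,85711",
    "2,53,F,other,94043",
    "3,23,M,writer,32067",
    "4,24,M,technician,43537",
    "5,33,F,other,15213",
]
shuffled = sorted(mapper(users, EXAMPLE_QUERY))  # local stand-in for the shuffle/sort
print(list(reducer(shuffled, {**EXAMPLE_QUERY, "threshold": 1})))
```

Under Hadoop Streaming, each generator would be wrapped in a script that reads `sys.stdin` and prints every yielded line, and the two scripts would be passed to the streaming jar (the `hadoop_streaming_jar` path in config.yaml) through its `-mapper` and `-reducer` options.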
The GROUPBY Spark mapper receives the parsed query object from the Driver. Based on the query, it starts a Spark job and reads the CSV input file from HDFS.
Using the "aggregate" function and the "where" condition from the parsed query, the Spark mapper then generates the output data.
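With DataFrames, the GROUPBY query maps onto `groupBy`, `agg`, and `filter`, followed by an action. A sketch under these assumptions: an existing `SparkSession`, a schema built elsewhere (for example from schemas.yaml), column names rather than indexes in the query dict, and hypothetical helper names. PySpark is imported inside the function so the pure helpers can be used without Spark installed:

```python
from typing import Dict

# FUNC names from the template mapped to pyspark.sql.functions attribute names.
SPARK_FUNCS: Dict[str, str] = {"COUNT": "count", "MAX": "max", "MIN": "min", "SUM": "sum"}


def agg_alias(func: str, column: str) -> str:
    """Name the aggregate column explicitly (e.g. 'COUNT_userid') so HAVING can refer to it."""
    func = func.upper()
    if func not in SPARK_FUNCS:
        raise ValueError(f"unsupported FUNC: {func}")
    return f"{func}_{column}"


def spark_groupby(spark, path: str, schema, q: dict):
    """SELECT cols, FUNC(col) ... GROUP BY cols HAVING FUNC(col) > X with DataFrames.

    spark is an existing SparkSession, schema a StructType or DDL string, and q a
    query dict with hypothetical keys group_by, func, agg_column and threshold.
    """
    from pyspark.sql import functions as F  # lazy import so the helpers run without Spark

    alias = agg_alias(q["func"], q["agg_column"])
    agg_fn = getattr(F, SPARK_FUNCS[q["func"].upper()])
    df = spark.read.csv(path, schema=schema)                 # read the CSV input
    grouped = df.groupBy(*q["group_by"]).agg(agg_fn(q["agg_column"]).alias(alias))
    having = grouped.filter(F.col(alias) > q["threshold"])  # HAVING clause
    return having.collect()                                  # action: triggers the job
```

Aliasing the aggregate avoids relying on Spark's auto-generated column names when the HAVING filter refers back to it.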
The JOIN mapper reads the comma-separated values from the two input files line by line through stdin and receives the parsed query object from the Driver. Based on the query, it produces key-value pairs, tagging each record with “1” if it comes from table 1 and with “2” otherwise. It pipes each key-value pair to the reducer.
The reducer takes the key-value pairs generated by the mapper and checks whether the values in the join columns match. If they match, it evaluates the WHERE clause and outputs the row values when the condition is satisfied.
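The JOIN logic is a reduce-side join. The sketch below shows the textbook version of that technique (keying on the join column and tagging rows with their table) rather than the project's exact code; the function names, the key-index parameters, and the `where` callback are assumptions:

```python
from itertools import groupby
from operator import itemgetter
from typing import Callable, Iterable, Iterator, List


def join_mapper(lines: Iterable[str], table_tag: str, key_index: int) -> Iterator[str]:
    """Emit 'joinkey<TAB>tag<TAB>row' so the reducer knows each row's source table."""
    for line in lines:
        row = line.rstrip("\n")
        key = row.split(",")[key_index]
        yield f"{key}\t{table_tag}\t{row}"


def join_reducer(lines: Iterable[str], where: Callable[[List[str]], bool]) -> Iterator[str]:
    """Inner join: pair every table-1 row with every table-2 row sharing a key."""
    records = (line.rstrip("\n").split("\t", 2) for line in lines)
    for _key, group in groupby(records, key=itemgetter(0)):
        left, right = [], []
        for _, tag, row in group:
            (left if tag == "1" else right).append(row.split(","))
        for l_row in left:
            for r_row in right:
                joined = l_row + r_row
                if where(joined):  # WHERE clause on the combined row
                    yield "\t".join(joined)


users = ["1,24,M,technician,85711", "2,53,F,other,94043"]
ratings = ["1,50,5,881250949", "2,172,3,891717742", "1,31,3,875072144"]
mapped = list(join_mapper(users, "1", 0)) + list(join_mapper(ratings, "2", 0))
# WHERE rating > 3; the rating is field 7 of the joined row (5 user fields + 4 rating fields).
print(list(join_reducer(sorted(mapped), lambda r: int(r[7]) > 3)))
```

Inside a real streaming mapper, the table tag has to be derived from the input split; Hadoop Streaming exports job configuration as environment variables, so the current input file is typically available as `mapreduce_map_input_file`.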
The JOIN Spark mapper receives the parsed query object from the Driver and reads the two input files.
Based on the query, it distinguishes from_table from join_table and retrieves the required data. It then joins the two tables on the given "KEY" (the "JOIN COLUMN") and returns the rows that satisfy the "where" clause.
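A DataFrame version of the JOIN template can be sketched as a join on the ON columns followed by a filter for WHERE. This assumes the ON condition has the form `table1.col = table2.col` and that WHERE arrives as a Spark SQL expression string; `parse_join_condition` and `spark_join` are hypothetical names, and the DataFrames are passed in rather than read here:

```python
import re
from typing import Dict


def parse_join_condition(condition: str) -> Dict[str, str]:
    """Map each table named in 'users.userid = rating.userid' to its join column.

    Assumes a single equality between two table.column references.
    """
    m = re.fullmatch(r"\s*(\w+)\.(\w+)\s*=\s*(\w+)\.(\w+)\s*", condition)
    if m is None:
        raise ValueError(f"unsupported ON condition: {condition!r}")
    left_table, left_col, right_table, right_col = m.groups()
    return {left_table: left_col, right_table: right_col}


def spark_join(from_table: str, from_df, join_table: str, join_df,
               on_condition: str, where_expr: str):
    """INNER JOIN two PySpark DataFrames on the ON columns, then apply WHERE.

    where_expr is assumed to be a Spark SQL expression string such as "rating > 3".
    Nothing runs until an action (for example .collect()) is called on the result.
    """
    cols = parse_join_condition(on_condition)
    condition = from_df[cols[from_table]] == join_df[cols[join_table]]
    return from_df.join(join_df, condition, "inner").filter(where_expr)


print(parse_join_condition("users.userid = rating.userid"))
```

If both tables share the join column name (for example `userid`), the joined DataFrame contains two columns with that name, so a WHERE clause that refers to it would need a qualified or renamed column.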
The project is built and tested on Hadoop 3.1.2 and Spark 2.4.3, which depend on Java 1.8.0_221.
It is built and tested on Python 3.6 and later.
So far, the project supports only Linux (Windows support is on the way).
After all the prerequisites above are installed, run the following commands in a terminal of your choice.
- Clone this repository:
  git clone https://github.com/AshirwadPradhan/yasmss.git
- cd into the directory yasmss.
- Then run:
  pip3 install -r requirements.txt
- Then set up the following config files inside the project:
  4.1 cd yasmss/ and open config.yaml in your favorite code editor. Modify the following config, replacing each {} placeholder (braces included) with your own value:

      pathconfig:
        host_ip_port: {insert path here, e.g. https://localhost:9000}
        hadoop_streaming_jar: {insert path here}
        input_dir: {insert path here}
        parent_output_dir: {insert path here}
        child_output_dir: {insert path here}
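Before running any job it can help to confirm that config.yaml is filled in. A minimal sketch, assuming the keys sit under a top-level `pathconfig` mapping as listed in the config and that PyYAML is available; `missing_config` and `load_config` are hypothetical helpers:

```python
from typing import List

# Keys listed under `pathconfig` in config.yaml.
REQUIRED_PATH_KEYS = [
    "host_ip_port", "hadoop_streaming_jar", "input_dir",
    "parent_output_dir", "child_output_dir",
]


def missing_config(config: dict) -> List[str]:
    """Return pathconfig keys that are absent, empty, or still hold a {placeholder}."""
    paths = config.get("pathconfig") or {}
    missing = []
    for key in REQUIRED_PATH_KEYS:
        value = paths.get(key)
        if not value or str(value).startswith("{"):
            missing.append(key)
    return missing


def load_config(path: str = "config.yaml") -> dict:
    """Load the YAML file (PyYAML assumed to be installed)."""
    import yaml  # lazy import so missing_config works without PyYAML

    with open(path) as fh:
        return yaml.safe_load(fh)
```

An unreplaced `{insert path here}` is parsed by YAML as a mapping rather than a string, which is why the check looks for a leading `{` in the value's string form.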
  4.2 cd into the schema directory.
  4.3 Open schemas.yaml in your favorite code editor. This file contains the schema of every table to be used. Add the desired table schema in the following format:

      users:
        userid: IntegerType
        age: IntegerType
        gender: StringType
        occupation: StringType
        zipcode: StringType
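The type names in schemas.yaml match class names in `pyspark.sql.types`, so a table's entry can be turned into a Spark schema. A sketch, assuming the YAML has already been loaded into a dict and that column order in the file matches column order in the CSV; the helper names and the accepted type list are assumptions:

```python
from typing import Dict, List, Tuple

# Spark SQL type names accepted by this sketch (a subset of pyspark.sql.types).
KNOWN_TYPES = {"IntegerType", "LongType", "FloatType", "DoubleType", "StringType",
               "BooleanType", "DateType", "TimestampType"}


def schema_fields(schemas: Dict[str, Dict[str, str]], table: str) -> List[Tuple[str, str]]:
    """Return (column, type-name) pairs for one table, validating each type name."""
    fields = list(schemas[table].items())  # dicts keep insertion (file) order
    for column, type_name in fields:
        if type_name not in KNOWN_TYPES:
            raise ValueError(f"{table}.{column}: unknown type {type_name}")
    return fields


def to_struct_type(schemas: Dict[str, Dict[str, str]], table: str):
    """Build a pyspark StructType for the table (all columns nullable)."""
    from pyspark.sql import types as T  # lazy import: only needed when Spark is available

    return T.StructType([
        T.StructField(column, getattr(T, type_name)(), True)
        for column, type_name in schema_fields(schemas, table)
    ])
```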
- Now cd .. back into the parent directory yasmss.
- Run python core.py to deploy the application on localhost.
- Open the address printed by Flask in your browser and submit your query to receive the JSON response.
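Besides the browser, the service could be called from a script. A minimal standard-library sketch; Flask's development server listens on http://127.0.0.1:5000 by default, but the route, the POST method, and the form field name `query` are all assumptions, so check the route defined in core.py before using it:

```python
import json
import urllib.parse
import urllib.request


def build_query_request(base_url: str, query: str) -> urllib.request.Request:
    """Build a POST request carrying the query as form data (field name assumed)."""
    data = urllib.parse.urlencode({"query": query}).encode("utf-8")
    return urllib.request.Request(base_url, data=data, method="POST")


def submit(base_url: str, query: str) -> dict:
    """Send the query and decode the JSON response returned by the service."""
    with urllib.request.urlopen(build_query_request(base_url, query)) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `submit("http://127.0.0.1:5000/", "SELECT * FROM users INNER JOIN rating ON users.userid = rating.userid WHERE rating > 3")` would return the parsed JSON response, provided the assumed route and field name match the app.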
- Clone the repo
- Make sure the repo is working locally
- Ensure you are on the master branch
- Make a branch of your own
- Make changes in your local copy and write good commit messages
- Write tests for your changes
- Ensure that the test cases pass
- Push the changes to the origin remote and create a PR
- Go to the fork of your project in the browser
- Click "Compare & pull request" for your branch
- Ensure there are no merge conflicts
- Ensure the base fork is the correct repository and branch
- Most important: add a description and check the diff
- Click "Create pull request"