-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsql_q1.py
34 lines (27 loc) · 995 Bytes
/
sql_q1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from pyspark.sql import SparkSession
# Obtain the Spark session for this job (getOrCreate reuses an existing one
# if present) under the application name "q1-sql".
spark = (
    SparkSession.builder
    .appName("q1-sql")
    .getOrCreate()
)
def get_year(timestamp):
    """Return ``timestamp.year``, or ``None`` when *timestamp* is ``None``.

    Null-safe accessor, presumably meant for use as a Spark SQL UDF.
    NOTE(review): not referenced by any visible code in this file.

    Args:
        timestamp: any object exposing a ``year`` attribute (e.g. a
            ``datetime.date`` / ``datetime.datetime``), or ``None``.

    Returns:
        The ``year`` attribute of *timestamp*, or ``None``.
    """
    # Identity check rather than ``== None``: an object with a permissive
    # ``__eq__`` must not be mistaken for a missing value.
    if timestamp is None:
        return None
    return timestamp.year
# Load the movies CSV. It has no header row, so Spark names the columns
# _c0, _c1, ... and infers their types from the data.
movies = (
    spark.read.format('csv')
    .options(header='false', inferSchema='true')
    .load('hdfs://master:9000/data/movies.csv')
)
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
movies.createOrReplaceTempView('movies')

# For each release year, list the title(s) with the highest percentage
# earnings, computed as 100 * (_c6 - _c5) / _c5.
# NOTE(review): presumably _c1 = title, _c3 = release date/timestamp,
# _c5 = cost/budget, _c6 = revenue -- confirm against the dataset schema.
#
# The year is extracted directly in SQL with EXTRACT(YEAR FROM ...), so no
# Python UDF is registered.
#
# The CTE drops NULLs and zero denominators *before* the division is
# evaluated. The query therefore also runs with spark.sql.ansi.enabled=true,
# where division by zero raises instead of returning NULL. The dropped rows
# could never satisfy the equality join, so the result is unchanged.
# Ties within a year all appear in the output.
sqlString = """
WITH profits AS (
    SELECT _c1 AS Title,
           100 * (_c6 - _c5) / _c5 AS Earnings,
           EXTRACT(YEAR FROM _c3) AS Year
    FROM movies
    WHERE _c3 IS NOT NULL AND _c5 IS NOT NULL AND _c6 IS NOT NULL AND _c5 != 0
)
SELECT p.Title, p.Earnings, p.Year
FROM profits AS p
INNER JOIN (
    SELECT Year, MAX(Earnings) AS Earn
    FROM profits
    GROUP BY Year
) AS MaxProfit
    ON MaxProfit.Year = p.Year AND MaxProfit.Earn = p.Earnings
ORDER BY p.Year DESC
"""
res = spark.sql(sqlString)
res.show()