-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspark_sql_bq_country.py
86 lines (62 loc) · 2.61 KB
/
spark_sql_bq_country.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import argparse
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, regexp_extract
# pylint: disable=R0801
# Command-line interface for the report job.
parser = argparse.ArgumentParser(description="Generate a hotel reviews report.")
# Location of the Parquet hotel-reviews data to read.
parser.add_argument(
    "--input_path",
    required=True,
    help="Path to the Parquet hotel reviews data.",
)
# Optional filter; when omitted, every country is included in the report.
parser.add_argument("--country", type=str, help="Specify a country to filter the report.")
# The report is saved with the BigQuery connector (option "table"), so this is
# a BigQuery table identifier rather than a file path.
parser.add_argument(
    "--output",
    type=str,
    help="BigQuery output table for the report.",
    required=True,
)
# Parse sys.argv; argparse exits with a usage message on missing/invalid args.
args = parser.parse_args()
# Obtain (or reuse an already running) Spark session named "test".
_session_builder = SparkSession.builder.appName("test")
spark = _session_builder.getOrCreate()
# Temporary GCS bucket for intermediate storage; presumably consumed by the
# BigQuery connector when the report is written — confirm.
_TEMP_GCS_BUCKET = "dataproc-temp-europe-west6-509013154381-b69h2glg"
spark.conf.set("temporaryGcsBucket", _TEMP_GCS_BUCKET)
# Parquet location supplied via --input_path (argparse already yields a str).
input_path = args.input_path
# Load the reviews and derive the country column:
#  * "United Kingdom" is collapsed to the single word "UK" first, because the
#    country is taken as the final run of word characters in the address and
#    would otherwise come out as "Kingdom";
#  * Hotel_Country is then that final word of the (rewritten) Hotel_Address.
df = (
    spark.read.parquet(input_path)
    .withColumn(
        "Hotel_Address",
        expr("regexp_replace(Hotel_Address, 'United Kingdom', 'UK')"),
    )
    .withColumn(
        "Hotel_Country",
        regexp_extract("Hotel_Address", r"\b(\w+)$", 1),
    )
)
# Columns carried into the SQL view; the aggregation itself only reads
# Hotel_Country, Hotel_Name and Reviewer_Score.
_REPORT_COLUMNS = (
    "Hotel_Address",
    "Hotel_Country",
    "Hotel_Name",
    "Review_Date",
    "Average_Score",
    "Reviewer_Nationality",
    "Reviewer_Score",
)
df_selected = df.select(*_REPORT_COLUMNS)
# Expose the projection to Spark SQL under the name the report query uses.
df_selected.createOrReplaceTempView("country_hotel_reviews")
# Optional --country value; None (flag omitted) or "" means "all countries".
selected_country = args.country
# Build the optional WHERE clause. The value comes from the command line, so it
# is escaped before being embedded in a SQL string literal. With Spark's default
# literal parsing (spark.sql.parser.escapedStringLiterals=false), backslash is
# the escape character. Backslashes are therefore doubled first, and then single
# quotes are backslash-escaped. This stops a quote in the argument from
# terminating the literal and injecting SQL.
if selected_country:
    escaped_country = selected_country.replace("\\", "\\\\").replace("'", "\\'")
    country_filter = f"WHERE Hotel_Country = '{escaped_country}'"
else:
    country_filter = ""
# Average reviewer score per (country, hotel).
# FORMAT_NUMBER yields a string (kept so the output column type is unchanged).
# Ordering by that alias would therefore be lexicographic: "10.00" would rank
# below "9.50" in DESC order. Sort by the numeric aggregate instead.
report_query = f"""
SELECT
Hotel_Country,
Hotel_Name,
FORMAT_NUMBER(AVG(Reviewer_Score), 2) AS Avg_Reviewer_Score
FROM
country_hotel_reviews
{country_filter}
GROUP BY
Hotel_Country, Hotel_Name
ORDER BY
Hotel_Country, AVG(Reviewer_Score) DESC
"""
# Run the report query once. The result is consumed twice (BigQuery write and
# console preview), so it is cached. The preview then reads the stored result
# instead of re-parsing and re-executing the SQL against the source data.
report_df = spark.sql(report_query).cache()
# BigQuery table identifier supplied via --output.
output = args.output
# Write the report to BigQuery, replacing any existing table contents.
report_df.write.format("bigquery").option("table", output).mode("overwrite").save()
# Print a preview of the report (DataFrame.show defaults to the first 20 rows).
report_df.show()