-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnyc_taxi_dag.py
135 lines (118 loc) · 5.11 KB
/
nyc_taxi_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
This DAG demonstrates how to import Parquet files into a CrateDB instance using Airflow.
It uses the NYC taxi dataset, which is publicly available on the NYC TLC website
(https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page). The data is also available
in the public S3 bucket s3://nyc-tlc/trip data/ and through the CDN described on that website.
A detailed tutorial is available at https://community.crate.io/t/tutorial-how-to-automate-the-import-of-parquet-files-using-airflow/1247
Prerequisites
-------------
The Airflow variables ORIGIN_PATH and DESTINATION_PATH must be configured
in the Airflow interface under Admin > Variables.
The AWS credentials (ACCESS_KEY_ID, SECRET_ACCESS_KEY) and the bucket name (S3_BUCKET) must be configured in the same way.
In the CrateDB schema "nyc_taxi", the tables "load_trips_staging" and "trips" need to be
present before running the DAG. You can retrieve the CREATE TABLE statements
from the file setup/taxi-schema.sql in this repository.
"""
import pendulum
from airflow.models import Variable
from airflow.decorators import task, dag
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import (
LocalFilesystemToS3Operator,
)
# NOTE: these Variable.get() calls are top-level code, so they are evaluated
# every time this file is parsed, not only when a task runs. The second
# argument is the fallback used when the variable is not defined.

# The URL of the directory containing the Parquet files
ORIGIN_PATH = Variable.get("ORIGIN_PATH", "test-path")
# Any local directory to which the Parquet files are temporarily downloaded, such as /tmp
DESTINATION_PATH = Variable.get("DESTINATION_PATH", "test-path")
# The name of an S3 bucket to which CSV files are temporarily uploaded
S3_BUCKET = Variable.get("S3_BUCKET", "test-bucket")
# AWS Access Key ID
ACCESS_KEY_ID = Variable.get("ACCESS_KEY_ID", "access-key")
# AWS Secret Access Key
SECRET_ACCESS_KEY = Variable.get("SECRET_ACCESS_KEY", "secret-key")

# Both paths are concatenated directly with the file name further down
# ("<path><file>.parquet"), so each must end with a slash. Append one if missing.
if not ORIGIN_PATH.endswith("/"):
    ORIGIN_PATH += "/"
if not DESTINATION_PATH.endswith("/"):
    DESTINATION_PATH += "/"
# The configuration of the DAG was done based on the info shared by NYC TLC here: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
# The documentation mentioned that the Parquet files are released monthly since January 2009
@dag(
    dag_id="nyc-taxi-parquet",
    schedule="@monthly",
    start_date=pendulum.datetime(2009, 3, 1, tz="UTC"),
    catchup=True,
    # Every run loads into, and afterwards empties, the shared table
    # nyc_taxi.load_trips_staging. With catchup=True many monthly runs become
    # due at once; they must execute one at a time, otherwise concurrent runs
    # would mix their staged rows and delete each other's data.
    max_active_runs=1,
    template_searchpath=["include"],
)
def taskflow():
    """Import one month of NYC yellow-taxi trip data into CrateDB.

    Each run performs, in order: derive the monthly file name, download the
    Parquet file and convert it to CSV, upload the CSV to S3, COPY it into
    ``nyc_taxi.load_trips_staging``, execute ``taxi-insert.sql``, empty the
    staging table, and remove the local Parquet/CSV files.

    Uses the Airflow connections ``s3_conn`` and ``cratedb_connection``.
    """

    @task
    def format_file_name(ds=None):
        """Return the file name (without extension) to process for this run.

        ``ds`` is the run's date string; it is parsed with ``pendulum.parse``.
        The result has the form ``yellow_tripdata_YYYY-MM``.
        """
        # The files are released with 2 months of delay, therefore the .subtract(months=2)
        release_month = pendulum.parse(ds).subtract(months=2)
        return f"yellow_tripdata{release_month.format('_YYYY-MM')}"

    # XComArg for the file name. Where it is interpolated into f-strings below
    # (filename, dest_key, sql), its string form is what the operator receives.
    formatted_file_date = format_file_name()

    # Download the Parquet file and convert it to CSV next to it.
    # --fail makes curl exit non-zero on HTTP errors (>= 400) instead of saving
    # the server's error page as the .parquet file; --location follows redirects.
    # NOTE: `env` is passed without append_env, so the commands see only the
    # variables listed here.
    process_parquet = BashOperator(
        task_id="process_parquet",
        bash_command="""
            curl --fail --location -o "${destination_path}${formatted_file_date}.parquet" "${origin_path}${formatted_file_date}.parquet" &&
            parquet-tools csv "${destination_path}${formatted_file_date}.parquet" > "${destination_path}${formatted_file_date}.csv"
        """,
        env={
            "origin_path": ORIGIN_PATH,
            "destination_path": DESTINATION_PATH,
            "formatted_file_date": formatted_file_date,
        },
    )

    # Upload the CSV to the bucket, overwriting any object left by a previous attempt.
    copy_csv_to_s3 = LocalFilesystemToS3Operator(
        task_id="copy_csv_to_s3",
        filename=f"{DESTINATION_PATH}{formatted_file_date}.csv",
        dest_bucket=S3_BUCKET,
        dest_key=f"{formatted_file_date}.csv",
        aws_conn_id="s3_conn",
        replace=True,
    )

    # Load the CSV from S3 into the staging table.
    # The AWS credentials are interpolated into the statement text when this
    # file is parsed, so they are part of the SQL handed to the operator.
    # NOTE(review): CrateDB's COPY FROM documentation asks for special
    # characters in the secret key to be URL-encoded -- confirm that the
    # SECRET_ACCESS_KEY variable is stored in encoded form.
    copy_csv_staging = SQLExecuteQueryOperator(
        task_id="copy_csv_staging",
        conn_id="cratedb_connection",
        # pylint: disable=C0301
        sql=f"""
            COPY nyc_taxi.load_trips_staging
            FROM 's3://{ACCESS_KEY_ID}:{SECRET_ACCESS_KEY}@{S3_BUCKET}/{formatted_file_date}.csv'
            WITH (format = 'csv', empty_string_as_null = true)
            RETURN SUMMARY;
        """,
    )

    # Execute taxi-insert.sql, resolved through template_searchpath ("include").
    copy_staging_to_trips = SQLExecuteQueryOperator(
        task_id="copy_staging_to_trips",
        conn_id="cratedb_connection",
        sql="taxi-insert.sql",
    )

    # Empty the staging table so that the next run starts from a clean state.
    delete_staging = SQLExecuteQueryOperator(
        task_id="delete_staging",
        conn_id="cratedb_connection",
        sql="DELETE FROM nyc_taxi.load_trips_staging;",
    )

    # Remove the temporary local files created by process_parquet.
    delete_local_parquet_csv = BashOperator(
        task_id="delete_local_parquet_csv",
        bash_command="""
            rm "${destination_path}${formatted_file_date}.parquet";
            rm "${destination_path}${formatted_file_date}.csv"
        """,
        env={
            "destination_path": DESTINATION_PATH,
            "formatted_file_date": formatted_file_date,
        },
    )

    # Strictly sequential pipeline: each task runs after the previous one.
    chain(
        formatted_file_date,
        process_parquet,
        copy_csv_to_s3,
        copy_csv_staging,
        copy_staging_to_trips,
        delete_staging,
        delete_local_parquet_csv,
    )


taskflow()