-
Notifications
You must be signed in to change notification settings - Fork 0
/
preprocess_data.py
86 lines (66 loc) · 2.58 KB
/
preprocess_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import argparse
import os
import pickle
import pandas as pd
from sklearn.feature_extraction import DictVectorizer
def dump_pickle(obj, filename):
    """Pickle ``obj`` into the file located at ``filename``.

    The target is opened in binary write mode, so an existing file at that
    path is overwritten.

    Args:
        obj: Any picklable Python object.
        filename: Path of the file to write.

    Returns:
        The return value of ``pickle.dump`` (``None``).
    """
    with open(filename, "wb") as sink:
        outcome = pickle.dump(obj, sink)
    return outcome
def read_dataframe(filename: str):
df = pd.read_parquet(filename)
df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)
df = df[(df.duration >= 1) & (df.duration <= 60)]
categorical = ['PULocationID', 'DOLocationID']
df[categorical] = df[categorical].astype(str)
return df
def preprocess(df: pd.DataFrame, dv: DictVectorizer, fit_dv: bool = False):
    """Turn a trip frame into the feature matrix produced by ``dv``.

    A combined pickup/drop-off key is stored on ``df`` as a new ``PU_DO``
    column (the caller's frame is modified), then ``PU_DO`` and
    ``trip_distance`` are converted to one dict per row and vectorized.

    Args:
        df: Frame holding ``PULocationID``, ``DOLocationID`` and
            ``trip_distance`` columns.
        dv: Vectorizer used to encode the row dicts.
        fit_dv: When true, ``dv.fit_transform`` is called; otherwise
            ``dv.transform``.

    Returns:
        ``(X, dv)``: the vectorizer output and the vectorizer passed in.
    """
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']

    # Categorical key first, numerical feature second.
    feature_columns = ['PU_DO', 'trip_distance']
    records = df[feature_columns].to_dict(orient='records')

    encode = dv.fit_transform if fit_dv else dv.transform
    return encode(records), dv
def run(raw_data_path: str, dest_path: str, dataset: str = "green"):
    """Preprocess the train/valid/test trip files and pickle the results.

    Reads ``{dataset}_tripdata_2021-01.parquet`` (train), ``...-02`` (valid)
    and ``...-03`` (test) from ``raw_data_path``, extracts the ``duration``
    target, fits a ``DictVectorizer`` on the training split and applies it to
    the other two, then writes ``dv.pkl``, ``train.pkl``, ``valid.pkl`` and
    ``test.pkl`` into ``dest_path``.

    Args:
        raw_data_path: Directory containing the raw parquet files.
        dest_path: Output directory; created if it does not exist.
        dataset: File-name prefix of the parquet files.
    """
    # Split name -> month suffix; insertion order drives processing order.
    months = {"train": "2021-01", "valid": "2021-02", "test": "2021-03"}

    # Load parquet files.
    frames = {
        split: read_dataframe(
            os.path.join(raw_data_path, f"{dataset}_tripdata_{month}.parquet")
        )
        for split, month in months.items()
    }

    # Extract the target.
    targets = {split: frame['duration'].values for split, frame in frames.items()}

    # Fit the vectorizer on the training split only, then reuse it.
    dv = DictVectorizer()
    X_train, dv = preprocess(frames["train"], dv, fit_dv=True)
    features = {"train": X_train}
    for split in ("valid", "test"):
        features[split], _ = preprocess(frames[split], dv, fit_dv=False)

    # Create the destination folder unless it already exists.
    os.makedirs(dest_path, exist_ok=True)

    # Save the vectorizer, then one (X, y) pair per split.
    dump_pickle(dv, os.path.join(dest_path, "dv.pkl"))
    for split in months:
        dump_pickle(
            (features[split], targets[split]),
            os.path.join(dest_path, f"{split}.pkl"),
        )
if __name__ == '__main__':
    # Command-line entry point. Both paths are mandatory so that a missing
    # option yields an argparse usage error instead of a None path reaching
    # os.path.join inside run().
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--raw_data_path",
        required=True,
        help="the location where the raw NYC taxi trip data was saved"
    )
    parser.add_argument(
        "--dest_path",
        required=True,
        help="the location where the resulting files will be saved."
    )
    args = parser.parse_args()

    run(args.raw_data_path, args.dest_path)