-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathanalyse.py
179 lines (140 loc) · 7.72 KB
/
analyse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import os.path
import datetime as dt
from typing import List
import logging
import pandas as pd
import numpy as np
# Import-time side effect: lower the root logger's threshold to INFO so that
# INFO-level records (e.g. the 'No valid CSV files' message logged below) pass
# the root logger's level check. This affects every logger that defers to root.
logging.getLogger().setLevel(logging.INFO)
class Journey:
    """A single journey: when and where it started/ended, its cost and a note.

    Attributes:
        start_time: Start of the journey, or None when unknown.
        end_time: End of the journey, or None when unknown.
        origin: Where the journey started, or None.
        destination: Where the journey ended, or None.
        charge: Fare charged for the journey, or None.
        note: Free-text note attached to the journey, or None.
        journey_time: ``end_time - start_time``, or None when either endpoint
            is missing.

    Ordering (``<`` and ``>``) compares ``journey_time``. A journey without a
    known duration is unordered: every ``<`` / ``>`` involving it is False.
    """

    def __init__(self, start_time, end_time, origin, destination, charge, note):
        self.start_time = start_time
        self.end_time = end_time
        self.origin = origin
        self.destination = destination
        self.charge = charge
        self.note = note
        # Test against None explicitly (not truthiness) and require BOTH
        # endpoints, so a missing start time cannot raise on subtraction.
        if start_time is not None and end_time is not None:
            self.journey_time = end_time - start_time
        else:
            self.journey_time = None

    def __repr__(self):
        return 'Journey(start_time={!r}, end_time={!r}, origin={!r}, destination={!r}, journey_time={!r}, ' \
               'charge={!r}, note={!r})'.format(self.start_time, self.end_time, self.origin, self.destination,
                                                self.journey_time, self.charge, self.note)

    def __lt__(self, other):
        """Return True when this journey is strictly shorter than ``other``."""
        # If EITHER duration is unknown the pair cannot be ordered; comparing
        # None with a timedelta would raise TypeError.
        if self.journey_time is None or other.journey_time is None:
            return False
        return self.journey_time < other.journey_time

    def __gt__(self, other):
        """Return True when this journey is strictly longer than ``other``."""
        if self.journey_time is None or other.journey_time is None:
            return False
        return self.journey_time > other.journey_time
class JourneyHistory:
    """Journey history assembled from one or more CSV exports.

    CSV files whose header row matches the expected journey-history header are
    concatenated and cleaned into ``self.df`` with the columns
    ['Start Time', 'End Time', 'Duration', 'From', 'To', 'Bus Route', 'Charge', 'Note'].
    The unmodified per-file frames are kept in ``self.raw_dfs``, keyed by path.
    """

    def __init__(self, history_files: List[str] = None, history_dir: str = None):
        """Load history from a list of CSV paths or from a directory of CSVs.

        Args:
            history_files: Paths of the CSV files to load.
            history_dir: Directory whose ``*.csv`` files should be loaded.

        Raises:
            ValueError: If both ``history_files`` and ``history_dir`` are given.
            AssertionError: If ``history_dir`` does not exist, or if no
                directory is given and ``history_files`` is not a list.
        """
        self.raw_dfs = {}
        if history_dir is not None:
            if history_files is not None:
                raise ValueError('Only provide either the list of journey history files or the directory containing the'
                                 ' history files, but not both.')
            assert os.path.exists(history_dir), 'Journey history directory does not exist: {}'.format(history_dir)
            self.df = self.load_history_from_dir(history_dir)
        else:
            assert isinstance(history_files, list), '`history_files` must be a list of filepaths'
            self.df = self.load_history_from_file_list(history_files)

    def __len__(self):
        """ Number of total rows of the dataframe """
        if self.df is None:
            return 0
        return len(self.df)

    def __repr__(self):
        return 'JourneyHistory(journeys={})'.format(len(self))

    def __getitem__(self, item):
        """Return the row(s) of the cleaned dataframe at position ``item``.

        Integer positions are bounds-checked here (negative positions count
        from the end); anything else, such as a slice, is handed to ``iloc``.

        Raises:
            IndexError: If an integer position is out of range.
        """
        if isinstance(item, int) and not -len(self) <= item < len(self):
            raise IndexError('Index out of range of number of DataFrame rows')
        return self.df.iloc[item]

    def load_history_from_dir(self, history_dir: str) -> pd.DataFrame:
        """Load every ``*.csv`` file found directly inside ``history_dir``."""
        # Sorted so the load order does not depend on the OS listing order.
        csv_filepaths = [os.path.join(history_dir, f)
                         for f in sorted(os.listdir(history_dir)) if f.endswith('.csv')]
        return self.load_history_from_file_list(csv_filepaths)

    def load_history_from_file_list(self, history_files: List[str]) -> pd.DataFrame:
        """ For a given list of filename, load the CSVs into one dataframe.
        Columns: ['Start Time', 'End Time', 'Duration', 'From', 'To', 'Bus Route', 'Charge', 'Note']

        Files whose header differs from the expected one are skipped. When no
        file is valid an empty dataframe is returned.
        """
        # Having exactly these headers is the condition for a valid file
        expected_columns = ['Date', 'Start Time', 'End Time', 'Journey/Action', 'Charge', 'Credit', 'Balance', 'Note']
        valid_dfs = []
        for csv_file in history_files:
            df = pd.read_csv(csv_file)
            if df.columns.tolist() != expected_columns:
                logging.info('Skipping %s: not a journey history CSV (unexpected header)', csv_file)
                continue
            self.raw_dfs[csv_file] = df
            valid_dfs.append(df)

        if not valid_dfs:
            logging.info('No valid CSV files')
            return pd.DataFrame()

        # Join all the individual dfs into one big df
        combined_df = pd.concat(valid_dfs, ignore_index=True)
        return self._clean_raw_df(combined_df)

    def _clean_raw_df(self, combined_df: pd.DataFrame) -> pd.DataFrame:
        """Turn the concatenated raw CSV rows into the cleaned journey table.

        Combines date and time columns into timestamps, derives 'Duration',
        'From', 'To' and 'Bus Route', drops non-journey rows (top-ups, refunds,
        journeys without a touch-out) and sorts by start time. The result is
        stored on ``self.df`` and returned.
        """
        # Work on a copy so the caller's frame is left untouched.
        df = combined_df.copy().reset_index(drop=True)

        # Processing of dates and times (mainly combining)
        df['Start Time'] = pd.to_datetime(df['Date'] + ' ' + df['Start Time'])
        df['End Time'] = pd.to_datetime(df['Date'] + ' ' + df['End Time'])
        # Add 1 day to journeys whose end times go into the next day
        crosses_midnight = df['End Time'] < df['Start Time']
        df.loc[crosses_midnight, 'End Time'] += dt.timedelta(days=1)
        df['Duration'] = df['End Time'] - df['Start Time']

        # Get the origin and destination columns
        endpoints = df['Journey/Action'].str.split(' to ')
        df['From'] = endpoints.str[0]
        df['To'] = endpoints.str[1]

        # Filter out unwanted rows. `na=False` keeps rows with a missing action
        # from poisoning the boolean mask; all needles are literal text.
        action = df['Journey/Action']
        unwanted = df['To'].astype(str).str.contains('No touch-out', regex=False)
        for needle in ('Oyster helpline refund', 'Auto top-up', 'Topped-up on touch in'):
            unwanted = unwanted | action.str.contains(needle, na=False, regex=False)
        df = df[~unwanted].copy()

        # Bus journeys: record the route and blank out 'From', which for these
        # rows only holds the raw action text.
        action = df['Journey/Action']
        is_bus = action.str.contains('Bus journey', na=False, regex=False)
        # `\w*\d+` also captures single-digit routes ("8") and multi-letter
        # prefixes ("RV1"), which the two-character-minimum `\w\d+` missed.
        routes = action.str.extract(r'(\w*\d+)', expand=False)
        df['Bus Route'] = routes.where(is_bus)
        df['From'] = df['From'].mask(is_bus)

        final_columns = ['Start Time', 'End Time', 'Duration', 'From', 'To', 'Bus Route', 'Charge', 'Note']
        df = df[final_columns].sort_values('Start Time').reset_index(drop=True)
        self.df = df
        return self.df

    @staticmethod
    def _df_row_to_journey(row: pd.Series) -> 'Journey':
        """Build a ``Journey`` from one row of the cleaned dataframe.

        Missing values (NaN / NaT) are converted to None.
        """
        def value_or_none(column):
            cell = row[column]
            return None if pd.isnull(cell) else cell

        start_time = value_or_none('Start Time')
        end_time = value_or_none('End Time')
        if start_time is not None:
            start_time = start_time.to_pydatetime()
        if end_time is not None:
            end_time = end_time.to_pydatetime()
        return Journey(start_time, end_time, value_or_none('From'), value_or_none('To'),
                       value_or_none('Charge'), value_or_none('Note'))

    def _create_journeys(self) -> list:
        """Return one ``Journey`` per row of ``self.df``, in row order."""
        return [self._df_row_to_journey(row) for _, row in self.df.iterrows()]

    def get_summary_stats(self) -> dict:
        """Return the summed 'Duration' and summed 'Charge' over all rows."""
        return {'total_journey_time': self.df['Duration'].sum(),
                'total_fare_expense': self.df['Charge'].sum()}

    def get_top_origin_stations(self, n=5) -> pd.DataFrame:
        """Most frequent 'From' stations (see `_get_top_stations_from_series`)."""
        return self._get_top_stations_from_series(self.df['From'], n)

    def get_top_destination_stations(self, n=5) -> pd.DataFrame:
        """Most frequent 'To' stations (see `_get_top_stations_from_series`)."""
        return self._get_top_stations_from_series(self.df['To'], n)

    def _get_top_stations_from_series(self, stations: pd.Series, n=5) -> pd.DataFrame:
        """Return stations whose frequency is among the ``n`` highest counts.

        The result is a one-column ('freq') dataframe indexed by station,
        sorted by descending frequency. Stations tied with any of the top
        ``n`` counts are all included, so more than ``n`` rows may come back.
        """
        counts = stations.value_counts()
        top_counts = counts.head(n).values
        freq_df = counts.to_frame('freq')
        tied_with_top = freq_df.loc[freq_df['freq'].isin(top_counts)]
        return tied_with_top.sort_values(by='freq', ascending=False, kind='stable')

    def get_top_stations(self, n=5) -> pd.Series:
        """Counts of the ``n`` most frequent stations over 'From' and 'To' combined."""
        return pd.concat([self.df['From'], self.df['To']]).value_counts().sort_values(ascending=False).head(n)

    def get_longest_journeys(self, n=5) -> pd.DataFrame:
        """Return the ``n`` rows with the largest 'Duration'."""
        return self.df.sort_values('Duration', ascending=False).head(n)

    def get_most_common_journeys(self, n=5) -> pd.Series:
        """Counts of the ``n`` most frequent ('From', 'To') pairs."""
        return self.df.groupby(['From', 'To']).size().sort_values(ascending=False).head(n)