from datetime import datetime
from typing import Callable, List, Optional, Union
import pandas as pd
import pyarrow
import pyarrow.parquet  # `import pyarrow` alone does not expose pyarrow.parquet, which read_table below relies on
import pytz
from pydantic.typing import Literal
from feast import FileSource
from feast.data_source import DataSource
from feast.errors import FeastJoinKeysDuringMaterialization
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.offline_stores.offline_utils import (
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
)
from feast.infra.provider import (
_get_requested_feature_views_to_features_dict,
_run_field_mapping,
)
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig


class FileOfflineStoreConfig(FeastConfigBaseModel):
""" Offline store config for local (file-based) store """
type: Literal["file"] = "file"
""" Offline store type selector"""
class FileRetrievalJob(RetrievalJob):
def __init__(self, evaluation_function: Callable):
"""Initialize a lazy historical retrieval job"""
        # The evaluation function is a deferred computation that produces the historical retrieval dataframe when invoked.
self.evaluation_function = evaluation_function
def to_df(self):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function()
return df
def to_arrow(self):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function()
return pyarrow.Table.from_pandas(df)
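

# Usage sketch (illustrative, not part of the original file): nothing is read or computed
# until to_df()/to_arrow() is called, and the evaluation function runs on every call
# (the result is not cached between calls).
#
#   job = FileRetrievalJob(evaluation_function=lambda: pd.DataFrame({"x": [1, 2]}))
#   df = job.to_df()        # evaluation happens here
#   table = job.to_arrow()  # evaluation happens again

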
class FileOfflineStore(OfflineStore):
@staticmethod
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
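        """Build a point-in-time-correct training dataframe from local parquet sources.

        The entity_df must be a pandas DataFrame containing the join keys and an event
        timestamp column. A lazy FileRetrievalJob is returned; no data is read until
        to_df() or to_arrow() is called on it.
        """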
if not isinstance(entity_df, pd.DataFrame):
raise ValueError(
f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}"
)
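        # e.g. (illustrative only) a valid entity_df:
        #   pd.DataFrame({
        #       "driver_id": [1001, 1002],
        #       "event_timestamp": [datetime(2021, 4, 12, 10, 59), datetime(2021, 4, 12, 8, 12)],
        #   })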
entity_df_event_timestamp_col = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL # local modifiable copy of global variable
if entity_df_event_timestamp_col not in entity_df.columns:
datetime_columns = entity_df.select_dtypes(
include=["datetime", "datetimetz"]
).columns
if len(datetime_columns) == 1:
print(
f"Using {datetime_columns[0]} as the event timestamp. To specify a column explicitly, please name it {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL}."
)
entity_df_event_timestamp_col = datetime_columns[0]
else:
raise ValueError(
f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events."
)
feature_views_to_features = _get_requested_feature_views_to_features_dict(
feature_refs, feature_views
)
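        # e.g. (illustrative, assuming the usual "feature_view:feature" ref format):
        #   feature_refs = ["driver_stats:conv_rate", "driver_stats:acc_rate"]
        #   -> {<FeatureView driver_stats>: ["conv_rate", "acc_rate"]}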
# Create lazy function that is only called from the RetrievalJob object
def evaluate_historical_retrieval():
# Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC
entity_df[entity_df_event_timestamp_col] = entity_df[
entity_df_event_timestamp_col
].apply(lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc))
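            # e.g. a tz-naive Timestamp("2021-04-12 10:59:42") becomes
            # Timestamp("2021-04-12 10:59:42+00:00"); values that are already tz-aware are left untouched.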
# Create a copy of entity_df to prevent modifying the original
entity_df_with_features = entity_df.copy()
# Convert event timestamp column to datetime and normalize time zone to UTC
# This is necessary to avoid issues with pd.merge_asof
entity_df_with_features[entity_df_event_timestamp_col] = pd.to_datetime(
entity_df_with_features[entity_df_event_timestamp_col], utc=True
)
# Sort event timestamp values
entity_df_with_features = entity_df_with_features.sort_values(
entity_df_event_timestamp_col
)
# Load feature view data from sources and join them incrementally
for feature_view, features in feature_views_to_features.items():
event_timestamp_column = (
feature_view.batch_source.event_timestamp_column
)
created_timestamp_column = (
feature_view.batch_source.created_timestamp_column
)
# Read offline parquet data in pyarrow format.
filesystem, path = FileSource.create_filesystem_and_path(
feature_view.batch_source.path,
feature_view.batch_source.file_options.s3_endpoint_override,
)
table = pyarrow.parquet.read_table(path, filesystem=filesystem)
# Rename columns by the field mapping dictionary if it exists
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(
table, feature_view.batch_source.field_mapping
)
# Convert pyarrow table to pandas dataframe. Note, if the underlying data has missing values,
# pandas will convert those values to np.nan if the dtypes are numerical (floats, ints, etc.) or boolean
# If the dtype is 'object', then missing values are inferred as python `None`s.
# More details at:
# https://pandas.pydata.org/pandas-docs/stable/user_guide/missing_data.html#values-considered-missing
df_to_join = table.to_pandas()
# Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
df_to_join[event_timestamp_column] = df_to_join[
event_timestamp_column
].apply(
lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc)
)
if created_timestamp_column:
df_to_join[created_timestamp_column] = df_to_join[
created_timestamp_column
].apply(
lambda x: x
if x.tzinfo is not None
else x.replace(tzinfo=pytz.utc)
)
# Sort dataframe by the event timestamp column
df_to_join = df_to_join.sort_values(event_timestamp_column)
# Build a list of all the features we should select from this source
feature_names = []
for feature in features:
# Modify the separator for feature refs in column names to double underscore. We are using
# double underscore as separator for consistency with other databases like BigQuery,
# where there are very few characters available for use as separators
if full_feature_names:
formatted_feature_name = f"{feature_view.name}__{feature}"
else:
formatted_feature_name = feature
# Add the feature name to the list of columns
feature_names.append(formatted_feature_name)
# Ensure that the source dataframe feature column includes the feature view name as a prefix
df_to_join.rename(
columns={feature: formatted_feature_name}, inplace=True,
)
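                # e.g. with full_feature_names=True a feature "conv_rate" from a view named
                # "driver_stats" (names chosen for illustration) is exposed as
                # "driver_stats__conv_rate"; with full_feature_names=False it stays "conv_rate".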
# Build a list of entity columns to join on (from the right table)
join_keys = []
for entity_name in feature_view.entities:
entity = registry.get_entity(entity_name, project)
join_keys.append(entity.join_key)
right_entity_columns = join_keys
right_entity_key_columns = [
event_timestamp_column
] + right_entity_columns
# Remove all duplicate entity keys (using created timestamp)
right_entity_key_sort_columns = right_entity_key_columns
if created_timestamp_column:
# If created_timestamp is available, use it to dedupe deterministically
right_entity_key_sort_columns = right_entity_key_sort_columns + [
created_timestamp_column
]
df_to_join.sort_values(by=right_entity_key_sort_columns, inplace=True)
df_to_join.drop_duplicates(
right_entity_key_sort_columns,
keep="last",
ignore_index=True,
inplace=True,
)
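                # e.g. (illustrative) two source rows for the same driver at event time 10:00
                # with created times 09:50 and 09:55: only the 09:55 row is kept.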
# Select only the columns we need to join from the feature dataframe
df_to_join = df_to_join[right_entity_key_columns + feature_names]
# Do point in-time-join between entity_df and feature dataframe
entity_df_with_features = pd.merge_asof(
entity_df_with_features,
df_to_join,
left_on=entity_df_event_timestamp_col,
right_on=event_timestamp_column,
by=right_entity_columns,
tolerance=feature_view.ttl,
)
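                # Illustrative example (values assumed): an entity row stamped
                # 2021-04-12 10:05:00+00:00 picks up the most recent feature row at or before
                # that time for the same join key(s), e.g. one stamped 10:00:00, provided the
                # gap does not exceed feature_view.ttl.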
# Remove right (feature table/view) event_timestamp column.
if event_timestamp_column != entity_df_event_timestamp_col:
entity_df_with_features.drop(
columns=[event_timestamp_column], inplace=True
)
# Ensure that we delete dataframes to free up memory
del df_to_join
# Move "event_timestamp" column to front
current_cols = entity_df_with_features.columns.tolist()
current_cols.remove(entity_df_event_timestamp_col)
entity_df_with_features = entity_df_with_features[
[entity_df_event_timestamp_col] + current_cols
]
return entity_df_with_features
job = FileRetrievalJob(evaluation_function=evaluate_historical_retrieval)
return job
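
    # Usage sketch (illustrative; the surrounding objects are assumed to already exist):
    #
    #   job = FileOfflineStore.get_historical_features(
    #       config=repo_config,
    #       feature_views=registry.list_feature_views(project),
    #       feature_refs=["driver_stats:conv_rate"],
    #       entity_df=entity_df,
    #       registry=registry,
    #       project=project,
    #   )
    #   training_df = job.to_df()
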
@staticmethod
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
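        """Pull the latest feature row per join key from a parquet FileSource.

        Rows are restricted to event timestamps in [start_date, end_date); this is the
        read path used when materializing features into the online store. A lazy
        FileRetrievalJob is returned.
        """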
assert isinstance(data_source, FileSource)
# Create lazy function that is only called from the RetrievalJob object
def evaluate_offline_job():
filesystem, path = FileSource.create_filesystem_and_path(
data_source.path, data_source.file_options.s3_endpoint_override
)
source_df = pd.read_parquet(path, filesystem=filesystem)
# Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
source_df[event_timestamp_column] = source_df[event_timestamp_column].apply(
lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc)
)
if created_timestamp_column:
source_df[created_timestamp_column] = source_df[
created_timestamp_column
].apply(
lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc)
)
source_columns = set(source_df.columns)
if not set(join_key_columns).issubset(source_columns):
raise FeastJoinKeysDuringMaterialization(
data_source.path, set(join_key_columns), source_columns
)
ts_columns = (
[event_timestamp_column, created_timestamp_column]
if created_timestamp_column
else [event_timestamp_column]
)
source_df.sort_values(by=ts_columns, inplace=True)
filtered_df = source_df[
(source_df[event_timestamp_column] >= start_date)
& (source_df[event_timestamp_column] < end_date)
]
last_values_df = filtered_df.drop_duplicates(
join_key_columns, keep="last", ignore_index=True
)
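            # e.g. (illustrative) for driver_id=1001 with rows at 09:00 and 09:30 inside the
            # window, only the 09:30 row (the latest by the timestamp sort above) survives.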
columns_to_extract = set(
join_key_columns + feature_name_columns + ts_columns
)
            # Convert to a list before indexing; newer pandas versions reject set indexers
            return last_values_df[list(columns_to_extract)]
return FileRetrievalJob(evaluation_function=evaluate_offline_job)
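

# Materialization usage sketch (hedged; argument values are placeholders, not from this file):
#
#   job = FileOfflineStore.pull_latest_from_table_or_query(
#       config=repo_config,
#       data_source=my_file_source,              # a FileSource pointing at a parquet file
#       join_key_columns=["driver_id"],
#       feature_name_columns=["conv_rate"],
#       event_timestamp_column="event_timestamp",
#       created_timestamp_column="created",
#       start_date=datetime(2021, 4, 1, tzinfo=pytz.utc),
#       end_date=datetime(2021, 4, 12, tzinfo=pytz.utc),
#   )
#   latest_df = job.to_df()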