@@ -46,8 +46,8 @@ def pull_latest_from_table_or_query_ibis(
     created_timestamp_column: Optional[str],
     start_date: datetime,
     end_date: datetime,
-    data_source_reader: Callable[[DataSource], Table],
-    data_source_writer: Callable[[pyarrow.Table, DataSource], None],
+    data_source_reader: Callable[[DataSource, str], Table],
+    data_source_writer: Callable[[pyarrow.Table, DataSource, str], None],
     staging_location: Optional[str] = None,
     staging_location_endpoint_override: Optional[str] = None,
 ) -> RetrievalJob:
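The reader callable now receives the repo root as a second argument, so file-backed sources can resolve relative paths against the feature repository instead of the process working directory. A minimal sketch of a reader matching the new Callable[[DataSource, str], Table] shape, assuming a parquet-backed FileSource; the function name and the path handling are illustrative only, not the exact implementation touched by this PR:

from pathlib import Path

import ibis
from ibis.expr.types import Table

from feast.data_source import DataSource
from feast.infra.offline_stores.file_source import FileSource


def example_data_source_reader(data_source: DataSource, repo_path: str) -> Table:
    # Illustrative sketch: only handles parquet-backed FileSources.
    assert isinstance(data_source, FileSource)
    path = Path(data_source.path)
    if not path.is_absolute():
        # Resolve relative source paths against the feature repo root.
        path = Path(repo_path) / path
    return ibis.read_parquet(path)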
@@ -57,7 +57,7 @@ def pull_latest_from_table_or_query_ibis(
     start_date = start_date.astimezone(tz=timezone.utc)
     end_date = end_date.astimezone(tz=timezone.utc)
 
-    table = data_source_reader(data_source)
+    table = data_source_reader(data_source, str(config.repo_path))
 
     table = table.select(*fields)
 
@@ -87,6 +87,7 @@ def pull_latest_from_table_or_query_ibis(
         data_source_writer=data_source_writer,
         staging_location=staging_location,
         staging_location_endpoint_override=staging_location_endpoint_override,
+        repo_path=str(config.repo_path),
     )
 
 
@@ -147,8 +148,8 @@ def get_historical_features_ibis(
     entity_df: Union[pd.DataFrame, str],
     registry: BaseRegistry,
     project: str,
-    data_source_reader: Callable[[DataSource], Table],
-    data_source_writer: Callable[[pyarrow.Table, DataSource], None],
+    data_source_reader: Callable[[DataSource, str], Table],
+    data_source_writer: Callable[[pyarrow.Table, DataSource, str], None],
     full_feature_names: bool = False,
     staging_location: Optional[str] = None,
     staging_location_endpoint_override: Optional[str] = None,
@@ -174,7 +175,9 @@ def get_historical_features_ibis(
     def read_fv(
         feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
     ) -> Tuple:
-        fv_table: Table = data_source_reader(feature_view.batch_source)
+        fv_table: Table = data_source_reader(
+            feature_view.batch_source, str(config.repo_path)
+        )
 
         for old_name, new_name in feature_view.batch_source.field_mapping.items():
             if old_name in fv_table.columns:
@@ -247,6 +250,7 @@ def read_fv(
         data_source_writer=data_source_writer,
         staging_location=staging_location,
         staging_location_endpoint_override=staging_location_endpoint_override,
+        repo_path=str(config.repo_path),
     )
 
 
@@ -258,16 +262,16 @@ def pull_all_from_table_or_query_ibis(
     timestamp_field: str,
     start_date: datetime,
     end_date: datetime,
-    data_source_reader: Callable[[DataSource], Table],
-    data_source_writer: Callable[[pyarrow.Table, DataSource], None],
+    data_source_reader: Callable[[DataSource, str], Table],
+    data_source_writer: Callable[[pyarrow.Table, DataSource, str], None],
     staging_location: Optional[str] = None,
     staging_location_endpoint_override: Optional[str] = None,
 ) -> RetrievalJob:
     fields = join_key_columns + feature_name_columns + [timestamp_field]
     start_date = start_date.astimezone(tz=timezone.utc)
     end_date = end_date.astimezone(tz=timezone.utc)
 
-    table = data_source_reader(data_source)
+    table = data_source_reader(data_source, str(config.repo_path))
 
     table = table.select(*fields)
 
@@ -290,6 +294,7 @@ def pull_all_from_table_or_query_ibis(
         data_source_writer=data_source_writer,
         staging_location=staging_location,
         staging_location_endpoint_override=staging_location_endpoint_override,
+        repo_path=str(config.repo_path),
     )
 
 
@@ -319,7 +324,7 @@ def offline_write_batch_ibis(
     feature_view: FeatureView,
     table: pyarrow.Table,
     progress: Optional[Callable[[int], Any]],
-    data_source_writer: Callable[[pyarrow.Table, DataSource], None],
+    data_source_writer: Callable[[pyarrow.Table, DataSource, str], None],
 ):
     pa_schema, column_names = get_pyarrow_schema_from_batch_source(
         config, feature_view.batch_source
@@ -330,7 +335,9 @@ def offline_write_batch_ibis(
             f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}."
         )
 
-    data_source_writer(ibis.memtable(table), feature_view.batch_source)
+    data_source_writer(
+        ibis.memtable(table), feature_view.batch_source, str(config.repo_path)
+    )
 
 
 def deduplicate(
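offline_write_batch_ibis forwards the same repo path to the writer callable, and the persist() hunk further down also passes a write mode and an allow_overwrite flag. A rough sketch of a writer compatible with both call sites, again assuming a parquet FileSource; the function name, the default mode/allow_overwrite parameters, and the write-out behaviour are assumptions for illustration, not code changed by this PR:

from pathlib import Path

from ibis.expr.types import Table

from feast.data_source import DataSource
from feast.infra.offline_stores.file_source import FileSource


def example_data_source_writer(
    table: Table,
    data_source: DataSource,
    repo_path: str,
    mode: str = "append",
    allow_overwrite: bool = False,
) -> None:
    # Illustrative sketch: resolve the destination against the repo root,
    # then materialize the ibis expression and dump it to parquet.
    assert isinstance(data_source, FileSource)
    path = Path(data_source.path)
    if not path.is_absolute():
        path = Path(repo_path) / path
    if path.exists() and mode == "overwrite" and not allow_overwrite:
        raise ValueError(f"{path} already exists and allow_overwrite is False")
    table.execute().to_parquet(path)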
@@ -469,6 +476,7 @@ def __init__(
         data_source_writer,
         staging_location,
         staging_location_endpoint_override,
+        repo_path,
     ) -> None:
         super().__init__()
         self.table = table
@@ -480,6 +488,7 @@ def __init__(
         self.data_source_writer = data_source_writer
         self.staging_location = staging_location
         self.staging_location_endpoint_override = staging_location_endpoint_override
+        self.repo_path = repo_path
 
     def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         return self.table.execute()
@@ -502,7 +511,11 @@ def persist(
         timeout: Optional[int] = None,
     ):
         self.data_source_writer(
-            self.table, storage.to_data_source(), "overwrite", allow_overwrite
+            self.table,
+            storage.to_data_source(),
+            self.repo_path,
+            "overwrite",
+            allow_overwrite,
         )
 
     @property