Skip to content

Commit 01a98f0

Browse files
fix: Support param timeout when persisting (feast-dev#3593)
* fix: Support param timeout when persisting Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com> * fix: Revert default timeout value in `to_bigquery` Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com> --------- Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>
1 parent 7e77382 commit 01a98f0

File tree

11 files changed

+70
-14
lines changed

11 files changed

+70
-14
lines changed

sdk/python/feast/infra/offline_stores/bigquery.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -456,8 +456,8 @@ def to_sql(self) -> str:
456456
def to_bigquery(
457457
self,
458458
job_config: Optional[bigquery.QueryJobConfig] = None,
459-
timeout: int = 1800,
460-
retry_cadence: int = 10,
459+
timeout: Optional[int] = 1800,
460+
retry_cadence: Optional[int] = 10,
461461
) -> str:
462462
"""
463463
Synchronously executes the underlying query and exports the result to a BigQuery table. The
@@ -530,11 +530,17 @@ def _execute_query(
530530
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout or 1800)
531531
return bq_job
532532

533-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
533+
def persist(
534+
self,
535+
storage: SavedDatasetStorage,
536+
allow_overwrite: Optional[bool] = False,
537+
timeout: Optional[int] = None,
538+
):
534539
assert isinstance(storage, SavedDatasetBigQueryStorage)
535540

536541
self.to_bigquery(
537-
bigquery.QueryJobConfig(destination=storage.bigquery_options.table)
542+
bigquery.QueryJobConfig(destination=storage.bigquery_options.table),
543+
timeout=timeout,
538544
)
539545

540546
@property

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
412412
def metadata(self) -> Optional[RetrievalMetadata]:
413413
return self._metadata
414414

415-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
415+
def persist(
416+
self,
417+
storage: SavedDatasetStorage,
418+
allow_overwrite: Optional[bool] = False,
419+
timeout: Optional[int] = None,
420+
):
416421
assert isinstance(storage, SavedDatasetAthenaStorage)
417422
self.to_athena(table_name=storage.athena_options.table)
418423

sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
356356

357357
## Implements persist in Feast 0.18 - This persists to filestorage
358358
## ToDo: Persist to Azure Storage
359-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
359+
def persist(
360+
self,
361+
storage: SavedDatasetStorage,
362+
allow_overwrite: Optional[bool] = False,
363+
timeout: Optional[int] = None,
364+
):
360365
assert isinstance(storage, SavedDatasetFileStorage)
361366

362367
filesystem, path = FileSource.create_filesystem_and_path(

sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
302302
def metadata(self) -> Optional[RetrievalMetadata]:
303303
return self._metadata
304304

305-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
305+
def persist(
306+
self,
307+
storage: SavedDatasetStorage,
308+
allow_overwrite: Optional[bool] = False,
309+
timeout: Optional[int] = None,
310+
):
306311
assert isinstance(storage, SavedDatasetPostgreSQLStorage)
307312

308313
df_to_postgres_table(

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
344344
"""Return dataset as pyarrow Table synchronously"""
345345
return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout))
346346

347-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
347+
def persist(
348+
self,
349+
storage: SavedDatasetStorage,
350+
allow_overwrite: Optional[bool] = False,
351+
timeout: Optional[int] = None,
352+
):
348353
"""
349354
Run the retrieval and persist the results in the same offline store used for read.
350355
Please note the persisting is done only within the scope of the spark session.

sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,12 @@ def to_trino(
126126
self._client.execute_query(query_text=query)
127127
return destination_table
128128

129-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
129+
def persist(
130+
self,
131+
storage: SavedDatasetStorage,
132+
allow_overwrite: Optional[bool] = False,
133+
timeout: Optional[int] = None,
134+
):
130135
"""
131136
Run the retrieval and persist the results in the same offline store used for read.
132137
"""

sdk/python/feast/infra/offline_stores/file.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None):
8888
df = self.evaluation_function().compute()
8989
return pyarrow.Table.from_pandas(df)
9090

91-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
91+
def persist(
92+
self,
93+
storage: SavedDatasetStorage,
94+
allow_overwrite: Optional[bool] = False,
95+
timeout: Optional[int] = None,
96+
):
9297
assert isinstance(storage, SavedDatasetFileStorage)
9398

9499
# Check if the specified location already exists.

sdk/python/feast/infra/offline_stores/offline_store.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,12 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
189189
pass
190190

191191
@abstractmethod
192-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
192+
def persist(
193+
self,
194+
storage: SavedDatasetStorage,
195+
allow_overwrite: bool = False,
196+
timeout: Optional[int] = None,
197+
):
193198
"""
194199
Synchronously executes the underlying query and persists the result in the same offline store
195200
at the specified destination.

sdk/python/feast/infra/offline_stores/redshift.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,12 @@ def to_redshift(self, table_name: str) -> None:
476476
query,
477477
)
478478

479-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
479+
def persist(
480+
self,
481+
storage: SavedDatasetStorage,
482+
allow_overwrite: Optional[bool] = False,
483+
timeout: Optional[int] = None,
484+
):
480485
assert isinstance(storage, SavedDatasetRedshiftStorage)
481486
self.to_redshift(table_name=storage.redshift_options.table)
482487

sdk/python/feast/infra/offline_stores/snowflake.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,12 @@ def to_spark_df(self, spark_session: "SparkSession") -> "DataFrame":
531531
else:
532532
raise InvalidSparkSessionException(spark_session)
533533

534-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
534+
def persist(
535+
self,
536+
storage: SavedDatasetStorage,
537+
allow_overwrite: Optional[bool] = False,
538+
timeout: Optional[int] = None,
539+
):
535540
assert isinstance(storage, SavedDatasetSnowflakeStorage)
536541
self.to_snowflake(table_name=storage.snowflake_options.table)
537542

sdk/python/tests/unit/infra/offline_stores/test_offline_store.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,12 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
6767
"""Returns a list containing all the on demand feature views to be handled."""
6868
pass
6969

70-
def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
70+
def persist(
71+
self,
72+
storage: SavedDatasetStorage,
73+
allow_overwrite: bool = False,
74+
timeout: Optional[int] = None,
75+
):
7176
"""
7277
Synchronously executes the underlying query and persists the result in the same offline store
7378
at the specified destination.

0 commit comments

Comments
 (0)