Skip to content

Commit 22c109b

Browse files
feat: Add possibility to save dataset as table, when spark config has remote warehouse info (feast-dev#3645)
feat: add possibility to save dataset as table, when spark config has remote warehouse info Signed-off-by: nsuraeva <nsuraeva@neoflex.ru> Co-authored-by: nsuraeva <nsuraeva@neoflex.ru>
1 parent ff199df commit 22c109b

File tree

1 file changed

+25
-2
lines changed
  • sdk/python/feast/infra/offline_stores/contrib/spark_offline_store

1 file changed

+25
-2
lines changed

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

+25-2
Original file line numberDiff line numberDiff line change
@@ -352,13 +352,36 @@ def persist(
352352
):
353353
"""
354354
Run the retrieval and persist the results in the same offline store used for read.
355-
Please note the persisting is done only within the scope of the spark session.
355+
Please note the persisting is done only within the scope of the spark session for local warehouse directory.
356356
"""
357357
assert isinstance(storage, SavedDatasetSparkStorage)
358358
table_name = storage.spark_options.table
359359
if not table_name:
360360
raise ValueError("Cannot persist, table_name is not defined")
361-
self.to_spark_df().createOrReplaceTempView(table_name)
361+
if self._has_remote_warehouse_in_config():
362+
file_format = storage.spark_options.file_format
363+
if not file_format:
364+
self.to_spark_df().write.saveAsTable(table_name)
365+
else:
366+
self.to_spark_df().write.format(file_format).saveAsTable(table_name)
367+
else:
368+
self.to_spark_df().createOrReplaceTempView(table_name)
369+
370+
def _has_remote_warehouse_in_config(self) -> bool:
    """
    Tell whether the Spark session is configured with a remote warehouse.

    Returns True when ``hive.metastore.uris`` is set to a non-empty value,
    or when ``spark.sql.warehouse.dir`` is missing or does not start with
    the ``file:`` scheme. Returns False only when no metastore URI is
    configured and the warehouse directory is a ``file:`` path.
    """
    conf = self.spark_session.conf

    # Passing an explicit default makes a missing key come back as None
    # instead of raising, so no broad ``except`` is needed to detect absence
    # and unrelated errors are no longer swallowed.
    if conf.get("hive.metastore.uris", None):
        return True

    warehouse_dir = conf.get("spark.sql.warehouse.dir", None)
    # Only an explicit ``file:`` location counts as local; anything else
    # (including an unset value) is treated as remote.
    return not (warehouse_dir and warehouse_dir.startswith("file:"))
362385

363386
def supports_remote_storage_export(self) -> bool:
    """Return True when the offline store config has a staging location set."""
    staging_location = self._config.offline_store.staging_location
    return staging_location is not None

0 commit comments

Comments
 (0)